當前位置:
首頁 > 知識 > 大頂堆的原理及Python實現

大頂堆的原理及Python實現


程序員大咖

點擊右側關注,免費進階高級!





作者:

李小文


Github: https://gith

ub.com/tushu

shu




提到大頂堆相信大家應該都不會覺得陌生,大名鼎鼎的KNN演算法就用到了大頂堆。本文就大頂堆的基本原理進行講解,並手把手、肩並肩地帶您實現這一演算法。

完整實現代碼請參考本人的github:

https:

//github.com/tushushu/imylu/blob/master/imylu/neighbors/max_heap.py


https:

//github.com/tushushu/imylu/blob/master/examples/max_heap_example.py



1. 原理篇


我們用大白話講講大頂堆是怎麼一回事。


1.1 什麼是「堆」


在實際生活中,「堆」非常常見,比如工地旁邊會有「土堆」,一些垃圾站會有「垃圾堆」。這些「堆」通常都是由一些相似的物體放在一起,形成上窄下寬的結構。


1.2 完全二叉樹


百度百科說:對於深度為K的,有n個節點的二叉樹,當且僅當其每一個節點都與深度為K的滿二叉樹中編號從1至n的節點一一對應時稱之為完全二叉樹。 這描述讓人聽起來有點懵逼,說得簡單點,完全二叉樹只有兩種情況:


(1) 完美二叉樹,即每一層節點數都是上一層的二倍


(2) 完美二叉樹扣掉若干個節點,"扣"的順序是由下向上、由右向左


1.2 什麼是「大頂堆」


如下圖所示,計算機中的「大頂堆」就是把數據放在一顆完全二叉樹中,形狀看上去跟我們提到的「土堆」,「垃圾堆」差不多。跟普通二叉樹的區別就是,每個父節點的值都大於子節點的值,即兒子不如爹,所以用大頂堆來描述「富不過三代」再貼切不過。


1.3 如何建立大頂堆


建立一個大頂堆需要告訴計算機:這,就是大頂堆!然後要說明這個大頂堆目前的大小是0,未來不能超過多大。由於大頂堆是個完全二叉樹,層序遍歷的時候元素都是連續的,中間沒有「空位」,所以很方便用數組來表示這棵樹。那麼我們就再開闢一個數組,用於存儲大頂堆的元素。


1.4 元素上浮


之前說過大頂堆的特徵是「兒子不如爹」,那麼如果大頂堆的最後一個元素比爹還大,那麼這個兒子就要升級當爹了,爹也要降級為兒子,聽起來有點亂…這個過程就是元素的上浮過程。如果上浮一次之後,發現兒子還是比爹大,就繼續上浮,直到上浮到爹比兒子大或者上浮到堆頂為止。


1.5 元素下沉


同理,如果大頂堆的第一個元素比兒子還小,那麼這個爹就要降級為兒子了,兒子也要升級當爹,聽起來仍然有點亂…這個過程就是元素的下沉過程。如果下沉一次之後,發現爹還是比兒子小,就繼續下沉,直到下沉到兒子比爹小或者下沉到堆底為止。


1.6 插入元素


在大頂堆中插入一個元素,分為如下兩種情況:
(1) 堆未滿,將元素放在當前最後一個元素的後面,然後執行上浮過程;(2) 堆已滿,如果該元素大於堆頂則無法插入,小於堆頂則替換堆頂,再執行下沉過程。


1.7 推出頂部元素


大頂堆的交換頂部元素A和最後一個元素B,堆的size減1,再將頂部的B執行下沉過程,最後返回元素A。注意,雖然堆的size減小了1,但實際上並沒有元素被刪除,數組長度也沒有任何變化,被pop的元素只是被放在了數組中size之後的位置。


1.8 大頂堆有什麼用


大頂堆的典型應用有3個: (1) 堆排序降序,我們把頂點元素不停地pop出來,由於每次pop出的元素都是當時最大的,所以把pop的值收集起來就是一個降序數組; (2) 堆排序升序,同方法(1),由於頂點元素每次都被pop方法放在了數組的最後一個元素的位置,所以全部pop完畢之後堆中的數組已經是一個升序數組; (3) 從N個元素中查找最小的K個元素,把N個元素逐個插入大小為K的大頂堆中,最後大頂堆中的元素就是我們要找的TOP K。


2. 實現篇


本人用全宇宙最簡單的編程語言——Python實現了大頂堆演算法,沒有依賴任何第三方庫,便於學習和使用。簡單說明一下實現過程,更詳細的注釋請參考本人github上的代碼。


2.1 創建MaxHeap類


初始化,存儲最大元素數量、元素值計算函數、元素列表,當前元素數量。

class

 

MaxHeap

(

object

):


    

def

 

__init__(

self

, max_size, fn)

:
        

self

.max_size = max_size
        

self

.fn = fn
        

self

.items = [None] * max_size
        

self

.size = 

0



2.2 獲取大頂堆各個屬性

def

 

__str__(

self

)

:
    item_values = str([

self

.fn(

self

.items[i]) 

for

 i 

in

 range(

self

.size)])
    

return

 

"Size: %d
Max size: %d
Item_values: %s
"

 % (

self

.size, 

self

.max_size, item_values)

2.3 檢查大頂堆是否已滿

@property

def

 

full(

self

)

:
    

return

 

self

.size == 

self

.max_size

2.4 添加元素

def

 

add(

self

, item)

:
    

if

 

self

.

full:


        

if

 

self

.fn(item) < 

self

.value(

0

):
            

self

.items[

0

] = item
            

self

._shift_down(

0

)
    

else:


        

self

.items[

self

.size] = item
        

self

.size += 

1


        

self

._shift_up(

self

.size - 

1

)

2.5 推出頂部元素

def

 

pop(

self

)

:
    assert 

self

.size > 

0

"Cannot pop item! The MaxHeap is empty!"


    ret = 

self

.items[

0

]
    

self

.items[

0

] = 

self

.items[

self

.size - 

1

]
    

self

.items[

self

.size - 

1

] = None
    

self

.size -= 

1


    

self

._shift_down(

0

)
    

return

 ret

2.6 元素上浮

def _shift_up(

self

, idx):
    assert idx < 

self

.size, 

"The parameter idx must be less than heap"s size!"


    

parent

 = (idx - 

1

// 2


    

while

 

parent

 >= 

0

 

and

 

self

.value(

parent

) < 

self

.value(idx):
        

self

.items[

parent

], 

self

.items[idx] = 

self

.items[idx], 

self

.items[

parent

]
        idx = 

parent


        

parent

 = (idx - 

1

// 2



2.7 元素下沉

def

 

_shift_down(

self

, idx)

:
    child = (idx + 

1

) * 

2

 - 

1


    

while

 child < 

self

.

size:


        

if

 child + 

1

 < 

self

.size 

and

 

self

.value(child + 

1

) > 

self

.value(child):
            child += 

1


        

if

 

self

.value(idx) < 

self

.value(child):
            

self

.items[idx], 

self

.items[child] = 

self

.items[child], 

self

.items[idx]
            idx = child
            child = (idx + 

1

) * 

2

 - 

1


        

else:


            

break



3 效果評估


3.1 heap有效性校驗


檢查當前堆是否符合大頂堆的定義。

def 

is_valid

(

heap

):
    ret 

= []
    

for

 i 

in

 

range

(

1

, heap.size

):
        parent 

= (i - 

1

// 2


        ret.append(heap.

value

(parent) >= heap.

value

(i))
    

return

 all(ret)

3.2 線性查找


用「笨」辦法查找最小的k個元素。

def

 

exhausted_search(nums, k)

:


    rets = []
    idxs = []
    key = 

None


    

for

 _ 

in

 range(k):
        val = float(

"inf"

)
        

for

 i, num 

in

 enumerate(nums):
            

if

 num < val 

and

 i 

not

 

in

 idxs:
                key = i
                val = num
        idxs.append(key)
        rets.append(val)
    

return

 rets

3.3 main函數


主函數分為如下幾個部分:




  1. 隨機生成數據集,即測試用例



  2. 建立大頂堆



  3. 執行「笨」辦法查找



  4. 比較「笨」辦法和大頂堆的查找結果

def main():
    

# Test


    

print

(

"Testing MaxHeap..."

)
    test_times = 

100


    run_time_1 = run_time_2 = 

0


    

for

 

_

 in range(test_times):
        

# Generate dataset randomly


        low = 

0


        high = 

1000


        n_rows = 

10000


        k = 

100


        nums = gen_data(low, high, n_rows)

        

# Build Max Heap


        heap = MaxHeap(k, lambda 

x

x

)
        start = 

time

()
        

for

 num in nums:
            heap.add(num)
        ret1 = copy(heap.items)
        run_time_1 += 

time

() - start

        

# Exhausted search


        start = 

time

()
        ret2 = exhausted_search(nums, k)
        run_time_2 += 

time

() - start

        

# Compare result


        ret1.sort()
        assert ret1 == ret2, 

"target:%s
k:%d
restult1:%s
restult2:%s
"

 % (
            str(nums), k, str(ret1), str(ret2))
    

print

(

"%d tests passed!"

 % test_times)
    

print

(

"Max Heap Search %.2f s"

 % run_time_1)
    

print

(

"Exhausted search %.2f s"

 % run_time_2)

3.2 效果展示


針對top k查找隨機生成了100個測試用例,線性查找用時8.76秒,大頂堆用時1.74秒,效果還算不錯~


【點擊成為源碼大神】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

其實,我一直都喜歡你...
這個產品經理這麼萌,需求肯定都答應的啦

TAG:Python開發 |