當前位置:
首頁 > 知識 > Gevent 調度流程解析

Gevent 調度流程解析

(點擊

上方藍字

,快速關注我們)




來源:xybaby 


www.cnblogs.com/xybaby/p/6370799.html


如有好文章投稿,請點擊 → 這裡了解詳情




gevent是目前應用非常廣泛的網路庫,高效的輪詢IO庫libev加上greenlet實現的協程(coroutine),使得gevent的性能非常出色,尤其是在web應用中。



本文介紹gevent的調度流程,主要包括gevent對greenlet的封裝和使用,以及greenlet與libev的協作。閱讀本文需要對greenlet有一定的認識,可以參考這篇文章,另外,本文分析的gevent版本為1.2,可以通過gevent.version_info查看版本號。




gevent簡介:




gevent是基於協程(greenlet)的網路庫,底層的事件輪詢基於libev(早期是libevent),gevent的API概念和Python標準庫一致(如事件,隊列)。gevent有一個很有意思的東西-monkey-patch,能夠使python標準庫中的阻塞操作變成非同步,如socket的讀寫。




gevent來源於eventlet,自稱比後者實現更簡單、API更方便且性能更好,許多開源的web伺服器也使用了gevent,如gunicorn、paste,當然gevent本生也可以作為一個python web伺服器使用。這篇文章對常見的wsgi server進行性能對比,gevent不管在http1.0還是http1.1都表現非常出色。下圖是目前常用的http1.1標準下的表現:






gevent高效的秘訣就是greenlet和libev啦,greenlet在之前的博文有介紹,gevent對greenlet的使用比較限制,只能在兩層協程之間切換,簡單也不容易出錯。libev使用輪訓非阻塞的方式進行事件處理,比如unix下的epoll。早期gevent使用libevent,後來替換成libev,因為libev「提供更少的核心功能以求更改的效率」,這裡有libev和libevent的性能對比:







greenlet回顧




如果想了解gevent的調度流程,最重要的是對greenlet有基本的了解。下面總結一些個人認為比較重要的點:





  1. 每一個greenlet.greenlet實例都有一個parent(可指定,默認為創生新的greenlet.greenlet所在環境),當greenlet.greenlet實例執行完邏輯正常結束、或者拋出異常結束時,執行邏輯切回到其parent。



  2. 可以繼承greenlet.greenlet,子類需要實現run方法,當調用greenlet.switch方法時會調用到這個run方法




在gevent中,有兩個類繼承了greenlet.greenlet,分別是gevent.hub.Hub和gevent.greenlet.Greenlet。後文中,如果是greenlet.greenlet這種寫法,那麼指的是原生的類庫greentlet,如果是greenlet(或者Greenlet)那麼指gevent封裝後的greenlet。




greenlet調度流程




首先,給出總結性的結論,後面再結合實例和源碼一步步分析。



每個gevent線程都有一個hub,前面提到hub是greenlet.greenlet的實例。hub實例在需要的時候創生(Lazy Created),那麼其parent是main greenlet。之後任何的Greenlet(注意是greenlet.greenlet的子類)實例的parent都設置成hub。hub調用libev提供的事件循環來處理Greenlet代表的任務,當Greenlet實例結束(正常或者異常)之後,執行邏輯又切換到hub。




gevent調度示例1:




我們看下面最簡單的代碼:





>>> 

import

 

gevent


>>> 

gevent

.

sleep

(

1

)




上面的代碼很簡單,但事實上gevent的核心都包含在其中,接下來結合源碼進行分析。




首先看sleep函數(gevent.hub.sleep):





def sleep

(

seconds

=

0

,

 

ref

=

True

)

:


    

hub

 = 

get_hub

()


    

loop

 = 

hub

.

loop


    

if

 

seconds

 <= 

0

:


        

waiter

 = 

Waiter

()


        

loop

.

run_callback

(

waiter

.

switch

)


        

waiter

.

get

()


    

else

:


        

hub

.

wait

(

loop

.

timer

(

seconds

,

 

ref

=

ref

))




首先是獲取hub(第2行),然後在hub上wait這個定時器事件(第9行)。get_hub源碼如下(gevent.hub.get_hub):





def get_hub

(

*

args

,

 **

kwargs

)

:


    

"""


    Return the hub for the current thread.


 


    """


    

hub

 = 

_threadlocal

.

hub


    

if

 

hub 

is

 

None

:


        

hubtype

 = 

get_hub_class

()


        

hub

 = 

_threadlocal

.

hub

 = 

hubtype

(

*

args

,

 **

kwargs

)


    

return

 

hub




可以看到,hub是線程內唯一的,之前也提到過greenlet是線程獨立的,每個線程有各自的greenlet棧。hubtype默認就是gevent.hub.Hub,在hub的初始化函數(__init__)中,會創建loop屬性,默認也就是libev的python封裝。




回到sleep函數定義,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函數非常關鍵,對於任何阻塞性操作,比如timer、io都會調用這個函數,其作用一句話概括:從當前協程切換到hub,直到watcher對應的事件就緒再從hub切換回來。wait函數源碼如下(gevent.hub.Hub.wait):





def wait

(

self

,

 

watcher

)

:


        

"""


        Wait until the *watcher* (which should not be started) is ready.


 


        """


        

waiter

 = 

Waiter

()


        

unique

 = 

object

()


        

watcher

.

start

(

waiter

.

switch

,

 

unique

)


        

try

:


            

result

 = 

waiter

.

get

()


            

if

 

result 

is

 

not

 

unique

:


                

raise InvalidSwitchError

(

"Invalid switch into %s: %r (expected %r)"

 % 

(

getcurrent

(),

 

result

,

 

unique

))


        

finally

:


            

watcher

.

stop

()




形參watcher就是loop.timer實例,其cython描述在corecext.pyx,我們簡單理解成是一個定時器事件就行了。上面的代碼中,創建了一個Waiter(gevent.hub.Waiter)對象,這個對象起什麼作用呢,這個類的doc寫得非常清楚:





Waiter

.

__doc_

_


 


A

 

low level communication utility 

for

 

greenlets

.


 


Waiter 

is

 

a

 

wrapper around 

greenlet

"

s

 

``

switch

()``

 

and

 

``

throw

()``

 

calls that makes them somewhat 

safer

:


 


switching will occur only 

if

 

the waiting greenlet 

is

 

executing

 :

meth

:

`

get

`

 

method 

currently

;


any

 

error raised 

in

 

the greenlet 

is

 

handled 

inside

 :

meth

:

`

switch

`

 

and

 :

meth

:

`

throw

`


if

 :

meth

:

`

switch

`

/:

meth

:

`

throw

`

 

is

 

called before the receiver 

calls

 :

meth

:

`

get

`,

 

then

 :

class

:

`

Waiter

`


will store the 

value

/

exception

.

 

The 

following

 :

meth

:

`

get

`

 

will 

return

 

the 

value

/

raise

 

the exception




簡而言之,是對greenlet.greenlet類switch 和 throw函數的分裝,用來存儲返回值greenlet的返回值或者捕獲在greenlet中拋出的異常。我們知道,在原生的greenlet中,如果一個greenlet拋出了異常,那麼該異常將會展開至其parent greenlet。




回到Hub.wait函數,第8行 watcher.start(waiter.switch, unique) 註冊了一個回調,在一定時間(1s)之後調用回調函數waiter.switch。注意,waiter.switch此時並沒有執行。然後第10行調用waiter.get。看看這個get函數(gevent.hub.Waiter.get):





def get

(

self

)

:


        

"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""


        

if

 

self

.

_exception 

is

 

not

 

_NONE

:


            

if

 

self

.

_exception 

is

 

None

:


                

return

 

self

.

value


            

else

:


                

getcurrent

().

throw

(

*

self

.

_exception

)


        

else

:


            

if

 

self

.

greenlet 

is

 

not

 

None

:


                

raise ConcurrentObjectUseError

(

"This Waiter is already used by %r"

 % 

(

self

.

greenlet

,

 

))


            

self

.

greenlet

 = 

getcurrent

()

 

# 存儲當前協程,之後從hub switch回來的時候使用


            

try

:


                

return

 

self

.

hub

.

switch

()

 

# switch到hub


            

finally

:


                

self

.

greenlet

 = 

None




核心的邏輯在第11到15行,11行中,getcurrent獲取當前的greenlet(在這個測試代碼中,是main greenlet,即最原始的greenlet),將其複製給waiter.greenlet。然後13行switch到hub,在greenlet回顧章節的第二條提到,greenlet.greenlet的子類需要重寫run方法,當調用子類的switch時會調用到該run方法。Hub的run方法實現如下:





def run

(

self

)

:


        

"""


        Entry-point to running the loop. This method is called automatically


        when the hub greenlet is scheduled; do not call it directly.


 


        :raises LoopExit: If the loop finishes running. This means


           that there are no other scheduled greenlets, and no active


           watchers or servers. In some situations, this indicates a


           programming error.


        """


        

assert

 

self

 

is

 

getcurrent

(),

 

"Do not call Hub.run() directly"


        

while

 

True

:


            

loop

 = 

self

.

loop


            

loop

.

error_handler

 = 

self


            

try

:


                

loop

.

run

()


            

finally

:


                

loop

.

error_handler

 = 

None

  

# break the refcount cycle


            

self

.

parent

.

throw

(

LoopExit

(

"This operation would block forever"

,

 

self

))



loop自然是libev的事件循環。doc中提到,這個loop理論上會一直循環,如果結束,那麼表明沒有任何監聽的事件(包括IO 定時等)。之前在Hub.wait函數中註冊了定時器,那麼在這個run中,如果時間到了,那麼會調用定時器的callback,也就是之前的waiter.switch, 我們再來看看這個函數(gevent.hub.Waiter.switch):





def 

switch

(

self

,

 

value

=

None

)

:


        

"""Switch to the greenlet if one"s available. Otherwise store the value."""


        

greenlet

 = 

self

.

greenlet


        

if

 

greenlet 

is

 

None

:


            

self

.

value

 = 

value


            

self

.

_exception

 = 

None


        

else

:


            

assert

 

getcurrent

()

 

is

 

self

.

hub

,

 

"Can only use Waiter.switch method from the Hub greenlet"


            

switch

 = 

greenlet

.

switch


            

try

:


                

switch

(

value

)


            

except

:


                

self

.

hub

.

handle_error

(

switch

,

 *

sys

.

exc_info

())




核心代碼在第8到13行,第8行保證調用到該函數的時候一定在hub這個協程中,這是很自然的,因為這個函數一定是在Hub.run中被調用。第11行switch到waiter.greenlet這個協程,在講解waiter.get的時候就提到了waiter.greenlet是main greenlet。注意,這裡得switch會回到main greenlet被切出的地方(也就是main greenlet掛起的地方),那就是在waiter.get的第10行,整個邏輯也就恢復到main greenlet繼續執行。




總結

:sleep的作用很簡單,觸發一個阻塞的操作,導致調用hub.wait,從當前greenlet.greenlet切換至Hub,超時之後再從hub切換到之前的greenlet繼續執行。通過這個例子可以知道,gevent將任何阻塞性的操作封裝成一個Watcher,然後從調用阻塞操作的協程切換到Hub,等到阻塞操作完成之後,再從Hub切換到之前的協程。




gevent調度示例2:




上面這個例子,雖然能夠理順gevent的調度流程,但事實上並沒有體現出gevent 協作的優勢。接下來看看gevent tutorial的例子:





import gevent


 


def foo

()

:


    

print

(

"Running in foo"

)


    

gevent

.

sleep

(

0

)


    

print

(

"Explicit context switch to foo again"

)


 


def bar

()

:


    

print

(

"Explicit context to bar"

)


    

gevent

.

sleep

(

0

)


    

print

(

"Implicit context switch back to bar"

)


 


gevent

.

joinall

([


    

gevent

.

spawn

(

foo

),


    

gevent

.

spawn

(

bar

),


])


 


# output


Running 

in

 

foo


Explicit context 

to

 

bar


Explicit context 

switch

 

to

 

foo again


Implicit context 

switch

 

back 

to

 

bar




從輸出可以看到, foo和bar依次輸出,顯然是在gevent.sleep的時候發生了執行流程切換,gevent.sleep再前面已經介紹了,那麼這裡主要關注spawn和joinall函數。




gevent.spawn本質調用了gevent.greenlet.Greenlet的類方法spawn:





@

classmethod


def spawn

(

cls

,

 *

args

,

 **

kwargs

)

:


   

g

 = 

cls

(

*

args

,

 **

kwargs

)


   

g

.

start

()


   

return

 

g




這個類方法調用了Greenlet的兩個函數,__init__ 和 start. init函數中最為關鍵的是這段代碼:





def __init__

(

self

,

 

run

=

None

,

 *

args

,

 **

kwargs

)

:


   

greenlet

.

__init__

(

self

,

 

None

,

 

get_hub

())

 

# 將新創生的greenlet實例的parent一律設置成hub



if

 

run 

is

 

not

 

None

:


   

self

.

_run

 = 

run




start函數的定義也很簡單(gevent.greenlet.Greenlet.start):





def start

(

self

)

:


   

"""Schedule the greenlet to run in this loop iteration"""


   

if

 

self

.

_start_event 

is

 

None

:


   

self

.

_start_event

 = 

self

.

parent

.

loop

.

run_callback

(

self

.

switch

)




註冊回調事件self.switch到hub.loop,注意Greenlet.switch最終會調用到Greenlet._run, 也就是spawn函數傳入的callable對象(foo、bar)。這裡僅僅是註冊,但還沒有開始事件輪詢,gevent.joinall就是用來啟動事件輪詢並等待運行結果的。




joinall函數會一路調用到gevent.hub.iwait函數:





def iwait

(

objects

,

 

timeout

=

None

,

 

count

=

None

)

:


    

"""


    Iteratively yield *objects* as they are ready, until all (or *count*) are ready


    or *timeout* expired.


    """


    

# QQQ would be nice to support iterable here that can be generated slowly (why?)


    

if

 

objects 

is

 

None

:


        

yield get_hub

().

join

(

timeout

=

timeout

)


        

return


 


    

count

 = 

len

(

objects

)

 

if

 

count 

is

 

None 

else

 

min

(

count

,

 

len

(

objects

))


    

waiter

 = 

_MultipleWaiter

()

 

# _MultipleWaiter是Waiter的子類


    

switch

 = 

waiter

.

switch


 


    

if

 

timeout 

is

 

not

 

None

:


        

timer

 = 

get_hub

().

loop

.

timer

(

timeout

,

 

priority

=-

1

)


        

timer

.

start

(

switch

,

 

_NONE

)


 


    

try

:


        

for

 

obj 

in

 

objects

:


            

obj

.

rawlink

(

switch

)

 

# 這裡往hub.loop註冊了回調



        

for

 

idx 

in

 

xrange

(

count

)

:


            

print

 

"for in iwait"

,

 

idx


            

item

 = 

waiter

.

get

()

 

# 這裡會切換到hub


            

print

 

"come here "

,

 

item

,

 

getcurrent

()


            

waiter

.

clear

()


            

if

 

item 

is

 

_NONE

:


                

return


            

yield item


    

finally

:


        

if

 

timeout 

is

 

not

 

None

:


            

timer

.

stop

()


        

for

 

obj 

in

 

objects

:


            

unlink

 = 

getattr

(

obj

,

 

"unlink"

,

 

None

)


            

if

 

unlink

:


                

try

:


                    

unlink

(

switch

)


                

except

:


                    

traceback

.

print_exc

()




然後iwait函數第23行開始的循環,逐個調用waiter.get。這裡的waiter是_MultipleWaiter(Waiter)的實例,其get函數最終調用到Waiter.get。前面已經詳細介紹了Waiter.get,簡而言之,就是switch到hub。我們利用greenlet的tracing功能可以看到整個greenlet.greenlet的switch流程,修改後的代碼如下:





import gevent


import greenlet


def callback

(

event

,

 

args

)

:


    

print 

event

,

 

args

[

0

],

 

"===:>>>>"

,

 

args

[

1

]


 


def foo

()

:


    

print

(

"Running in foo"

)


    

gevent

.

sleep

(

0

)


    

print

(

"Explicit context switch to foo again"

)


 


def bar

()

:


    

print

(

"Explicit context to bar"

)


    

gevent

.

sleep

(

0

)


    

print

(

"Implicit context switch back to bar"

)


 


print

 

"main greenlet info: "

,

 

greenlet

.

greenlet

.

getcurrent

()


print

 

"hub info"

,

 

gevent

.

get_hub

()


oldtrace

 = 

greenlet

.

settrace

(

callback

)


        


gevent

.

joinall

([


    

gevent

.

spawn

(

foo

),


    

gevent

.

spawn

(

bar

),


])


greenlet

.

settrace

(

oldtrace

)




切換流程及原因見下圖:







總結

:gevent.spawn創建一個新的Greenlet,並註冊到hub的loop上,調用gevent.joinall或者Greenlet.join的時候開始切換到hub。




本文通過兩個簡單的例子並結合源碼分析了gevent的協程調度流程。gevent的使用非常方便,尤其是在web server中,基本上應用App什麼都不用做就能享受gevent帶來的好處。筆者閱讀gevent源碼最重要的原因在於想了解gevent對greenlet的封裝和使用,greenlet很強大,強大到容易出錯,而gevent保證在兩層協程之間切換,值得借鑒!




參考






  • http://www.cnblogs.com/xybaby/p/6337944.html



  • http://www.gevent.org/



  • https://pypi.python.org/pypi/greenlet



  • http://software.schmorp.de/pkg/libev.html



  • http://libevent.org/



  • http://eventlet.net/



  • http://nichol.as/benchmark-of-python-web-servers



  • http://libev.schmorp.de/bench.html



  • http://sdiehl.github.io/gevent-tutorial/






看完本文有收穫?請轉

發分享給更多人


關注「P

ython開發者」,提升Python技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發者 的精彩文章:

入門 Python 要多久?
遺傳演算法中適值函數的標定與大變異演算法
用 Scikit-Learn 和 Pandas 學習線性回歸
Python 源碼閱讀:類型

TAG:Python開發者 |