Gevent 調度流程解析
(點擊
上方藍字
,快速關注我們)
來源:xybaby
www.cnblogs.com/xybaby/p/6370799.html
如有好文章投稿,請點擊 → 這裡了解詳情
gevent是目前應用非常廣泛的網路庫,高效的輪詢IO庫libev加上greenlet實現的協程(coroutine),使得gevent的性能非常出色,尤其是在web應用中。
本文介紹gevent的調度流程,主要包括gevent對greenlet的封裝和使用,以及greenlet與libev的協作。閱讀本文需要對greenlet有一定的認識,可以參考這篇文章,另外,本文分析的gevent版本為1.2,可以通過gevent.version_info查看版本號。
gevent簡介:
gevent是基於協程(greenlet)的網路庫,底層的事件輪詢基於libev(早期是libevent),gevent的API概念和Python標準庫一致(如事件,隊列)。gevent有一個很有意思的東西-monkey-patch,能夠使python標準庫中的阻塞操作變成非同步,如socket的讀寫。
gevent來源於eventlet,自稱比後者實現更簡單、API更方便且性能更好,許多開源的web伺服器也使用了gevent,如gunicorn、paste,當然gevent本生也可以作為一個python web伺服器使用。這篇文章對常見的wsgi server進行性能對比,gevent不管在http1.0還是http1.1都表現非常出色。下圖是目前常用的http1.1標準下的表現:
gevent高效的秘訣就是greenlet和libev啦,greenlet在之前的博文有介紹,gevent對greenlet的使用比較限制,只能在兩層協程之間切換,簡單也不容易出錯。libev使用輪訓非阻塞的方式進行事件處理,比如unix下的epoll。早期gevent使用libevent,後來替換成libev,因為libev「提供更少的核心功能以求更改的效率」,這裡有libev和libevent的性能對比:
greenlet回顧
:如果想了解gevent的調度流程,最重要的是對greenlet有基本的了解。下面總結一些個人認為比較重要的點:
每一個greenlet.greenlet實例都有一個parent(可指定,默認為創生新的greenlet.greenlet所在環境),當greenlet.greenlet實例執行完邏輯正常結束、或者拋出異常結束時,執行邏輯切回到其parent。
可以繼承greenlet.greenlet,子類需要實現run方法,當調用greenlet.switch方法時會調用到這個run方法
在gevent中,有兩個類繼承了greenlet.greenlet,分別是gevent.hub.Hub和gevent.greenlet.Greenlet。後文中,如果是greenlet.greenlet這種寫法,那麼指的是原生的類庫greentlet,如果是greenlet(或者Greenlet)那麼指gevent封裝後的greenlet。
greenlet調度流程
:首先,給出總結性的結論,後面再結合實例和源碼一步步分析。
每個gevent線程都有一個hub,前面提到hub是greenlet.greenlet的實例。hub實例在需要的時候創生(Lazy Created),那麼其parent是main greenlet。之後任何的Greenlet(注意是greenlet.greenlet的子類)實例的parent都設置成hub。hub調用libev提供的事件循環來處理Greenlet代表的任務,當Greenlet實例結束(正常或者異常)之後,執行邏輯又切換到hub。
gevent調度示例1:
我們看下面最簡單的代碼:
>>>
import
gevent
>>>
gevent
.
sleep
(
1
)
上面的代碼很簡單,但事實上gevent的核心都包含在其中,接下來結合源碼進行分析。
首先看sleep函數(gevent.hub.sleep):
def sleep
(
seconds
=
0
,
ref
=
True
)
:
hub
=
get_hub
()
loop
=
hub
.
loop
if
seconds
<=
0
:
waiter
=
Waiter
()
loop
.
run_callback
(
waiter
.
switch
)
waiter
.
get
()
else
:
hub
.
wait
(
loop
.
timer
(
seconds
,
ref
=
ref
))
首先是獲取hub(第2行),然後在hub上wait這個定時器事件(第9行)。get_hub源碼如下(gevent.hub.get_hub):
def get_hub
(
*
args
,
**
kwargs
)
:
"""
Return the hub for the current thread.
"""
hub
=
_threadlocal
.
hub
if
hub
is
None
:
hubtype
=
get_hub_class
()
hub
=
_threadlocal
.
hub
=
hubtype
(
*
args
,
**
kwargs
)
return
hub
可以看到,hub是線程內唯一的,之前也提到過greenlet是線程獨立的,每個線程有各自的greenlet棧。hubtype默認就是gevent.hub.Hub,在hub的初始化函數(__init__)中,會創建loop屬性,默認也就是libev的python封裝。
回到sleep函數定義,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函數非常關鍵,對於任何阻塞性操作,比如timer、io都會調用這個函數,其作用一句話概括:從當前協程切換到hub,直到watcher對應的事件就緒再從hub切換回來。wait函數源碼如下(gevent.hub.Hub.wait):
def wait
(
self
,
watcher
)
:
"""
Wait until the *watcher* (which should not be started) is ready.
"""
waiter
=
Waiter
()
unique
=
object
()
watcher
.
start
(
waiter
.
switch
,
unique
)
try
:
result
=
waiter
.
get
()
if
result
is
not
unique
:
raise InvalidSwitchError
(
"Invalid switch into %s: %r (expected %r)"
%
(
getcurrent
(),
result
,
unique
))
finally
:
watcher
.
stop
()
形參watcher就是loop.timer實例,其cython描述在corecext.pyx,我們簡單理解成是一個定時器事件就行了。上面的代碼中,創建了一個Waiter(gevent.hub.Waiter)對象,這個對象起什麼作用呢,這個類的doc寫得非常清楚:
Waiter
.
__doc_
_
A
low level communication utility
for
greenlets
.
Waiter
is
a
wrapper around
greenlet
"s
``
switch
()``
and
``
throw
()``
calls that makes them somewhat
safer
:
*
switching will occur only
if
the waiting greenlet
is
executing
:
meth
:
`
get
`
method
currently
;
*
any
error raised
in
the greenlet
is
handled
inside
:
meth
:
`
switch
`
and
:
meth
:
`
throw
`
*
if
:
meth
:
`
switch
`
/:
meth
:
`
throw
`
is
called before the receiver
calls
:
meth
:
`
get
`,
then
:
class
:
`
Waiter
`
will store the
value
/
exception
.
The
following
:
meth
:
`
get
`
will
return
the
value
/
raise
the exception
簡而言之,是對greenlet.greenlet類switch 和 throw函數的分裝,用來存儲返回值greenlet的返回值或者捕獲在greenlet中拋出的異常。我們知道,在原生的greenlet中,如果一個greenlet拋出了異常,那麼該異常將會展開至其parent greenlet。
回到Hub.wait函數,第8行 watcher.start(waiter.switch, unique) 註冊了一個回調,在一定時間(1s)之後調用回調函數waiter.switch。注意,waiter.switch此時並沒有執行。然後第10行調用waiter.get。看看這個get函數(gevent.hub.Waiter.get):
def get
(
self
)
:
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if
self
.
_exception
is
not
_NONE
:
if
self
.
_exception
is
None
:
return
self
.
value
else
:
getcurrent
().
throw
(
*
self
.
_exception
)
else
:
if
self
.
greenlet
is
not
None
:
raise ConcurrentObjectUseError
(
"This Waiter is already used by %r"
%
(
self
.
greenlet
,
))
self
.
greenlet
=
getcurrent
()
# 存儲當前協程,之後從hub switch回來的時候使用
try
:
return
self
.
hub
.
switch
()
# switch到hub
finally
:
self
.
greenlet
=
None
核心的邏輯在第11到15行,11行中,getcurrent獲取當前的greenlet(在這個測試代碼中,是main greenlet,即最原始的greenlet),將其複製給waiter.greenlet。然後13行switch到hub,在greenlet回顧章節的第二條提到,greenlet.greenlet的子類需要重寫run方法,當調用子類的switch時會調用到該run方法。Hub的run方法實現如下:
def run
(
self
)
:
"""
Entry-point to running the loop. This method is called automatically
when the hub greenlet is scheduled; do not call it directly.
:raises LoopExit: If the loop finishes running. This means
that there are no other scheduled greenlets, and no active
watchers or servers. In some situations, this indicates a
programming error.
"""
assert
self
is
getcurrent
(),
"Do not call Hub.run() directly"
while
True
:
loop
=
self
.
loop
loop
.
error_handler
=
self
try
:
loop
.
run
()
finally
:
loop
.
error_handler
=
None
# break the refcount cycle
self
.
parent
.
throw
(
LoopExit
(
"This operation would block forever"
,
self
))
loop自然是libev的事件循環。doc中提到,這個loop理論上會一直循環,如果結束,那麼表明沒有任何監聽的事件(包括IO 定時等)。之前在Hub.wait函數中註冊了定時器,那麼在這個run中,如果時間到了,那麼會調用定時器的callback,也就是之前的waiter.switch, 我們再來看看這個函數(gevent.hub.Waiter.switch):
def
switch
(
self
,
value
=
None
)
:
"""Switch to the greenlet if one"s available. Otherwise store the value."""
greenlet
=
self
.
greenlet
if
greenlet
is
None
:
self
.
value
=
value
self
.
_exception
=
None
else
:
assert
getcurrent
()
is
self
.
hub
,
"Can only use Waiter.switch method from the Hub greenlet"
switch
=
greenlet
.
switch
try
:
switch
(
value
)
except
:
self
.
hub
.
handle_error
(
switch
,
*
sys
.
exc_info
())
核心代碼在第8到13行,第8行保證調用到該函數的時候一定在hub這個協程中,這是很自然的,因為這個函數一定是在Hub.run中被調用。第11行switch到waiter.greenlet這個協程,在講解waiter.get的時候就提到了waiter.greenlet是main greenlet。注意,這裡得switch會回到main greenlet被切出的地方(也就是main greenlet掛起的地方),那就是在waiter.get的第10行,整個邏輯也就恢復到main greenlet繼續執行。
總結
:sleep的作用很簡單,觸發一個阻塞的操作,導致調用hub.wait,從當前greenlet.greenlet切換至Hub,超時之後再從hub切換到之前的greenlet繼續執行。通過這個例子可以知道,gevent將任何阻塞性的操作封裝成一個Watcher,然後從調用阻塞操作的協程切換到Hub,等到阻塞操作完成之後,再從Hub切換到之前的協程。gevent調度示例2:
上面這個例子,雖然能夠理順gevent的調度流程,但事實上並沒有體現出gevent 協作的優勢。接下來看看gevent tutorial的例子:
import gevent
def foo
()
:
(
"Running in foo"
)
gevent
.
sleep
(
0
)
(
"Explicit context switch to foo again"
)
def bar
()
:
(
"Explicit context to bar"
)
gevent
.
sleep
(
0
)
(
"Implicit context switch back to bar"
)
gevent
.
joinall
([
gevent
.
spawn
(
foo
),
gevent
.
spawn
(
bar
),
])
# output
Running
in
foo
Explicit context
to
bar
Explicit context
switch
to
foo again
Implicit context
switch
back
to
bar
從輸出可以看到, foo和bar依次輸出,顯然是在gevent.sleep的時候發生了執行流程切換,gevent.sleep再前面已經介紹了,那麼這裡主要關注spawn和joinall函數。
gevent.spawn本質調用了gevent.greenlet.Greenlet的類方法spawn:
@
classmethod
def spawn
(
cls
,
*
args
,
**
kwargs
)
:
g
=
cls
(
*
args
,
**
kwargs
)
g
.
start
()
return
g
這個類方法調用了Greenlet的兩個函數,__init__ 和 start. init函數中最為關鍵的是這段代碼:
def __init__
(
self
,
run
=
None
,
*
args
,
**
kwargs
)
:
greenlet
.
__init__
(
self
,
None
,
get_hub
())
# 將新創生的greenlet實例的parent一律設置成hub
if
run
is
not
None
:
self
.
_run
=
run
start函數的定義也很簡單(gevent.greenlet.Greenlet.start):
def start
(
self
)
:
"""Schedule the greenlet to run in this loop iteration"""
if
self
.
_start_event
is
None
:
self
.
_start_event
=
self
.
parent
.
loop
.
run_callback
(
self
.
switch
)
註冊回調事件self.switch到hub.loop,注意Greenlet.switch最終會調用到Greenlet._run, 也就是spawn函數傳入的callable對象(foo、bar)。這裡僅僅是註冊,但還沒有開始事件輪詢,gevent.joinall就是用來啟動事件輪詢並等待運行結果的。
joinall函數會一路調用到gevent.hub.iwait函數:
def iwait
(
objects
,
timeout
=
None
,
count
=
None
)
:
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
if
objects
is
None
:
yield get_hub
().
join
(
timeout
=
timeout
)
return
count
=
len
(
objects
)
if
count
is
None
else
min
(
count
,
len
(
objects
))
waiter
=
_MultipleWaiter
()
# _MultipleWaiter是Waiter的子類
switch
=
waiter
.
switch
if
timeout
is
not
None
:
timer
=
get_hub
().
loop
.
timer
(
timeout
,
priority
=-
1
)
timer
.
start
(
switch
,
_NONE
)
try
:
for
obj
in
objects
:
obj
.
rawlink
(
switch
)
# 這裡往hub.loop註冊了回調
for
idx
in
xrange
(
count
)
:
"for in iwait"
,
idx
item
=
waiter
.
get
()
# 這裡會切換到hub
"come here "
,
item
,
getcurrent
()
waiter
.
clear
()
if
item
is
_NONE
:
return
yield item
finally
:
if
timeout
is
not
None
:
timer
.
stop
()
for
obj
in
objects
:
unlink
=
getattr
(
obj
,
"unlink"
,
None
)
if
unlink
:
try
:
unlink
(
switch
)
except
:
traceback
.
print_exc
()
然後iwait函數第23行開始的循環,逐個調用waiter.get。這裡的waiter是_MultipleWaiter(Waiter)的實例,其get函數最終調用到Waiter.get。前面已經詳細介紹了Waiter.get,簡而言之,就是switch到hub。我們利用greenlet的tracing功能可以看到整個greenlet.greenlet的switch流程,修改後的代碼如下:
import gevent
import greenlet
def callback
(
event
,
args
)
:
event
,
args
[
0
],
"===:>>>>"
,
args
[
1
]
def foo
()
:
(
"Running in foo"
)
gevent
.
sleep
(
0
)
(
"Explicit context switch to foo again"
)
def bar
()
:
(
"Explicit context to bar"
)
gevent
.
sleep
(
0
)
(
"Implicit context switch back to bar"
)
"main greenlet info: "
,
greenlet
.
greenlet
.
getcurrent
()
"hub info"
,
gevent
.
get_hub
()
oldtrace
=
greenlet
.
settrace
(
callback
)
gevent
.
joinall
([
gevent
.
spawn
(
foo
),
gevent
.
spawn
(
bar
),
])
greenlet
.
settrace
(
oldtrace
)
切換流程及原因見下圖:
總結
:gevent.spawn創建一個新的Greenlet,並註冊到hub的loop上,調用gevent.joinall或者Greenlet.join的時候開始切換到hub。本文通過兩個簡單的例子並結合源碼分析了gevent的協程調度流程。gevent的使用非常方便,尤其是在web server中,基本上應用App什麼都不用做就能享受gevent帶來的好處。筆者閱讀gevent源碼最重要的原因在於想了解gevent對greenlet的封裝和使用,greenlet很強大,強大到容易出錯,而gevent保證在兩層協程之間切換,值得借鑒!
參考
http://www.cnblogs.com/xybaby/p/6337944.html
http://www.gevent.org/
https://pypi.python.org/pypi/greenlet
http://software.schmorp.de/pkg/libev.html
http://libevent.org/
http://eventlet.net/
http://nichol.as/benchmark-of-python-web-servers
http://libev.schmorp.de/bench.html
http://sdiehl.github.io/gevent-tutorial/
看完本文有收穫?請轉
發分享給更多人
關注「P
ython開發者」,提升Python技能


※入門 Python 要多久?
※遺傳演算法中適值函數的標定與大變異演算法
※用 Scikit-Learn 和 Pandas 學習線性回歸
※Python 源碼閱讀:類型
TAG:Python開發者 |