當前位置:
首頁 > 知識 > 基於協程的 Python 網路庫 gevent 介紹

基於協程的 Python 網路庫 gevent 介紹

(點擊

上方藍字

,快速關注我們)




來源:思誠之道


www.bjhee.com/gevent.html


如有好文章投稿,請點擊 → 這裡了解詳情




繼續Python協程方面的介紹,這次要講的是gevent,它是一個並發網路庫。它的協程是基於greenlet的,並基於libev實現快速事件循環(Linux上是epoll,FreeBSD上是kqueue,Mac OS X上是select)。有了gevent,協程的使用將無比簡單,你根本無須像greenlet一樣顯式的切換,每當一個協程阻塞時,程序將自動調度,gevent處理了所有的底層細節。讓我們看個例子來感受下吧。




import

gevent



def

test1

()

:


    

print

12


    

gevent

.

sleep

(

0

)


    

print

34



def

test2

()

:


    

print

56


    

gevent

.

sleep

(

0

)


    

print

78



gevent

.

joinall

([

    

gevent

.

spawn

(

test1

),


    

gevent

.

spawn

(

test2

),


])




解釋下,」gevent.spawn()」方法會創建一個新的greenlet協程對象,並運行它。」gevent.joinall()」方法會等待所有傳入的greenlet協程運行結束後再退出,這個方法可以接受一個」timeout」參數來設置超時時間,單位是秒。運行上面的程序,執行順序如下:






  1. 先進入協程test1,列印12



  2. 遇到」gevent.sleep(0)」時,test1被阻塞,自動切換到協程test2,列印56



  3. 之後test2被阻塞,這時test1阻塞已結束,自動切換回test1,列印34



  4. 當test1運行完畢返回後,此時test2阻塞已結束,再自動切換回test2,列印78



  5. 所有協程執行完畢,程序退出




所以,程序運行下來的輸出就是:





12


56


34


78




注意,這裡與上一篇greenlet中第一個例子運行的結果不一樣,greenlet一個協程運行完後,必須顯式切換,不然會返回其父協程。而在gevent中,一個協程運行完後,它會自動調度那些未完成的協程。




我們換一個更有意義的例子:





import

gevent


import

socket



urls

=

[

"www.baidu.com"

,

"www.gevent.org"

,

"www.python.org"

]


jobs

=

[

gevent

.

spawn

(

socket

.

gethostbyname

,

url

)

for

url

in

urls

]


gevent

.

joinall

(

jobs

,

timeout

=

5

)



print

[

job

.

value

for

job

in

jobs

]




我們通過協程分別獲取三個網站的IP地址,由於打開遠程地址會引起IO阻塞,所以gevent會自動調度不同的協程。另外,我們可以通過協程對象的」value」屬性,來獲取協程函數的返回值。




猴子補丁 Monkey patching




細心的朋友們在運行上面例子時會發現,其實程序運行的時間同不用協程是一樣的,是三個網站打開時間的總和。可是理論上協程是非阻塞的,那運行時間應該等於最長的那個網站打開時間呀?其實這是因為Python標準庫里的socket是阻塞式的,DNS解析無法並發,包括像urllib庫也一樣,所以這種情況下用協程完全沒意義。那怎麼辦?




一種方法是使用gevent下的socket模塊,我們可以通過」from gevent import socket」來導入。不過更常用的方法是使用猴子布丁(Monkey patching):





from

gevent

import

monkey

;

monkey

.

patch_socket

()


import

gevent


import

socket



urls

=

[

"www.baidu.com"

,

"www.gevent.org"

,

"www.python.org"

]


jobs

=

[

gevent

.

spawn

(

socket

.

gethostbyname

,

url

)

for

url

in

urls

]


gevent

.

joinall

(

jobs

,

timeout

=

5

)



print

[

job

.

value

for

job

in

jobs

]




上述代碼的第一行就是對socket標準庫打上猴子補丁,此後socket標準庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標準庫也存在阻塞的情況,gevent提供了」monkey.patch_all()」方法將所有標準庫都替換。





from gevent import monkey; monkey.patch_all()




使用猴子補丁褒貶不一,但是官網上還是建議使用」patch_all()」,而且在程序的第一行就執行。




獲取協程狀態




協程狀態有已啟動和已停止,分別可以用協程對象的」started」屬性和」ready()」方法來判斷。對於已停止的協程,可以用」successful()」方法來判斷其是否成功運行且沒拋異常。如果協程執行完有返回值,可以通過」value」屬性來獲取。另外,greenlet協程運行過程中發生的異常是不會被拋出到協程外的,因此需要用協程對象的」exception」屬性來獲取協程中的異常。下面的例子很好的演示了各種方法和屬性的使用。





#coding:utf8


import

gevent



def

win

()

:


    

return

"You win!"



def

fail

()

:


    

raise

Exception

(

"You failed!"

)



winner

=

gevent

.

spawn

(

win

)


loser

=

gevent

.

spawn

(

fail

)



print

winner

.

started

# True


print

loser

.

started

  

# True



# 在Greenlet中發生的異常,不會被拋到Greenlet外面。


# 控制台會打出Stacktrace,但程序不會停止


try

:


    

gevent

.

joinall

([

winner

,

loser

])


except

Exception

as

e

:


    

# 這段永遠不會被執行


    

print

"This will never be reached"



print

winner

.

ready

()

# True


print

loser

.

ready

()

  

# True



print

winner

.

value

# "You win!"


print

loser

.

value

  

# None



print

winner

.

successful

()

# True


print

loser

.

successful

()

  

# False



# 這裡可以通過raise loser.exception 或 loser.get()


# 來將協程中的異常拋出


print

loser

.

exception




協程運行超時




之前我們講過在」gevent.joinall()」方法中可以傳入timeout參數來設置超時,我們也可以在全局範圍內設置超時時間:





import

gevent


from

gevent

import

Timeout



timeout

=

Timeout

(

2

)

  

# 2 seconds


timeout

.

start

()



def

wait

()

:


    

gevent

.

sleep

(

10

)



try

:


    

gevent

.

spawn

(

wait

).

join

()


except

Timeout

:


    

print

(

"Could not complete"

)




上例中,我們將超時設為2秒,此後所有協程的運行,如果超過兩秒就會拋出」Timeout」異常。我們也可以將超時設置在with語句內,這樣該設置只在with語句塊中有效:





with

Timeout

(

1

)

:


    

gevent

.

sleep

(

10

)




此外,我們可以指定超時所拋出的異常,來替換默認的」Timeout」異常。比如下例中超時就會拋出我們自定義的」TooLong」異常。





class

TooLong

(

Exception

)

:


    

pass



with

Timeout

(

1

,

TooLong

)

:


    

gevent

.

sleep

(

10

)




協程間通訊




greenlet協程間的非同步通訊可以使用事件(Event)對象。該對象的」wait()」方法可以阻塞當前協程,而」set()」方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒後設置evt事件,所有的waiter協程即被喚醒。





#coding:utf8


import

gevent


from

gevent

.

event

import

Event



evt

=

Event

()



def

setter

()

:


    

print

"Wait for me"


    

gevent

.

sleep

(

3

)

  

# 3秒後喚醒所有在evt上等待的協程


    

print

"Ok, I"m done"


    

evt

.

set

()

  

# 喚醒



def

waiter

()

:


    

print

"I"ll wait for you"


    

evt

.

wait

()

  

# 等待


    

print

"Finish waiting"



gevent

.

joinall

([


    

gevent

.

spawn

(

setter

),


    

gevent

.

spawn

(

waiter

),


    

gevent

.

spawn

(

waiter

),


    

gevent

.

spawn

(

waiter

),


    

gevent

.

spawn

(

waiter

),


    

gevent

.

spawn

(

waiter

)


])




除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞消息。讓我們將上例中的setter和waiter作如下改動:





from

gevent

.

event

import

AsyncResult


aevt

=

AsyncResult

()



def

setter

()

:


    

print

"Wait for me"


    

gevent

.

sleep

(

3

)

  

# 3秒後喚醒所有在evt上等待的協程


    

print

"Ok, I"m done"


    

aevt

.

set

(

"Hello!"

)

  

# 喚醒,並傳遞消息



def

waiter

()

:


    

print

(

"I"ll wait for you"

)


    

message

=

aevt

.

get

()

  

# 等待,並在喚醒時獲取消息


    

print

"Got wake up message: %s"

%

message




隊列 Queue




隊列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取隊列中的元素。gevent的隊列對象可以讓greenlet協程之間安全的訪問。運行下面的程序,你會看到3個消費者會分別消費隊列中的產品,且消費過的產品不會被另一個消費者再取到:





import

gevent


from

gevent

.

queue

import

Queue



products

=

Queue

()



def

consumer

(

name

)

:


    

while

not

products

.

empty

()

:


        

print

"%s got product %s"

%

(

name

,

products

.

get

())


        

gevent

.

sleep

(

0

)



    

print

"%s Quit"



def

producer

()

:


    

for

i

in

xrange

(

1

,

10

)

:


        

products

.

put

(

i

)



gevent

.

joinall

([


    

gevent

.

spawn

(

producer

),


    

gevent

.

spawn

(

consumer

,

"steve"

),


    

gevent

.

spawn

(

consumer

,

"john"

),


    

gevent

.

spawn

(

consumer

,

"nancy"

),


])




put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調用get方法時隊列為空,則拋出」gevent.queue.Empty」異常。




信號量




信號量可以用來限制協程並發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取信號量,而release就是釋放。當所有信號量都已被獲取,那剩餘的協程就只能等待任一協程釋放信號量後才能得以運行:





import

gevent


from

gevent

.

coros

import

BoundedSemaphore



sem

=

BoundedSemaphore

(

2

)



def

worker

(

n

)

:


    

sem

.

acquire

()


    

print

(

"Worker %i acquired semaphore"

%

n

)


    

gevent

.

sleep

(

0

)


    

sem

.

release

()


    

print

(

"Worker %i released semaphore"

%

n

)



gevent

.

joinall

([

gevent

.

spawn

(

worker

,

i

)

for

i

in

xrange

(

0

,

6

)])




上面的例子中,我們初始化了」BoundedSemaphore」信號量,並將其個數定為2。所以同一個時間,只能有兩個worker協程被調度。程序運行後的結果如下:





Worker

0

acquired semaphore


Worker

1

acquired semaphore


Worker

0

released semaphore


Worker

1

released semaphore


Worker

2

acquired semaphore


Worker

3

acquired semaphore


Worker

2

released semaphore


Worker

3

released semaphore


Worker

4

acquired semaphore


Worker

4

released semaphore


Worker

5

acquired semaphore


Worker

5

released

semaphore




如果信號量個數為1,那就等同於同步鎖。




協程本地變數




同線程類似,協程也有本地變數,也就是只在當前協程內可被訪問的變數:





import

gevent


from

gevent

.

local

import

local



data

=

local

()



def

f1

()

:


    

data

.

x

=

1


    

print

data

.

x



def

f2

()

:


    

try

:


        

print

data

.

x


    

except

AttributeError

:


        

print

"x is not visible"



gevent

.

joinall

([


    

gevent

.

spawn

(

f1

),


    

gevent

.

spawn

(

f2

)


])




通過將變數存放在local對象中,即可將其的作用域限制在當前協程內,當其他協程要訪問該變數時,就會拋出異常。不同協程間可以有重名的本地變數,而且互相不影響。因為協程本地變數的實現,就是將其存放在以的」greenlet.getcurrent()」的返回為鍵值的私有的命名空間內。




實際應用




講到這裡,大家肯定很想看一個gevent的實際應用吧,這裡有一個簡單的聊天室程序,基於Flask實現,大家可以參考下。




更多參考資料






  • gevent的官方文檔



  • gevent社區提供的教程




看完本文有收穫?請轉

發分享給更多人


關注「P

ython開發者」,提升Python技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發者 的精彩文章:

用 Python 進行貝葉斯模型建模(3)
IEEE Spectrum 2017 編程語言排行:Python 奪冠
Python 求職 Top10 城市,來看看是否有你所在的城市
機器學習演算法實踐:樸素貝葉斯 (Naive Bayes)

TAG:Python開發者 |

您可能感興趣

破解 Kotlin 協程
協程-以Python和Go為例
Python協程中使用上下文
Python 並發編程之協程/非同步IO
Swoole 2.1 正式版:協程+通道帶來全新的 PHP 編程模式
Kotlin 1.3帶來穩定的協程、合約及其他
php實現協程,真正的非同步
一個故事講完進程、線程和協程
C+協程的近況、設計與實現中的細節和決策