當前位置:
首頁 > 知識 > Python 並發編程之協程/非同步IO

Python 並發編程之協程/非同步IO

點擊上方「

Python開發

」,選擇「置頂公眾號」


關鍵時刻,第一時間送達!








引言




隨著node.js的盛行,相信大家今年多多少少都聽到了非同步編程這個概念。Python社區雖然對於非同步編程的支持相比其他語言稍顯遲緩,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await語法層面的支持,剛正式發布的Python3.6中asynico也已經由臨時版改為了穩定版。下面我們就基於Python3.4+來了解一下非同步編程的概念以及asyncio的用法。




什麼是協程



通常在Python中我們進行並發編程一般都是使用多線程或者多進程來實現的,對於計算型任務由於GIL的存在我們通常使用多進程來實現,而對與IO型任務我們可以通過線程調度來讓線程在執行IO任務時讓出GIL,從而實現表面上的並發。




其實對於IO型任務我們還有一種選擇就是協程,協程是運行在單線程當中的「並發」,協程相比多線程一大優勢就是省去了多線程之間的切換開銷,獲得了更大的運行效率。Python中的asyncio也是基於協程來進行實現的。在進入asyncio之前我們先來了解一下Python中怎麼通過生成器進行協程來實現並發。




example1




我們先來看一個簡單的例子來了解一下什麼是協程(coroutine),對生成器不了解的朋友建議先看一下Stackoverflow上面的這篇高票回答(http://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do)。





>>>

def

coroutine

()

:


...

    

reply

=

yield

"hello"


...

    

yield

reply


...


>>>

c

=

coroutine

()


>>>

next

(

c

)


"hello"


>>>

c

.

send

(

"world"

)

"world"




example2




下面這個程序我們要實現的功能就是模擬多個學生

同時向一個老師提交作業

,按照傳統的話我們或許要採用多線程/多進程,但是這裡我們可以採用生成器來實現協程用來模擬並發。





如果下面這個程序讀起來有點困難,可以直接跳到後面部分,並不影響閱讀,等你理解協程的本質,回過頭來看就很簡單了。





from

collections

import

deque


def

student

(

name

,

homeworks

)

:


    

for

homework

in

homeworks

.

items

()

:


        

yield

(

name

,

homework

[

0

],

homework

[

1

])

  

# 學生"生成"作業給老師


class

Teacher

(

object

)

:


    

def

__init__

(

self

,

students

)

:


        

self

.

students

=

deque

(

students

)


    

def

handle

(

self

)

:


        

"""老師處理學生作業"""


        

while

len

(

self

.

students

)

:


            

student

=

self

.

students

.

pop

()


            

try

:


                

homework

=

next

(

student

)


                

print

(

"handling"

,

homework

[

0

],

homework

[

1

],

homework

[

2

])


            

except

StopIteration

:


                

pass


            

else

:


                

self

.

students

.

appendleft

(

student

)




下面我們來調用一下這個程序。





Teacher

([


    

student

(

"Student1"

,

{

"math"

:

"1+1=2"

,

"cs"

:

"operating system"

}),


    

student

(

"Student2"

,

{

"math"

:

"2+2=4"

,

"cs"

:

"computer graphics"

}),


    

student

(

"Student3"

,

{

"math"

:

"3+3=5"

,

"cs"

:

"compiler construction"

})


]).

handle

()




這是輸出結果,我們僅僅只用了一個簡單的生成器就實現了並發(concurrence),注意不是並行(parallel),因為我們的程序僅僅是運行在一個單線程當中。





handling Student3 cs compiler

construction


handling Student2 cs computer graphics


handling Student1 cs operating system


handling Student3 math

3

+

3

=

5


handling Student2 math

2

+

2

=

4


handling Student1 math

1

+

1

=

2




使用asyncio模塊實現協程




從Python3.4開始asyncio模塊加入到了標準庫,通過asyncio我們可以輕鬆實現協程來完成非同步IO操作。




解釋一下下面這段代碼,我們創造了一個協程display_date(num, loop),然後它使用關鍵字yield from來等待協程asyncio.sleep(2)的返回結果。而在這等待的2s之間它會讓出CPU的執行權,直到asyncio.sleep(2)返回結果。





# coroutine.py


import

asyncio


import

datetime


@

asyncio

.

coroutine

  

# 聲明一個協程


def

display_date

(

num

,

loop

)

:


    

end_time

=

loop

.

time

()

+

10.0


    

while

True

:


        

print

(

"Loop: {} Time: {}"

.

format

(

num

,

datetime

.

datetime

.

now

()))


        

if

(

loop

.

time

()

+

1.0

)

>=

end_time

:


            

break


        

yield

from

asyncio

.

sleep

(

2

)

  

# 阻塞直到協程sleep(2)返回結果


loop

=

asyncio

.

get_event_loop

()

  

# 獲取一個event_loop


tasks

=

[

display_date

(

1

,

loop

),

display_date

(

2

,

loop

)]


loop

.

run_until_complete

(

asyncio

.

gather

(

*

tasks

))

  

# "阻塞"直到所有的tasks完成


loop

.

close

()




下面是運行結果,注意到並發的效果沒有,程序從開始到結束只用大約10s,而在這裡我們並沒有使用任何的多線程/多進程代碼。在實際項目中你可以將asyncio.sleep(secends)替換成相應的IO任務,比如資料庫/磁碟文件讀寫等操作。





ziwenxie

:: ~

?

python

coroutine

.

py


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

46.515329


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

46.515446


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

48.517613


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

48.517724


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

50.520005


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

50.520169


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

52.522452


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

52.522567


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

54.524889


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

54.525031


Loop

:

1

Time

:

2016

-

12

-

19

16

:

06

:

56.527713


Loop

:

2

Time

:

2016

-

12

-

19

16

:

06

:

56.528102




在Python3.5中為我們提供更直接的對協程的支持,引入了async/await關鍵字,上面的代碼我們可以這樣改寫,使用async代替了@asyncio.coroutine,使用了await代替了yield from,這樣我們的代碼變得更加簡潔可讀。





import

asyncio


import

datetime


async

def

display_date

(

num

,

loop

)

:  

# 聲明一個協程


    

end_time

=

loop

.

time

()

+

10.0


    

while

True

:


        

print

(

"Loop: {} Time: {}"

.

format

(

num

,

datetime

.

datetime

.

now

()))


        

if

(

loop

.

time

()

+

1.0

)

>=

end_time

:


            

break


        

await

asyncio

.

sleep

(

2

)

  

# 等同於yield from


loop

=

asyncio

.

get_event_loop

()

  

# 獲取一個event_loop


tasks

=

[

display_date

(

1

,

loop

),

display_date

(

2

,

loop

)]


loop

.

run_until_complete

(

asyncio

.

gather

(

*

tasks

))

  

# "阻塞"直到所有的tasks完成


loop

.

close

()




asyncio模塊詳解




開啟事件循環有兩種方法,一種方法就是通過調用run_until_complete,另外一種就是調用run_forever。run_until_complete內置add_done_callback,使用run_forever的好處是可以通過自己自定義add_done_callback,具體差異請看下面兩個例子。




run_until_complete()





import

asyncio


async

def

slow_operation

(

future

)

:


    

await

asyncio

.

sleep

(

1

)


    

future

.

set_result

(

"Future is done!"

)


loop

=

asyncio

.

get_event_loop

()


future

=

asyncio

.

Future

()


asyncio

.

ensure_future

(

slow_operation

(

future

))


print

(

loop

.

is_running

())

  

# False


loop

.

run_until_complete

(

future

)


print

(

future

.

result

())


loop

.

close

()




run_forever()




run_forever相比run_until_complete的優勢是添加了一個add_done_callback,可以讓我們在task(future)完成的時候調用相應的方法進行後續處理。





import

asyncio


async

def

slow_operation

(

future

)

:


    

await

asyncio

.

sleep

(

1

)


    

future

.

set_result

(

"Future is done!"

)


def

got_result

(

future

)

:


    

print

(

future

.

result

())


    

loop

.

stop

()


loop

=

asyncio

.

get_event_loop

()


future

=

asyncio

.

Future

()


asyncio

.

ensure_future

(

slow_operation

(

future

))


future

.

add_done_callback

(

got_result

)


try

:


    

loop

.

run_forever

()


finally

:


    

loop

.

close

()




這裡還要注意一點,即使你調用了協程方法,但是如果事件循環沒有開啟,協程也不會執行,參考官方文檔的描述,我剛被坑過。





Calling a coroutine does not start its code running – the coroutine object returned by the call doesn』t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.




Call




call_soon()





import

asyncio


def

hello_world

(

loop

)

:


    

print

(

"Hello World"

)


    

loop

.

stop

()


loop

=

asyncio

.

get_event_loop

()


# Schedule a call to hello_world()


loop

.

call_soon

(

hello_world

,

loop

)


# Blocking call interrupted by loop.stop()


loop

.

run_forever

()


loop

.

close

()




下面是運行結果,我們可以通過call_soon提前註冊我們的task,並且也可以根據返回的Handle進行cancel。





Hello World




call_later()





import

asyncio


import

datetime


def

display_date

(

end_time

,

loop

)

:


    

print

(

datetime

.

datetime

.

now

())


    

if

(

loop

.

time

()

+

1.0

)

<

end_time

:


        

loop

.

call_later

(

1

,

display_date

,

end_time

,

loop

)


    

else

:


        

loop

.

stop

()


loop

=

asyncio

.

get_event_loop

()


# Schedule the first call to display_date()


end_time

=

loop

.

time

()

+

5.0


loop

.

call_soon

(

display_date

,

end_time

,

loop

)


# Blocking call interrupted by loop.stop()


loop

.

run_forever

()


loop

.

close

()




改動一下上面的例子我們來看一下call_later的用法,注意這裡並沒有像上面那樣使用while循環進行操作,我們可以通過call_later來設置每隔1秒去調用display_date()方法。





2016

-

12

-

24

19

:

17

:

13.421649


2016

-

12

-

24

19

:

17

:

14.422933


2016

-

12

-

24

19

:

17

:

15.424315


2016

-

12

-

24

19

:

17

:

16.425571


2016

-

12

-

24

19

:

17

:

17.426874




Chain coroutines





import

asyncio


async

def

compute

(

x

,

y

)

:


    

print

(

"Compute %s + %s ..."

%

(

x

,

y

))


    

await

asyncio

.

sleep

(

1.0

)

  

# 協程compute不會繼續往下面執行,直到協程sleep返回結果


    

return

x

+

y


async

def

print_sum

(

x

,

y

)

:


    

result

=

await compute

(

x

,

y

)

  

# 協程print_sum不會繼續往下執行,直到協程compute返回結果


    

print

(

"%s + %s = %s"

%

(

x

,

y

,

result

))


loop

=

asyncio

.

get_event_loop

()


loop

.

run_until_complete

(

print_sum

(

1

,

2

))


loop

.

close

()




下面是輸出結果





ziwenxie

:: ~

?

python

chain

.

py


Compute

1

+

2

...


1

+

2

=

3




在爬蟲

中使用asyncio來實現非同步IO




下面我們來通過一個簡單的例子來看一下怎麼在Python爬蟲項目中使用asyncio。by the way: 根據我有限的實驗結果,如果要充分發揮asynio的威力,應該使用aiohttp而不是requests。而且也要合理使用concurrent.futures模塊提供的線程池/進程池,這一點我會在下一篇博文描述。





import

asyncio


import

requests


async

def

spider

(

loop

)

:


    

# run_in_exectuor會返回一個Future,而不是coroutine object


    

future1

=

loop

.

run_in_executor

(

None

,

requests

.

get

,

"https://www.python.org/"

)


    

future2

=

loop

.

run_in_executor

(

None

,

requests

.

get

,

"http://httpbin.org/"

)


    

# 通過命令行可以發現上面兩個網路IO在並發進行


    

response1

=

await

future1

  

# 阻塞直到future1完成


    

response2

=

await

future2

  

# 阻塞直到future2完成


    

print

(

len

(

response1

.

text

))


    

print

(

len

(

response2

.

text

))


    

return

"done"


loop

=

asyncio

.

get_event_loop

()


# If the argument is a coroutine object, it is wrapped by ensure_future().


result

=

loop

.

run_until_complete

(

spider

(

loop

))


print

(

result

)


loop

.

close

()





p.s: 如果你能自己體會到為什麼盲目地使用線程池/進程池並不能提高基於asynico模塊的程序的效率,我想你對協程的理解也差不多了。




References






  • DOCUMENTATION OF ASYNCIO



  • COROUTINES AND ASYNC/AWAIT



  • STACKOVERFLOW



  • PyMOTW-3






  • 來源:ZiWenXie




  • www.ziwenxie.site/2016/12/19/python-asynico/



  • Python開發整理髮布,轉載請聯繫作者獲得授權


【點擊成為Java大神】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

Git 最佳實踐:commit msg

TAG:Python開發 |