Python 並發編程之協程/非同步IO
點擊上方「
Python開發
」,選擇「置頂公眾號」
關鍵時刻,第一時間送達!
引言
隨著node.js的盛行,相信大家今年多多少少都聽到了非同步編程這個概念。Python社區雖然對於非同步編程的支持相比其他語言稍顯遲緩,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await語法層面的支持,剛正式發布的Python3.6中asynico也已經由臨時版改為了穩定版。下面我們就基於Python3.4+來了解一下非同步編程的概念以及asyncio的用法。
什麼是協程
通常在Python中我們進行並發編程一般都是使用多線程或者多進程來實現的,對於計算型任務由於GIL的存在我們通常使用多進程來實現,而對與IO型任務我們可以通過線程調度來讓線程在執行IO任務時讓出GIL,從而實現表面上的並發。
其實對於IO型任務我們還有一種選擇就是協程,協程是運行在單線程當中的「並發」,協程相比多線程一大優勢就是省去了多線程之間的切換開銷,獲得了更大的運行效率。Python中的asyncio也是基於協程來進行實現的。在進入asyncio之前我們先來了解一下Python中怎麼通過生成器進行協程來實現並發。
example1
我們先來看一個簡單的例子來了解一下什麼是協程(coroutine),對生成器不了解的朋友建議先看一下Stackoverflow上面的這篇高票回答(http://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do)。
>>>
def
coroutine
()
:
...
reply
=
yield
"hello"
...
yield
reply
...
>>>
c
=
coroutine
()
>>>
next
(
c
)
"hello"
>>>
c
.
send
(
"world"
)
"world"
example2
下面這個程序我們要實現的功能就是模擬多個學生
同時向一個老師提交作業
,按照傳統的話我們或許要採用多線程/多進程,但是這裡我們可以採用生成器來實現協程用來模擬並發。
如果下面這個程序讀起來有點困難,可以直接跳到後面部分,並不影響閱讀,等你理解協程的本質,回過頭來看就很簡單了。
from
collections
import
deque
def
student
(
name
,
homeworks
)
:
for
homework
in
homeworks
.
items
()
:
yield
(
name
,
homework
[
0
],
homework
[
1
])
# 學生"生成"作業給老師
class
Teacher
(
object
)
:
def
__init__
(
self
,
students
)
:
self
.
students
=
deque
(
students
)
def
handle
(
self
)
:
"""老師處理學生作業"""
while
len
(
self
.
students
)
:
student
=
self
.
students
.
pop
()
try
:
homework
=
next
(
student
)
(
"handling"
,
homework
[
0
],
homework
[
1
],
homework
[
2
])
except
StopIteration
:
pass
else
:
self
.
students
.
appendleft
(
student
)
下面我們來調用一下這個程序。
Teacher
([
student
(
"Student1"
,
{
"math"
:
"1+1=2"
,
"cs"
:
"operating system"
}),
student
(
"Student2"
,
{
"math"
:
"2+2=4"
,
"cs"
:
"computer graphics"
}),
student
(
"Student3"
,
{
"math"
:
"3+3=5"
,
"cs"
:
"compiler construction"
})
]).
handle
()
這是輸出結果,我們僅僅只用了一個簡單的生成器就實現了並發(concurrence),注意不是並行(parallel),因為我們的程序僅僅是運行在一個單線程當中。
handling Student3 cs compiler
construction
handling Student2 cs computer graphics
handling Student1 cs operating system
handling Student3 math
3
+
3
=
5
handling Student2 math
2
+
2
=
4
handling Student1 math
1
+
1
=
2
使用asyncio模塊實現協程
從Python3.4開始asyncio模塊加入到了標準庫,通過asyncio我們可以輕鬆實現協程來完成非同步IO操作。
解釋一下下面這段代碼,我們創造了一個協程display_date(num, loop),然後它使用關鍵字yield from來等待協程asyncio.sleep(2)的返回結果。而在這等待的2s之間它會讓出CPU的執行權,直到asyncio.sleep(2)返回結果。
# coroutine.py
import
asyncio
import
datetime
@
asyncio
.
coroutine
# 聲明一個協程
def
display_date
(
num
,
loop
)
:
end_time
=
loop
.
time
()
+
10.0
while
True
:
(
"Loop: {} Time: {}"
.
format
(
num
,
datetime
.
datetime
.
now
()))
if
(
loop
.
time
()
+
1.0
)
>=
end_time
:
break
yield
from
asyncio
.
sleep
(
2
)
# 阻塞直到協程sleep(2)返回結果
loop
=
asyncio
.
get_event_loop
()
# 獲取一個event_loop
tasks
=
[
display_date
(
1
,
loop
),
display_date
(
2
,
loop
)]
loop
.
run_until_complete
(
asyncio
.
gather
(
*
tasks
))
# "阻塞"直到所有的tasks完成
loop
.
close
()
下面是運行結果,注意到並發的效果沒有,程序從開始到結束只用大約10s,而在這裡我們並沒有使用任何的多線程/多進程代碼。在實際項目中你可以將asyncio.sleep(secends)替換成相應的IO任務,比如資料庫/磁碟文件讀寫等操作。
ziwenxie
:: ~
?
python
coroutine
.
py
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
46.515329
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
46.515446
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
48.517613
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
48.517724
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
50.520005
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
50.520169
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
52.522452
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
52.522567
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
54.524889
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
54.525031
Loop
:
1
Time
:
2016
-
12
-
19
16
:
06
:
56.527713
Loop
:
2
Time
:
2016
-
12
-
19
16
:
06
:
56.528102
在Python3.5中為我們提供更直接的對協程的支持,引入了async/await關鍵字,上面的代碼我們可以這樣改寫,使用async代替了@asyncio.coroutine,使用了await代替了yield from,這樣我們的代碼變得更加簡潔可讀。
import
asyncio
import
datetime
async
def
display_date
(
num
,
loop
)
:
# 聲明一個協程
end_time
=
loop
.
time
()
+
10.0
while
True
:
(
"Loop: {} Time: {}"
.
format
(
num
,
datetime
.
datetime
.
now
()))
if
(
loop
.
time
()
+
1.0
)
>=
end_time
:
break
await
asyncio
.
sleep
(
2
)
# 等同於yield from
loop
=
asyncio
.
get_event_loop
()
# 獲取一個event_loop
tasks
=
[
display_date
(
1
,
loop
),
display_date
(
2
,
loop
)]
loop
.
run_until_complete
(
asyncio
.
gather
(
*
tasks
))
# "阻塞"直到所有的tasks完成
loop
.
close
()
asyncio模塊詳解
開啟事件循環有兩種方法,一種方法就是通過調用run_until_complete,另外一種就是調用run_forever。run_until_complete內置add_done_callback,使用run_forever的好處是可以通過自己自定義add_done_callback,具體差異請看下面兩個例子。
run_until_complete()
import
asyncio
async
def
slow_operation
(
future
)
:
await
asyncio
.
sleep
(
1
)
future
.
set_result
(
"Future is done!"
)
loop
=
asyncio
.
get_event_loop
()
future
=
asyncio
.
Future
()
asyncio
.
ensure_future
(
slow_operation
(
future
))
(
loop
.
is_running
())
# False
loop
.
run_until_complete
(
future
)
(
future
.
result
())
loop
.
close
()
run_forever()
run_forever相比run_until_complete的優勢是添加了一個add_done_callback,可以讓我們在task(future)完成的時候調用相應的方法進行後續處理。
import
asyncio
async
def
slow_operation
(
future
)
:
await
asyncio
.
sleep
(
1
)
future
.
set_result
(
"Future is done!"
)
def
got_result
(
future
)
:
(
future
.
result
())
loop
.
stop
()
loop
=
asyncio
.
get_event_loop
()
future
=
asyncio
.
Future
()
asyncio
.
ensure_future
(
slow_operation
(
future
))
future
.
add_done_callback
(
got_result
)
try
:
loop
.
run_forever
()
finally
:
loop
.
close
()
這裡還要注意一點,即使你調用了協程方法,但是如果事件循環沒有開啟,協程也不會執行,參考官方文檔的描述,我剛被坑過。
Calling a coroutine does not start its code running – the coroutine object returned by the call doesn』t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.
Call
call_soon()
import
asyncio
def
hello_world
(
loop
)
:
(
"Hello World"
)
loop
.
stop
()
loop
=
asyncio
.
get_event_loop
()
# Schedule a call to hello_world()
loop
.
call_soon
(
hello_world
,
loop
)
# Blocking call interrupted by loop.stop()
loop
.
run_forever
()
loop
.
close
()
下面是運行結果,我們可以通過call_soon提前註冊我們的task,並且也可以根據返回的Handle進行cancel。
Hello World
call_later()
import
asyncio
import
datetime
def
display_date
(
end_time
,
loop
)
:
(
datetime
.
datetime
.
now
())
if
(
loop
.
time
()
+
1.0
)
<
end_time
:
loop
.
call_later
(
1
,
display_date
,
end_time
,
loop
)
else
:
loop
.
stop
()
loop
=
asyncio
.
get_event_loop
()
# Schedule the first call to display_date()
end_time
=
loop
.
time
()
+
5.0
loop
.
call_soon
(
display_date
,
end_time
,
loop
)
# Blocking call interrupted by loop.stop()
loop
.
run_forever
()
loop
.
close
()
改動一下上面的例子我們來看一下call_later的用法,注意這裡並沒有像上面那樣使用while循環進行操作,我們可以通過call_later來設置每隔1秒去調用display_date()方法。
2016
-
12
-
24
19
:
17
:
13.421649
2016
-
12
-
24
19
:
17
:
14.422933
2016
-
12
-
24
19
:
17
:
15.424315
2016
-
12
-
24
19
:
17
:
16.425571
2016
-
12
-
24
19
:
17
:
17.426874
Chain coroutines
import
asyncio
async
def
compute
(
x
,
y
)
:
(
"Compute %s + %s ..."
%
(
x
,
y
))
await
asyncio
.
sleep
(
1.0
)
# 協程compute不會繼續往下面執行,直到協程sleep返回結果
return
x
+
y
async
def
print_sum
(
x
,
y
)
:
result
=
await compute
(
x
,
y
)
# 協程print_sum不會繼續往下執行,直到協程compute返回結果
(
"%s + %s = %s"
%
(
x
,
y
,
result
))
loop
=
asyncio
.
get_event_loop
()
loop
.
run_until_complete
(
print_sum
(
1
,
2
))
loop
.
close
()
下面是輸出結果
ziwenxie
:: ~
?
python
chain
.
py
Compute
1
+
2
...
1
+
2
=
3
在爬蟲
中使用asyncio來實現非同步IO
下面我們來通過一個簡單的例子來看一下怎麼在Python爬蟲項目中使用asyncio。by the way: 根據我有限的實驗結果,如果要充分發揮asynio的威力,應該使用aiohttp而不是requests。而且也要合理使用concurrent.futures模塊提供的線程池/進程池,這一點我會在下一篇博文描述。
import
asyncio
import
requests
async
def
spider
(
loop
)
:
# run_in_exectuor會返回一個Future,而不是coroutine object
future1
=
loop
.
run_in_executor
(
None
,
requests
.
get
,
"https://www.python.org/"
)
future2
=
loop
.
run_in_executor
(
None
,
requests
.
get
,
"http://httpbin.org/"
)
# 通過命令行可以發現上面兩個網路IO在並發進行
response1
=
await
future1
# 阻塞直到future1完成
response2
=
await
future2
# 阻塞直到future2完成
(
len
(
response1
.
text
))
(
len
(
response2
.
text
))
return
"done"
loop
=
asyncio
.
get_event_loop
()
# If the argument is a coroutine object, it is wrapped by ensure_future().
result
=
loop
.
run_until_complete
(
spider
(
loop
))
(
result
)
loop
.
close
()
p.s: 如果你能自己體會到為什麼盲目地使用線程池/進程池並不能提高基於asynico模塊的程序的效率,我想你對協程的理解也差不多了。
References
DOCUMENTATION OF ASYNCIO
COROUTINES AND ASYNC/AWAIT
STACKOVERFLOW
PyMOTW-3
來源:ZiWenXie
www.ziwenxie.site/2016/12/19/python-asynico/
Python開發整理髮布,轉載請聯繫作者獲得授權
【點擊成為Java大神】


TAG:Python開發 |