初探 Python 3 的非同步 IO 編程
(點擊
上方藍字
,快速關注我們)
來源:keakon
www.keakon.net/2015/09/07/初探Python3的非同步IO編程
如有好文章投稿,請點擊 → 這裡了解詳情
小編註:本文寫於2015年9月7日
上周終於把知乎日報的新版本做完了,於是趁著這幾天的休息,有精力折騰一些感興趣的玩意了。
雖然工作時並不會接觸到 Python 3,但還是對它抱有不少好奇心,於是把 Python 版本更新到了 3.4,開始了折騰之旅。在各種更新中,我最感興趣的當屬 asyncio 模塊了,所以就從非同步 IO 開始探索吧。
探索之前,先簡單介紹下各種 IO 模型:
最容易做的是阻塞 IO,即讀寫數據時,需要等待操作完成,才能繼續執行。進階的做法就是用多線程來處理需要 IO 的部分,缺點是開銷會有些大。
接著是非阻塞 IO,即讀寫數據時,如果暫時不可讀寫,則立刻返回,而不等待。因為不知道什麼時候是可讀寫的,所以輪詢時可能會浪費 CPU 時間。
然後是 IO 復用,即在讀寫數據前,先檢查哪些描述符是可讀寫的,再去讀寫。select 和 poll 就是這樣做的,它們會遍歷所有被監視的描述符,查看是否滿足,這個檢查的過程是阻塞的。而 epoll、kqueue 和 /dev/poll 則做了些改進,事先註冊需要檢查哪些描述符的哪些事件,當狀態發生變化時,內核會調用對應的回調函數,將這些描述符保存下來;下次獲取可用的描述符時,直接返回這些發生變化的描述符即可。
再之後是信號驅動,即描述符就緒時,內核發送 SIGIO 信號,再由信號處理程序去處理這些信號即可。不過信號處理的時機是從內核態返回用戶態時,感覺也得把這些事件收集起來才好處理,有點像模擬 IO 復用了。
最後是非同步 IO,即讀寫數據時,只註冊事件,內核完成讀寫後(讀取的數據會複製到用戶態),再調用事件處理函數。這整個過程都不會阻塞調用線程,不過實現它的操作系統比較少,Windows 上有比較成熟的 IOCP,Linux 上的 AIO 則有不少缺點。
雖然真正的非同步 IO 需要中間任何步驟都沒有阻塞,這對於某些只是偶爾需要處理 IO 請求的情況確實有用(比如文本編輯器偶爾保存一下文件);但對於伺服器端編程的大多數情況而言,它的主線程就是用來處理 IO 請求的,如果在空閑時不阻塞在 IO 等待上,也沒有別的事情能做,所以本文就不糾結這個非同步是否名副其實了。
在 Python 2 的時代,高性能的網路編程主要是使用 Twisted、Tornado 和 gevent 這三個庫。
我對 Twisted 不熟,只知道它的缺點是比較重,性能相對而言並不算好。
Tornado 平時用得比較多,缺點是寫非同步調用時特別麻煩。gevent 我只能算接觸過,缺點是不太乾淨。
由於它們都各自有一個 IO loop,不好混用,而 Tornado 的 web 框架相對而言比較完善,因此成了我的首選。
而從 Python 3.4 開始,標準庫里又新增了 asyncio 這個模塊。
從原理上來說,它和 Tornado 其實差不多,都是註冊 IO 事件,然後在 IO loop 中等待事件發生,然後調用相應的處理函數。
不同之處在於 Python 3 增加了一些新的特性,而 Tornado 需要兼容 Python 2,所以寫起來會比較麻煩。
舉例來說,Python 3.3 可以在 generator 中 return 返回值(相當於 raise StopIteration),而 Tornado 中需要 raise 一個 Return 對象。此外,Python 3.3 還增加了 yield from 語法,減輕了在 generator 中處理另一個 generator 的工作量(省去了循環和 try … except …)。
不過,雖然 asyncio 有那麼多得天獨厚的優勢,卻不一定比 Tornado 的性能更好,所以我寫個簡單的例子測試一下。
比較方法就是寫個最簡單的 HTTP 伺服器,不做任何檢查,讀取到任何內容都輸出一個 hello world,並斷開連接。
測試的客戶端就懶得寫了,直接用 ab 即可:
ab -n 10000 -c 10 "http://0.0.0.0:8000/"
Tornado 版是這樣:
from
tornado
.
gen import coroutine
from
tornado
.
ioloop import IOLoop
from
tornado
.
tcpserver import TCPServer
class
Server
(
TCPServer
)
:
@
coroutine
def handle_stream
(
self
,
stream
,
address
)
:
try
:
yield
stream
.
read_bytes
(
1024
,
partial
=
True
)
yield
stream
.
write
(
b
"HTTP 1.0 200 OKrnrnhello world"
)
finally
:
stream
.
close
()
server
=
Server
()
server
.
bind
(
8000
)
server
.
start
(
1
)
IOLoop
.
current
().
start
()
在我的電腦上大概 4000 QPS。
asyncio 版是這樣:
import asyncio
class
Server
(
asyncio
.
Protocol
)
:
def connection_made
(
self
,
transport
)
:
self
.
transport
=
transport
def data_received
(
self
,
data
)
:
try
:
self
.
transport
.
write
(
b
"HTTP/1.1 200 OKrnrnhello world"
)
finally
:
self
.
transport
.
close
()
loop
=
asyncio
.
get_event_loop
()
server
=
loop
.
create_server
(
Server
,
""
,
8000
)
loop
.
run_until_complete
(
server
)
loop
.
run_forever
()
在我的電腦上大概 3000 QPS,比 Tornado 版慢了一些。此外,asyncio 的 transport 在 write 時不用 yield from,這點可能有些不一致。
asyncio 還有個高級版的 API:
import
asyncio
@
asyncio
.
coroutine
def handle
(
reader
,
writer
)
:
yield from
reader
.
read
(
1024
)
writer
.
write
(
b
"HTTP/1.1 200 OKrnrnhello world"
)
yield from
writer
.
drain
()
writer
.
close
()
loop
=
asyncio
.
get_event_loop
()
task
=
asyncio
.
start_server
(
handle
,
""
,
8000
,
loop
=
loop
)
server
=
loop
.
run_until_complete
(
task
)
loop
.
run_forever
()
在我的電腦上大概 2200 QPS。這下讀寫都要 yield from 了,一致性上來說會好些。
以框架的性能而言,其實都夠用,開銷都不超過 1 毫秒,而 web 請求一般都需要 10 毫秒的以上的處理時間。
於是順便再測一下和 MySQL 的搭配,即在每個請求內調用一下 SELECT 1,然後輸出返回值。
因為自己懶得寫客戶端了,於是就用現成的 tornado_mysql 和 aiomysql 來測試了。原理應該都差不多,發送寫請求後就返回,等收到可讀事件時再獲取內容。
Tornado 版是這樣:
from
tornado
.
gen import coroutine
from
tornado
.
ioloop import IOLoop
from
tornado
.
tcpserver import TCPServer
from tornado_mysql import pools
class
Server
(
TCPServer
)
:
@
coroutine
def handle_stream
(
self
,
stream
,
address
)
:
try
:
yield
stream
.
read_bytes
(
1024
,
partial
=
True
)
cursor
=
yield
POOL
.
execute
(
b
"SELECT 1"
)
data
=
cursor
.
fetchone
()
yield
stream
.
write
(
"HTTP/1.1 200 OKrnrn{0[0]}"
.
format
(
data
).
encode
())
# Python 3.5 的 bytes 才能用 % 格式化
finally
:
stream
.
close
()
POOL
=
pools
.
Pool
(
dict
(
host
=
"127.0.0.1"
,
port
=
3306
,
user
=
"root"
,
passwd
=
"123"
,
db
=
"mysql"
),
max_idle_connections
=
10
,
max_open_connections
=
10
)
server
=
Server
()
server
.
bind
(
8000
)
server
.
start
(
1
)
IOLoop
.
current
().
start
()
在我的電腦上大概 680 QPS。
asyncio 版是這樣:
import asyncio
import aiomysql
class
Server
(
asyncio
.
Protocol
)
:
def connection_made
(
self
,
transport
)
:
self
.
transport
=
transport
class
Server
(
asyncio
.
Protocol
)
:
def connection_made
(
self
,
transport
)
:
self
.
transport
=
transport
def data_received
(
self
,
data
)
:
@
asyncio
.
coroutine
def handle
()
:
with
(
yield from
pool
)
as
conn
:
cursor
=
yield from
conn
.
cursor
()
yield from
cursor
.
execute
(
b
"SELECT 1"
)
result
=
yield from
cursor
.
fetchone
()
try
:
self
.
transport
.
write
(
"HTTP/1.1 200 OKrnrn{0[0]}"
.
format
(
result
).
encode
())
finally
:
self
.
transport
.
close
()
loop
.
create_task
(
handle
())
# 或者 asyncio.async(handle())
@
asyncio
.
coroutine
def get_pool
()
:
return
(
yield from
aiomysql
.
create_pool
(
host
=
"127.0.0.1"
,
port
=
3306
,
user
=
"root"
,
password
=
"123"
,
loop
=
loop
))
loop
=
asyncio
.
get_event_loop
()
pool
=
loop
.
run_until_complete
(
get_pool
())
server
=
loop
.
create_server
(
Server
,
""
,
8000
)
loop
.
run_until_complete
(
server
)
loop
.
run_forever
()
在我的電腦上大概 1250 QPS,比 Tornado 版快了不少。不過寫起來比較蛋疼,因為 data_received 方法里不能直接用 yield from。
用 cProfile 看了下,Tornado 版在 tornado.gen 和 functools 模塊里花了不少時間,可能是非同步調用過多了吧。
但如果不做非同步庫的開發者,而只就使用者的體驗而言,Tornado 會顯得更加靈活和易用。不過 asyncio 的高級 API 應該也能提供類似的體驗。
順便再用底層 socket 模塊寫個伺服器試試。
先用 poll 看看,錯誤處理什麼的就先不做了:
from functools import partial
import select
import socket
class
Server
:
def __init__
(
self
)
:
self
.
_sock
=
socket
.
socket
()
self
.
_poll
=
select
.
poll
()
self
.
_handlers
=
{}
self
.
_fd_events
=
{}
def start
(
self
)
:
sock
=
self
.
_sock
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
sock
.
setblocking
(
0
)
sock
.
bind
((
""
,
8000
))
sock
.
listen
(
100
)
handlers
=
self
.
_handlers
poll
=
self
.
_poll
self
.
add_handler
(
sock
.
fileno
(),
self
.
_accept
,
select
.
POLLIN
)
while
True
:
poll_events
=
poll
.
poll
(
1
)
for
fd
,
event
in
poll_events
:
handler
=
handlers
.
get
(
fd
)
if
handler
:
handler
()
def _accept
(
self
)
:
for
i
in
range
(
100
)
:
try
:
conn
,
address
=
self
.
_sock
.
accept
()
except
OSError
:
break
else
:
conn
.
setblocking
(
0
)
fd
=
conn
.
fileno
()
self
.
add_handler
(
fd
,
partial
(
self
.
_read
,
conn
),
select
.
POLLIN
)
def _read
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
recv
(
1024
)
except
:
conn
.
close
()
raise
else
:
self
.
add_handler
(
fd
,
partial
(
self
.
_write
,
conn
),
select
.
POLLOUT
)
def _write
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
send
(
b
"HTTP 1.0 200 OKrnrnhello world"
)
finally
:
conn
.
close
()
def add_handler
(
self
,
fd
,
handler
,
event
)
:
self
.
_handlers
[
fd
]
=
handler
self
.
register
(
fd
,
event
)
def remove_handler
(
self
,
fd
)
:
self
.
_handlers
.
pop
(
fd
,
None
)
self
.
unregister
(
fd
)
def register
(
self
,
fd
,
event
)
:
if
fd
in
self
.
_fd_events
:
raise IOError
(
"fd %s already registered"
%
fd
)
self
.
_poll
.
register
(
fd
,
event
)
self
.
_fd_events
[
fd
]
=
event
def unregister
(
self
,
fd
)
:
event
=
self
.
_fd_events
.
pop
(
fd
,
None
)
if
event
is
not
None
:
self
.
_poll
.
unregister
(
fd
)
Server
().
start
()
在我的電腦上大概 7700 QPS,優勢巨大。
再用 kqueue 試試(我用的是 OS X):
from functools import partial
import select
import socket
class
Server
:
def __init__
(
self
)
:
self
.
_sock
=
socket
.
socket
()
self
.
_kqueue
=
select
.
kqueue
()
self
.
_handlers
=
{}
self
.
_fd_events
=
{}
def start
(
self
)
:
sock
=
self
.
_sock
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
sock
.
setblocking
(
0
)
sock
.
bind
((
""
,
8000
))
sock
.
listen
(
100
)
self
.
add_handler
(
sock
.
fileno
(),
self
.
_accept
,
select
.
KQ_FILTER_READ
)
handlers
=
self
.
_handlers
while
True
:
kevents
=
self
.
_kqueue
.
control
(
None
,
1000
,
1
)
for
kevent
in
kevents
:
fd
=
kevent
.
ident
handler
=
handlers
.
get
(
fd
)
if
handler
:
handler
()
def _accept
(
self
)
:
for
i
in
range
(
100
)
:
try
:
conn
,
address
=
self
.
_sock
.
accept
()
except
OSError
:
break
else
:
conn
.
setblocking
(
0
)
fd
=
conn
.
fileno
()
self
.
add_handler
(
fd
,
partial
(
self
.
_read
,
conn
),
select
.
KQ_FILTER_READ
)
def _read
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
recv
(
1024
)
except
:
conn
.
close
()
raise
else
:
self
.
add_handler
(
fd
,
partial
(
self
.
_write
,
conn
),
select
.
KQ_FILTER_WRITE
)
def _write
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
send
(
b
"HTTP 1.0 200 OKrnrnhello world"
)
finally
:
conn
.
close
()
def add_handler
(
self
,
fd
,
handler
,
event
)
:
self
.
_handlers
[
fd
]
=
handler
self
.
register
(
fd
,
event
)
def remove_handler
(
self
,
fd
)
:
self
.
_handlers
.
pop
(
fd
,
None
)
self
.
unregister
(
fd
)
def register
(
self
,
fd
,
event
)
:
if
fd
in
self
.
_fd_events
:
raise IOError
(
"fd %s already registered"
%
fd
)
self
.
_control
(
fd
,
event
,
select
.
KQ_EV_ADD
)
self
.
_fd_events
[
fd
]
=
event
def unregister
(
self
,
fd
)
:
event
=
self
.
_fd_events
.
pop
(
fd
,
None
)
if
event
is
not
None
:
self
.
_control
(
fd
,
event
,
select
.
KQ_EV_DELETE
)
def _control
(
self
,
fd
,
event
,
flags
)
:
change_list
=
(
select
.
kevent
(
fd
,
event
,
flags
),)
self
.
_kqueue
.
control
(
change_list
,
0
)
Server
().
start
()
在我的電腦上大概 7200 QPS,比 poll 版稍慢。不過因為只有 10 個並發連接,而且沒有慢速網路的影響,所以 poll 的性能好並不奇怪。
再試試 Python 3.4 新增的 selectors 模塊,它的 DefaultSelector 會自動選擇所在平台最高效的實現,asyncio 就用到了這個模塊。
import selectors
import socket
class
Server
:
def __init__
(
self
)
:
self
.
_sock
=
socket
.
socket
()
self
.
_selector
=
selectors
.
DefaultSelector
()
def start
(
self
)
:
sock
=
self
.
_sock
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
sock
.
setblocking
(
0
)
sock
.
bind
((
""
,
8000
))
sock
.
listen
(
100
)
selector
=
self
.
_selector
self
.
add_handler
(
sock
.
fileno
(),
self
.
_accept
,
selectors
.
EVENT_READ
)
while
True
:
events
=
selector
.
select
(
1
)
for
key
,
event
in
events
:
handler
,
data
=
key
.
data
if
data
:
handler
(
**
data
)
else
:
handler
()
def _accept
(
self
)
:
for
i
in
range
(
100
)
:
try
:
conn
,
address
=
self
.
_sock
.
accept
()
except
OSError
:
break
else
:
conn
.
setblocking
(
0
)
fd
=
conn
.
fileno
()
self
.
add_handler
(
fd
,
self
.
_read
,
selectors
.
EVENT_READ
,
{
"conn"
:
conn
})
def _read
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
recv
(
1024
)
except
:
conn
.
close
()
raise
else
:
self
.
add_handler
(
fd
,
self
.
_write
,
selectors
.
EVENT_WRITE
,
{
"conn"
:
conn
})
def _write
(
self
,
conn
)
:
fd
=
conn
.
fileno
()
self
.
remove_handler
(
fd
)
try
:
conn
.
send
(
b
"HTTP 1.0 200 OKrnrnhello world"
)
finally
:
conn
.
close
()
def add_handler
(
self
,
fd
,
handler
,
event
,
data
=
None
)
:
self
.
_selector
.
register
(
fd
,
event
,
(
handler
,
data
))
def remove_handler
(
self
,
fd
)
:
self
.
_selector
.
unregister
(
fd
)
Server
().
start
()
在我的電腦上大概 6100 QPS,成績也還不錯。
從這些測試來看,如果想自己實現一個捨棄了一些功能和兼容性的 Tornado,應該能比它稍快一點,不過似乎沒多大必要。
所以暫時不糾結性能了,還是從使用的便利性上來考慮。Tornado 可以用 yield 取代 callback,我們也來實現這個 feature。
實現前先得了解下 yield。
當一個函數內部出現了 yield 語句時,它就不再是一個單純的函數了,而是一個生成器函數,調用它並不會執行它的代碼,而是返回一個生成器。
調用這個生成器的 send 方法時,才會執行內部的代碼。當執行到 yield 時,這個 send 方法就返回了,調用者可以得到其返回值。
send 方法在第一次調用時,參數必須為 None。Python 2 中可以用它的 next 方法,Python 3 中改成了 __next__ 方法,還可以用內置的 next 函數來調用。
send 方法可以被多次調用,參數會作為 yield 的返回值,回到生成器內上一次執行的地方,並繼續執行下去。
當生成器的代碼執行完時,會拋出一個 StopIteration 的異常。Python 3.3 開始可以在生成器里使用 return,返回值可以從 StopIteration 異常的 value 屬性獲取。
for … in … 循環會自動捕獲 StopIteration 異常,並作為循環停止的條件。
由此可見,yield 可以用於跳轉。而我們要做的,則是在遇到 IO 請求時,用 yield 返回 IO loop;當事件發生時,找到對應的生成器,用 send 方法繼續執行即可。
為了簡單起見,我就在 poll 版的基礎上進行改造了:
在我的電腦上大概 5300 QPS。
雖然成績比較尷尬,但畢竟用起來比前一個版本好多了。至於慢的原因,我估計是自己維護了一個堆棧的原因(也可能是有什麼 bug,畢竟寫這個感覺太跳躍了,能運行起來就謝天謝地了)。
實現時做了兩點假設:
handler 為 generator 時,視為非同步方法。
在非同步方法中 yield None 時,視為等待 IO;yield / yield from 非同步方法時,則是等待方法返回。
實現細節也沒什麼好說的了,只是覺得在實現 Stream 的 read / write 方法時,調用 IOLoop.add_handler 方法不太優雅。其實可以直接 yield 一個 fd 和 event,在 IOLoop.start 方法中再去註冊。不過這個重構其實蠻小的,我就不再貼一次代碼了,感興趣的可以自己試試。
於是這次初探就到此為止了,有空我也許會繼續完善它。至少這次探索,讓我覺得 Python 3 還是蠻有意思的。
看完本文有收穫?請轉
發分享給更多人
關注「P
ython開發者」,提升Python技能


※一個 Reentrant Error 引發的對 Python 信號機制的探索和思考
※為提高用戶體驗,Yelp 是如何無損壓縮圖片的
※BAT 面試官帶你刷真題、過筆試
※K-means 在 Python 中的實現
TAG:Python開發者 |