當前位置:
首頁 > 知識 > 初探 Python 3 的非同步 IO 編程

初探 Python 3 的非同步 IO 編程

(點擊

上方藍字

,快速關注我們)




來源:keakon


www.keakon.net/2015/09/07/初探Python3的非同步IO編程


如有好文章投稿,請點擊 → 這裡了解詳情





小編註:本文寫於2015年9月7日





上周終於把知乎日報的新版本做完了,於是趁著這幾天的休息,有精力折騰一些感興趣的玩意了。




雖然工作時並不會接觸到 Python 3,但還是對它抱有不少好奇心,於是把 Python 版本更新到了 3.4,開始了折騰之旅。在各種更新中,我最感興趣的當屬 asyncio 模塊了,所以就從非同步 IO 開始探索吧。 




探索之前,先簡單介紹下各種 IO 模型:




最容易做的是阻塞 IO,即讀寫數據時,需要等待操作完成,才能繼續執行。進階的做法就是用多線程來處理需要 IO 的部分,缺點是開銷會有些大。



接著是非阻塞 IO,即讀寫數據時,如果暫時不可讀寫,則立刻返回,而不等待。因為不知道什麼時候是可讀寫的,所以輪詢時可能會浪費 CPU 時間。




然後是 IO 復用,即在讀寫數據前,先檢查哪些描述符是可讀寫的,再去讀寫。select 和 poll 就是這樣做的,它們會遍歷所有被監視的描述符,查看是否滿足,這個檢查的過程是阻塞的。而 epoll、kqueue 和 /dev/poll 則做了些改進,事先註冊需要檢查哪些描述符的哪些事件,當狀態發生變化時,內核會調用對應的回調函數,將這些描述符保存下來;下次獲取可用的描述符時,直接返回這些發生變化的描述符即可。




再之後是信號驅動,即描述符就緒時,內核發送 SIGIO 信號,再由信號處理程序去處理這些信號即可。不過信號處理的時機是從內核態返回用戶態時,感覺也得把這些事件收集起來才好處理,有點像模擬 IO 復用了。




最後是非同步 IO,即讀寫數據時,只註冊事件,內核完成讀寫後(讀取的數據會複製到用戶態),再調用事件處理函數。這整個過程都不會阻塞調用線程,不過實現它的操作系統比較少,Windows 上有比較成熟的 IOCP,Linux 上的 AIO 則有不少缺點。




雖然真正的非同步 IO 需要中間任何步驟都沒有阻塞,這對於某些只是偶爾需要處理 IO 請求的情況確實有用(比如文本編輯器偶爾保存一下文件);但對於伺服器端編程的大多數情況而言,它的主線程就是用來處理 IO 請求的,如果在空閑時不阻塞在 IO 等待上,也沒有別的事情能做,所以本文就不糾結這個非同步是否名副其實了。



在 Python 2 的時代,高性能的網路編程主要是使用 Twisted、Tornado 和 gevent 這三個庫。




我對 Twisted 不熟,只知道它的缺點是比較重,性能相對而言並不算好。


Tornado 平時用得比較多,缺點是寫非同步調用時特別麻煩。gevent 我只能算接觸過,缺點是不太乾淨。




由於它們都各自有一個 IO loop,不好混用,而 Tornado 的 web 框架相對而言比較完善,因此成了我的首選。




而從 Python 3.4 開始,標準庫里又新增了 asyncio 這個模塊。




從原理上來說,它和 Tornado 其實差不多,都是註冊 IO 事件,然後在 IO loop 中等待事件發生,然後調用相應的處理函數。



不同之處在於 Python 3 增加了一些新的特性,而 Tornado 需要兼容 Python 2,所以寫起來會比較麻煩。




舉例來說,Python 3.3 可以在 generator 中 return 返回值(相當於 raise StopIteration),而 Tornado 中需要 raise 一個 Return 對象。此外,Python 3.3 還增加了 yield from 語法,減輕了在 generator 中處理另一個 generator 的工作量(省去了循環和 try … except …)。




不過,雖然 asyncio 有那麼多得天獨厚的優勢,卻不一定比 Tornado 的性能更好,所以我寫個簡單的例子測試一下。




比較方法就是寫個最簡單的 HTTP 伺服器,不做任何檢查,讀取到任何內容都輸出一個 hello world,並斷開連接。




測試的客戶端就懶得寫了,直接用 ab 即可:




ab -n 10000 -c 10 "http://0.0.0.0:8000/"




Tornado 版是這樣:





from

tornado

.

gen import coroutine


from

tornado

.

ioloop import IOLoop


from

tornado

.

tcpserver import TCPServer


 


class

Server

(

TCPServer

)

:


    

@

coroutine


    def handle_stream

(

self

,

stream

,

address

)

:


        

try

:


            

yield

stream

.

read_bytes

(

1024

,

partial

=

True

)


            

yield

stream

.

write

(

b

"HTTP 1.0 200 OKrnrnhello world"

)


        

finally

:


            

stream

.

close

()


 


server

=

Server

()


server

.

bind

(

8000

)


server

.

start

(

1

)


IOLoop

.

current

().

start

()




在我的電腦上大概 4000 QPS。




asyncio 版是這樣:





import asyncio


 


class

Server

(

asyncio

.

Protocol

)

:


    

def connection_made

(

self

,

transport

)

:


        

self

.

transport

=

transport


 


    def data_received

(

self

,

data

)

:


        

try

:


            

self

.

transport

.

write

(

b

"HTTP/1.1 200 OKrnrnhello world"

)


        

finally

:


            

self

.

transport

.

close

()


 


loop

=

asyncio

.

get_event_loop

()


server

=

loop

.

create_server

(

Server

,

""

,

8000

)


loop

.

run_until_complete

(

server

)


loop

.

run_forever

()




在我的電腦上大概 3000 QPS,比 Tornado 版慢了一些。此外,asyncio 的 transport 在 write 時不用 yield from,這點可能有些不一致。




asyncio 還有個高級版的 API:





import

asyncio


 


@

asyncio

.

coroutine


def handle

(

reader

,

writer

)

:


    

yield from

reader

.

read

(

1024

)


    

writer

.

write

(

b

"HTTP/1.1 200 OKrnrnhello world"

)


    

yield from

writer

.

drain

()


    

writer

.

close

()


 


loop

=

asyncio

.

get_event_loop

()


task

=

asyncio

.

start_server

(

handle

,

""

,

8000

,

loop

=

loop

)


server

=

loop

.

run_until_complete

(

task

)


loop

.

run_forever

()




在我的電腦上大概 2200 QPS。這下讀寫都要 yield from 了,一致性上來說會好些。




以框架的性能而言,其實都夠用,開銷都不超過 1 毫秒,而 web 請求一般都需要 10 毫秒的以上的處理時間。




於是順便再測一下和 MySQL 的搭配,即在每個請求內調用一下 SELECT 1,然後輸出返回值。




因為自己懶得寫客戶端了,於是就用現成的 tornado_mysql 和 aiomysql 來測試了。原理應該都差不多,發送寫請求後就返回,等收到可讀事件時再獲取內容。




Tornado 版是這樣:





from

tornado

.

gen import coroutine


from

tornado

.

ioloop import IOLoop


from

tornado

.

tcpserver import TCPServer


from tornado_mysql import pools


 


class

Server

(

TCPServer

)

:


    

@

coroutine


    def handle_stream

(

self

,

stream

,

address

)

:


        

try

:


            

yield

stream

.

read_bytes

(

1024

,

partial

=

True

)


            

cursor

=

yield

POOL

.

execute

(

b

"SELECT 1"

)


            

data

=

cursor

.

fetchone

()


            

yield

stream

.

write

(

"HTTP/1.1 200 OKrnrn{0[0]}"

.

format

(

data

).

encode

())

  

# Python 3.5 的 bytes 才能用 % 格式化


        

finally

:


            

stream

.

close

()


 


POOL

=

pools

.

Pool

(


    

dict

(

host

=

"127.0.0.1"

,

port

=

3306

,

user

=

"root"

,

passwd

=

"123"

,

db

=

"mysql"

),


    

max_idle_connections

=

10

,


    

max_open_connections

=

10

)


 


server

=

Server

()


server

.

bind

(

8000

)


server

.

start

(

1

)


IOLoop

.

current

().

start

()




在我的電腦上大概 680 QPS。




asyncio 版是這樣:





import asyncio


 


import aiomysql


 


class

Server

(

asyncio

.

Protocol

)

:


    

def connection_made

(

self

,

transport

)

:


        

self

.

transport

=

transport


 


class

Server

(

asyncio

.

Protocol

)

:


    

def connection_made

(

self

,

transport

)

:


        

self

.

transport

=

transport


 


    def data_received

(

self

,

data

)

:


        

@

asyncio

.

coroutine


        def handle

()

:


            

with

(

yield from

pool

)

as

conn

:


                

cursor

=

yield from

conn

.

cursor

()


                

yield from

cursor

.

execute

(

b

"SELECT 1"

)


                

result

=

yield from

cursor

.

fetchone

()


            

try

:


                

self

.

transport

.

write

(

"HTTP/1.1 200 OKrnrn{0[0]}"

.

format

(

result

).

encode

())


            

finally

:


                

self

.

transport

.

close

()


        

loop

.

create_task

(

handle

())

  

# 或者 asyncio.async(handle())


 


@

asyncio

.

coroutine


def get_pool

()

:


    

return

(

yield from

aiomysql

.

create_pool

(

host

=

"127.0.0.1"

,

port

=

3306

,

user

=

"root"

,

password

=

"123"

,

loop

=

loop

))


 


loop

=

asyncio

.

get_event_loop

()


pool

=

loop

.

run_until_complete

(

get_pool

())


 


server

=

loop

.

create_server

(

Server

,

""

,

8000

)


loop

.

run_until_complete

(

server

)


loop

.

run_forever

()




在我的電腦上大概 1250 QPS,比 Tornado 版快了不少。不過寫起來比較蛋疼,因為 data_received 方法里不能直接用 yield from。




用 cProfile 看了下,Tornado 版在 tornado.gen 和 functools 模塊里花了不少時間,可能是非同步調用過多了吧。




但如果不做非同步庫的開發者,而只就使用者的體驗而言,Tornado 會顯得更加靈活和易用。不過 asyncio 的高級 API 應該也能提供類似的體驗。




順便再用底層 socket 模塊寫個伺服器試試。




先用 poll 看看,錯誤處理什麼的就先不做了:





from functools import partial


import select


import socket


 


class

Server

:


    

def __init__

(

self

)

:


        

self

.

_sock

=

socket

.

socket

()


        

self

.

_poll

=

select

.

poll

()


        

self

.

_handlers

=

{}


        

self

.

_fd_events

=

{}


 


    

def start

(

self

)

:


        

sock

=

self

.

_sock


        

sock

.

setsockopt

(

socket

.

SOL_SOCKET

,

socket

.

SO_REUSEADDR

,

1

)


        

sock

.

setblocking

(

0

)


        

sock

.

bind

((

""

,

8000

))


        

sock

.

listen

(

100

)


 


        

handlers

=

self

.

_handlers


        

poll

=

self

.

_poll


        

self

.

add_handler

(

sock

.

fileno

(),

self

.

_accept

,

select

.

POLLIN

)


 


        

while

True

:


            

poll_events

=

poll

.

poll

(

1

)


            

for

fd

,

event

in

poll_events

:


                

handler

=

handlers

.

get

(

fd

)


                

if

handler

:


                    

handler

()


 


    

def _accept

(

self

)

:


        

for

i

in

range

(

100

)

:


            

try

:


                

conn

,

address

=

self

.

_sock

.

accept

()


            

except

OSError

:


                

break


            

else

:


                

conn

.

setblocking

(

0

)


                

fd

=

conn

.

fileno

()


                

self

.

add_handler

(

fd

,

partial

(

self

.

_read

,

conn

),

select

.

POLLIN

)


 


    

def _read

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

recv

(

1024

)


        

except

:


            

conn

.

close

()


            

raise


        

else

:


            

self

.

add_handler

(

fd

,

partial

(

self

.

_write

,

conn

),

select

.

POLLOUT

)


 


    

def _write

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

send

(

b

"HTTP 1.0 200 OKrnrnhello world"

)


        

finally

:


            

conn

.

close

()


 


    

def add_handler

(

self

,

fd

,

handler

,

event

)

:


        

self

.

_handlers

[

fd

]

=

handler


        

self

.

register

(

fd

,

event

)


 


    

def remove_handler

(

self

,

fd

)

:


        

self

.

_handlers

.

pop

(

fd

,

None

)


        

self

.

unregister

(

fd

)


 


    

def register

(

self

,

fd

,

event

)

:


        

if

fd

in

self

.

_fd_events

:


            

raise IOError

(

"fd %s already registered"

%

fd

)


        

self

.

_poll

.

register

(

fd

,

event

)


        

self

.

_fd_events

[

fd

]

=

event


 


    def unregister

(

self

,

fd

)

:


        

event

=

self

.

_fd_events

.

pop

(

fd

,

None

)


        

if

event

is

not

None

:


            

self

.

_poll

.

unregister

(

fd

)


 


Server

().

start

()




在我的電腦上大概 7700 QPS,優勢巨大。




再用 kqueue 試試(我用的是 OS X):





from functools import partial


import select


import socket


 


class

Server

:


    

def __init__

(

self

)

:


        

self

.

_sock

=

socket

.

socket

()


        

self

.

_kqueue

=

select

.

kqueue

()


        

self

.

_handlers

=

{}


        

self

.

_fd_events

=

{}


 


    

def start

(

self

)

:


        

sock

=

self

.

_sock


        

sock

.

setsockopt

(

socket

.

SOL_SOCKET

,

socket

.

SO_REUSEADDR

,

1

)


        

sock

.

setblocking

(

0

)


        

sock

.

bind

((

""

,

8000

))


        

sock

.

listen

(

100

)


 


        

self

.

add_handler

(

sock

.

fileno

(),

self

.

_accept

,

select

.

KQ_FILTER_READ

)


        

handlers

=

self

.

_handlers


 


        

while

True

:


            

kevents

=

self

.

_kqueue

.

control

(

None

,

1000

,

1

)


            

for

kevent

in

kevents

:


                

fd

=

kevent

.

ident


                

handler

=

handlers

.

get

(

fd

)


                

if

handler

:


                    

handler

()


 


    

def _accept

(

self

)

:


        

for

i

in

range

(

100

)

:


            

try

:


                

conn

,

address

=

self

.

_sock

.

accept

()


            

except

OSError

:


                

break


            

else

:


                

conn

.

setblocking

(

0

)


                

fd

=

conn

.

fileno

()


                

self

.

add_handler

(

fd

,

partial

(

self

.

_read

,

conn

),

select

.

KQ_FILTER_READ

)


 


    

def _read

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

recv

(

1024

)


        

except

:


            

conn

.

close

()


            

raise


        

else

:


            

self

.

add_handler

(

fd

,

partial

(

self

.

_write

,

conn

),

select

.

KQ_FILTER_WRITE

)


 


    

def _write

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

send

(

b

"HTTP 1.0 200 OKrnrnhello world"

)


        

finally

:


            

conn

.

close

()


 


    

def add_handler

(

self

,

fd

,

handler

,

event

)

:


        

self

.

_handlers

[

fd

]

=

handler


        

self

.

register

(

fd

,

event

)


 


    

def remove_handler

(

self

,

fd

)

:


        

self

.

_handlers

.

pop

(

fd

,

None

)


        

self

.

unregister

(

fd

)


 


    

def register

(

self

,

fd

,

event

)

:


        

if

fd

in

self

.

_fd_events

:


            

raise IOError

(

"fd %s already registered"

%

fd

)


        

self

.

_control

(

fd

,

event

,

select

.

KQ_EV_ADD

)


        

self

.

_fd_events

[

fd

]

=

event


 


    def unregister

(

self

,

fd

)

:


        

event

=

self

.

_fd_events

.

pop

(

fd

,

None

)


        

if

event

is

not

None

:


            

self

.

_control

(

fd

,

event

,

select

.

KQ_EV_DELETE

)


 


    

def _control

(

self

,

fd

,

event

,

flags

)

:


        

change_list

=

(

select

.

kevent

(

fd

,

event

,

flags

),)


        

self

.

_kqueue

.

control

(

change_list

,

0

)


 


Server

().

start

()




在我的電腦上大概 7200 QPS,比 poll 版稍慢。不過因為只有 10 個並發連接,而且沒有慢速網路的影響,所以 poll 的性能好並不奇怪。




再試試 Python 3.4 新增的 selectors 模塊,它的 DefaultSelector 會自動選擇所在平台最高效的實現,asyncio 就用到了這個模塊。





import selectors


import socket


 


class

Server

:


    

def __init__

(

self

)

:


        

self

.

_sock

=

socket

.

socket

()


        

self

.

_selector

=

selectors

.

DefaultSelector

()


 


    

def start

(

self

)

:


        

sock

=

self

.

_sock


        

sock

.

setsockopt

(

socket

.

SOL_SOCKET

,

socket

.

SO_REUSEADDR

,

1

)


        

sock

.

setblocking

(

0

)


        

sock

.

bind

((

""

,

8000

))


        

sock

.

listen

(

100

)


 


        

selector

=

self

.

_selector


        

self

.

add_handler

(

sock

.

fileno

(),

self

.

_accept

,

selectors

.

EVENT_READ

)


 


        

while

True

:


            

events

=

selector

.

select

(

1

)


            

for

key

,

event

in

events

:


                

handler

,

data

=

key

.

data


                

if

data

:


                    

handler

(

**

data

)


                

else

:


                    

handler

()


 


    

def _accept

(

self

)

:


        

for

i

in

range

(

100

)

:


            

try

:


                

conn

,

address

=

self

.

_sock

.

accept

()


            

except

OSError

:


                

break


            

else

:


                

conn

.

setblocking

(

0

)


                

fd

=

conn

.

fileno

()


                

self

.

add_handler

(

fd

,

self

.

_read

,

selectors

.

EVENT_READ

,

{

"conn"

:

conn

})


 


    

def _read

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

recv

(

1024

)


        

except

:


            

conn

.

close

()


            

raise


        

else

:


            

self

.

add_handler

(

fd

,

self

.

_write

,

selectors

.

EVENT_WRITE

,

{

"conn"

:

conn

})


 


    

def _write

(

self

,

conn

)

:


        

fd

=

conn

.

fileno

()


        

self

.

remove_handler

(

fd

)


        

try

:


            

conn

.

send

(

b

"HTTP 1.0 200 OKrnrnhello world"

)


        

finally

:


            

conn

.

close

()


 


    

def add_handler

(

self

,

fd

,

handler

,

event

,

data

=

None

)

:


        

self

.

_selector

.

register

(

fd

,

event

,

(

handler

,

data

))


 


    

def remove_handler

(

self

,

fd

)

:


        

self

.

_selector

.

unregister

(

fd

)


 


Server

().

start

()




在我的電腦上大概 6100 QPS,成績也還不錯。




從這些測試來看,如果想自己實現一個捨棄了一些功能和兼容性的 Tornado,應該能比它稍快一點,不過似乎沒多大必要。




所以暫時不糾結性能了,還是從使用的便利性上來考慮。Tornado 可以用 yield 取代 callback,我們也來實現這個 feature。




實現前先得了解下 yield。




當一個函數內部出現了 yield 語句時,它就不再是一個單純的函數了,而是一個生成器函數,調用它並不會執行它的代碼,而是返回一個生成器。




調用這個生成器的 send 方法時,才會執行內部的代碼。當執行到 yield 時,這個 send 方法就返回了,調用者可以得到其返回值。




send 方法在第一次調用時,參數必須為 None。Python 2 中可以用它的 next 方法,Python 3 中改成了 __next__ 方法,還可以用內置的 next 函數來調用。


send 方法可以被多次調用,參數會作為 yield 的返回值,回到生成器內上一次執行的地方,並繼續執行下去。




當生成器的代碼執行完時,會拋出一個 StopIteration 的異常。Python 3.3 開始可以在生成器里使用 return,返回值可以從 StopIteration 異常的 value 屬性獲取。




for … in … 循環會自動捕獲 StopIteration 異常,並作為循環停止的條件。




由此可見,yield 可以用於跳轉。而我們要做的,則是在遇到 IO 請求時,用 yield 返回 IO loop;當事件發生時,找到對應的生成器,用 send 方法繼續執行即可。




為了簡單起見,我就在 poll 版的基礎上進行改造了:







在我的電腦上大概 5300 QPS。




雖然成績比較尷尬,但畢竟用起來比前一個版本好多了。至於慢的原因,我估計是自己維護了一個堆棧的原因(也可能是有什麼 bug,畢竟寫這個感覺太跳躍了,能運行起來就謝天謝地了)。


實現時做了兩點假設:






  1. handler 為 generator 時,視為非同步方法。



  2. 在非同步方法中 yield None 時,視為等待 IO;yield / yield from 非同步方法時,則是等待方法返回。




實現細節也沒什麼好說的了,只是覺得在實現 Stream 的 read / write 方法時,調用 IOLoop.add_handler 方法不太優雅。其實可以直接 yield 一個 fd 和 event,在 IOLoop.start 方法中再去註冊。不過這個重構其實蠻小的,我就不再貼一次代碼了,感興趣的可以自己試試。




於是這次初探就到此為止了,有空我也許會繼續完善它。至少這次探索,讓我覺得 Python 3 還是蠻有意思的。




看完本文有收穫?請轉

發分享給更多人


關注「P

ython開發者」,提升Python技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發者 的精彩文章:

一個 Reentrant Error 引發的對 Python 信號機制的探索和思考
為提高用戶體驗,Yelp 是如何無損壓縮圖片的
BAT 面試官帶你刷真題、過筆試
K-means 在 Python 中的實現

TAG:Python開發者 |