Python 多進程教程
(點擊
上方藍字
,快速關注我們)
編譯:伯樂在線 - tsteho
如有好文章投稿,請點擊 → 這裡了解詳情
Python2.6版本中新添了multiprocessing模塊。它最初由Jesse Noller和Richard Oudkerk定義在PEP 371中。就像你能通過threading模塊衍生線程一樣,multiprocessing 模塊允許你衍生進程。這裡用到的思想:因為你現在能衍生進程,所以你能夠避免使用全局解釋器鎖(GIL),並且充分利用機器的多個處理器。
多進程包也包含一些根本不在threading 模塊中的API。比如:有一個靈活的Pool類能讓你在多個輸入下並行化地執行函數。我們將在後面的小節講解Pool類。我們將以multiprocessing模塊的Process類開始講解。
開始學習multiprocessing模塊
Process這個類和threading模塊中的Thread類很像。讓我們創建一系列調用相同函數的進程,並且看看它是如何工作的。
import
os
from
multiprocessing
import
Process
def
doubler
(
number
)
:
"""
A doubling function that can be used by a process
"""
result
=
number
*
2
proc
=
os
.
getpid
()
(
"{0} doubled to {1} by process id: {2}"
.
format
(
number
,
result
,
proc
))
if
__name__
==
"__main__"
:
numbers
=
[
5
,
10
,
15
,
20
,
25
]
procs
=
[]
for
index
,
number
in
enumerate
(
numbers
)
:
proc
=
Process
(
target
=
doubler
,
args
=
(
number
,))
procs
.
append
(
proc
)
proc
.
start
()
for
proc
in
procs
:
proc
.
join
()
對於上面的例子,我們導入Process類、創建一個叫doubler的函數。在函數中,我們將傳入的數字乘上2。我們也用Python的os模塊來獲取當前進程的ID(pid)。這個ID將告訴我們哪個進程正在調用doubler函數。然後,在下面的代碼塊中,我們實例化了一系列的Process類並且啟動它們。最後一個循環只是調用每個進程的join()方法,該方法告訴Python等待進程直到它結束。如果你需要結束一個進程,你可以調用它的terminate()方法。
當你運行上面的代碼,你應該看到和下面類似的輸出結果:
5
doubled
to
10
by process id
:
10468
10
doubled
to
20
by process id
:
10469
15
doubled
to
30
by process id
:
10470
20
doubled
to
40
by process id
:
10471
25
doubled
to
50
by process id
:
10472
有時候,你最好給你的進程取一個易於理解的名字 。幸運的是,Process類確實允許你訪問同樣的進程。讓我們來看看如下例子:
import
os
from
multiprocessing
import
Process
,
current_process
def
doubler
(
number
)
:
"""
A doubling function that can be used by a process
"""
result
=
number
*
2
proc_name
=
current_process
().
name
(
"{0} doubled to {1} by: {2}"
.
format
(
number
,
result
,
proc_name
))
if
__name__
==
"__main__"
:
numbers
=
[
5
,
10
,
15
,
20
,
25
]
procs
=
[]
proc
=
Process
(
target
=
doubler
,
args
=
(
5
,))
for
index
,
number
in
enumerate
(
numbers
)
:
proc
=
Process
(
target
=
doubler
,
args
=
(
number
,))
procs
.
append
(
proc
)
proc
.
start
()
proc
=
Process
(
target
=
doubler
,
name
=
"Test"
,
args
=
(
2
,))
proc
.
start
()
procs
.
append
(
proc
)
for
proc
in
procs
:
proc
.
join
()
這一次,我們多導入了current_process。current_process基本上和threading模塊的current_thread是類似的東西。我們用它來獲取正在調用我們的函數的線程的名字。你將注意到我們沒有給前面的5個進程設置名字。然後我們將第6個進程的名字設置為「Test」。
讓我們看看我們將得到什麼樣的輸出結果:
5
doubled
to
10
by
:
Process
-
2
10
doubled
to
20
by
:
Process
-
3
15
doubled
to
30
by
:
Process
-
4
20
doubled
to
40
by
:
Process
-
5
25
doubled
to
50
by
:
Process
-
6
2
doubled
to
4
by
:
Test
輸出結果說明:默認情況下,multiprocessing模塊給每個進程分配了一個編號,而該編號被用來組成進程的名字的一部分。當然,如果我們給定了名字的話,並不會有編號被添加到名字中。
鎖
multiprocessing模塊支持鎖,它和threading模塊做的方式一樣。你需要做的只是導入Lock,獲取它,做一些事,釋放它。
from
multiprocessing
import
Process
,
Lock
def
printer
(
item
,
lock
)
:
"""
Prints out the item that was passed in
"""
lock
.
acquire
()
try
:
(
item
)
finally
:
lock
.
release
()
if
__name__
==
"__main__"
:
lock
=
Lock
()
items
=
[
"tango"
,
"foxtrot"
,
10
]
for
item
in
items
:
p
=
Process
(
target
=
printer
,
args
=
(
item
,
lock
))
p
.
start
()
我們在這裡創建了一個簡單的用於列印函數,你輸入什麼,它就輸出什麼。為了避免線程之間互相阻塞,我們使用Lock對象。代碼循環列表中的三個項並為它們各自都創建一個進程。每一個進程都將調用我們的函數,並且每次遍歷到的那一項作為參數傳入函數。因為我們現在使用了鎖,所以隊列中下一個進程將一直阻塞,直到之前的進程釋放鎖。
日誌
為進程創建日誌與為線程創建日誌有一些不同。它們存在不同是因為Python的logging包不使用共享鎖的進程,因此有可能以來自不同進程的信息作為結束的標誌。讓我們試著給前面的例子添加基本的日誌。代碼如下:
import
logging
import
multiprocessing
from
multiprocessing
import
Process
,
Lock
def
printer
(
item
,
lock
)
:
"""
Prints out the item that was passed in
"""
lock
.
acquire
()
try
:
(
item
)
finally
:
lock
.
release
()
if
__name__
==
"__main__"
:
lock
=
Lock
()
items
=
[
"tango"
,
"foxtrot"
,
10
]
multiprocessing
.
log_to_stderr
()
logger
=
multiprocessing
.
get_logger
()
logger
.
setLevel
(
logging
.
INFO
)
for
item
in
items
:
p
=
Process
(
target
=
printer
,
args
=
(
item
,
lock
))
p
.
start
()
最簡單的添加日誌的方法通過推送它到stderr實現。我們能通過調用thelog_to_stderr() 函數來實現該方法。然後我們調用get_logger 函數獲得一個logger實例,並將它的日誌等級設為INFO。之後的代碼是相同的。需要提示下這裡我並沒有調用join()方法。取而代之的:當它退出,父線程將自動調用join()方法。
當你這麼做了,你應該得到類似下面的輸出:
[
INFO
/
Process
-
1
]
child process calling
self
.
run
()
tango
[
INFO
/
Process
-
1
]
process shutting
down
[
INFO
/
Process
-
1
]
process exiting
with
exitcode
0
[
INFO
/
Process
-
2
]
child process calling
self
.
run
()
[
INFO
/
MainProcess
]
process shutting down
foxtrot
[
INFO
/
Process
-
2
]
process shutting
down
[
INFO
/
Process
-
3
]
child process calling
self
.
run
()
[
INFO
/
Process
-
2
]
process exiting
with
exitcode
0
10
[
INFO
/
MainProcess
]
calling join
()
for
process
Process
-
3
[
INFO
/
Process
-
3
]
process shutting
down
[
INFO
/
Process
-
3
]
process exiting
with
exitcode
0
[
INFO
/
MainProcess
]
calling join
()
for
process
Process
-
2
現在如果你想要保存日誌到硬碟中,那麼這件事就顯得有些棘手。你能在Python的logging Cookbook閱讀一些有關那類話題。
Pool類
Pool類被用來代表一個工作進程池。它有讓你將任務轉移到工作進程的方法。讓我們看下面一個非常簡單的例子。
from
multiprocessing
import
Pool
def
doubler
(
number
)
:
return
number
*
2
if
__name__
==
"__main__"
:
numbers
=
[
5
,
10
,
20
]
pool
=
Pool
(
processes
=
3
)
(
pool
.
map
(
doubler
,
numbers
))
基本上執行上述代碼之後,一個Pool的實例被創建,並且該實例創建了3個工作進程。然後我們使用map 方法將一個函數和一個可迭代對象映射到每個進程。最後我們列印出這個例子的結果:[10, 20, 40]。
你也能通過apply_async方法獲得池中進程的運行結果:
from
multiprocessing
import
Pool
def
doubler
(
number
)
:
return
number
*
2
if
__name__
==
"__main__"
:
pool
=
Pool
(
processes
=
3
)
result
=
pool
.
apply_async
(
doubler
,
(
25
,))
(
result
.
get
(
timeout
=
1
))
我們上面做的事實際上就是請求進程的運行結果。那就是get函數的用途。它嘗試去獲取我們的結果。你能夠注意到我們設置了timeout,這是為了預防我們調用的函數發生異常的情況。畢竟我們不想要它被無限期地阻塞。
進程通信
當遇到進程間通信的情況,multiprocessing 模塊提供了兩個主要的方法:Queues 和 Pipes。Queue 實現上既是線程安全的也是進程安全的。讓我們看一個相當簡單的並且基於 Queue的例子。代碼來自於我的文章(threading articles)。
from
multiprocessing
import
Process
,
Queue
sentinel
= -
1
def
creator
(
data
,
q
)
:
"""
Creates data to be consumed and waits for the consumer
to finish processing
"""
(
"Creating data and putting it on the queue"
)
for
item
in
data
:
q
.
put
(
item
)
def
my_consumer
(
q
)
:
"""
Consumes some data and works on it
In this case, all it does is double the input
"""
while
True
:
data
=
q
.
get
()
(
"data found to be processed: {}"
.
format
(
data
))
processed
=
data
*
2
(
processed
)
if
data
is
sentinel
:
break
if
__name__
==
"__main__"
:
q
=
Queue
()
data
=
[
5
,
10
,
13
,
-
1
]
process_one
=
Process
(
target
=
creator
,
args
=
(
data
,
q
))
process_two
=
Process
(
target
=
my_consumer
,
args
=
(
q
,))
process_one
.
start
()
process_two
.
start
()
q
.
close
()
q
.
join_thread
()
process_one
.
join
()
process_two
.
join
()
在這裡我們只需要導入Queue和Process。Queue用來創建數據和添加數據到隊列中,Process用來消耗數據並執行它。通過使用Queue的put()和get()方法,我們就能添加數據到Queue、從Queue獲取數據。代碼的最後一塊只是創建了Queue 對象以及兩個Process對象,並且運行它們。你能注意到我們在進程對象上調用join()方法,而不是在Queue本身上調用。
總結
我們這裡有大量的資料。你已經學習如何使用multiprocessing模塊指定不變的函數、使用Queues在進程間通信、給進程命名等很多事。在Python文檔中也有很多本文沒有接觸到的知識點,因此也務必深入了解下文檔。與此同時,你現在知道如何用Python利用你電腦所有的處理能力了!
相關閱讀
有關multiprocessing模塊的Python文檔(multiprocessing module)
Python模塊周刊:multiprocessing
Python的並發–Porting a Queue to multiprocessing
看完本文有收穫?請轉
發分享給更多人
關注「P
ython開發者」,提升Python技能
※Python 模擬登陸百度雲盤實戰教程
※Houdini中Expressions,HScript,Python,VEX區別
※如何用 Python 檢測偽造的視頻
※為什麼數據科學家偏愛Python
※超詳細的Python實現新浪微博模擬登陸
TAG:Python |
※Python多進程編程
※深入Python多進程編程基礎
※入門Python多線程/多進程編程
※Python學習之進程和線程
※Python網路編程——進程
※Python學習之多進程詳解
※Android 進程和線程
※Python的分散式進程
※python 進程實現多任務
※用 Python 管理系統進程
※python threading中處理主進程和子線程的關係
※Python 並發編程之線程池/進程池
※深入Python多進程通信原理與實戰——圖文
※使用 shell 構建多進程的 CommandlineFu 爬蟲
※Linux進程基礎
※Linux進程管理
※Linux查殺stopped進程
※人性化的子進程:Delegator.py
※一篇文章學會使用 Android IPC 多進程
※python logging 日誌模塊以及多進程日誌