當前位置:
首頁 > 知識 > Python 多進程教程

Python 多進程教程

(點擊

上方藍字

,快速關注我們)




編譯:伯樂在線 - tsteho


如有好文章投稿,請點擊 → 這裡了解詳情




Python2.6版本中新添了multiprocessing模塊。它最初由Jesse Noller和Richard Oudkerk定義在PEP 371中。就像你能通過threading模塊衍生線程一樣,multiprocessing 模塊允許你衍生進程。這裡用到的思想:因為你現在能衍生進程,所以你能夠避免使用全局解釋器鎖(GIL),並且充分利用機器的多個處理器。




多進程包也包含一些根本不在threading 模塊中的API。比如:有一個靈活的Pool類能讓你在多個輸入下並行化地執行函數。我們將在後面的小節講解Pool類。我們將以multiprocessing模塊的Process類開始講解。





開始學習multiprocessing模塊




Process這個類和threading模塊中的Thread類很像。讓我們創建一系列調用相同函數的進程,並且看看它是如何工作的。





import

os



from

multiprocessing

import

Process



def

doubler

(

number

)

:


"""


A doubling function that can be used by a process


"""


result

=

number

*

2


proc

=

os

.

getpid

()


print

(

"{0} doubled to {1} by process id: {2}"

.

format

(


number

,

result

,

proc

))



if

__name__

==

"__main__"

:


numbers

=

[

5

,

10

,

15

,

20

,

25

]


procs

=

[]



for

index

,

number

in

enumerate

(

numbers

)

:


proc

=

Process

(

target

=

doubler

,

args

=

(

number

,))


procs

.

append

(

proc

)


proc

.

start

()



for

proc

in

procs

:


proc

.

join

()




對於上面的例子,我們導入Process類、創建一個叫doubler的函數。在函數中,我們將傳入的數字乘上2。我們也用Python的os模塊來獲取當前進程的ID(pid)。這個ID將告訴我們哪個進程正在調用doubler函數。然後,在下面的代碼塊中,我們實例化了一系列的Process類並且啟動它們。最後一個循環只是調用每個進程的join()方法,該方法告訴Python等待進程直到它結束。如果你需要結束一個進程,你可以調用它的terminate()方法。




當你運行上面的代碼,你應該看到和下面類似的輸出結果:





5

doubled

to

10

by process id

:

10468


10

doubled

to

20

by process id

:

10469


15

doubled

to

30

by process id

:

10470


20

doubled

to

40

by process id

:

10471


25

doubled

to

50

by process id

:

10472




有時候,你最好給你的進程取一個易於理解的名字 。幸運的是,Process類確實允許你訪問同樣的進程。讓我們來看看如下例子:





import

os



from

multiprocessing

import

Process

,

current_process




def

doubler

(

number

)

:


"""


A doubling function that can be used by a process


"""


result

=

number

*

2


proc_name

=

current_process

().

name


print

(

"{0} doubled to {1} by: {2}"

.

format

(


number

,

result

,

proc_name

))




if

__name__

==

"__main__"

:


numbers

=

[

5

,

10

,

15

,

20

,

25

]


procs

=

[]


proc

=

Process

(

target

=

doubler

,

args

=

(

5

,))



for

index

,

number

in

enumerate

(

numbers

)

:


proc

=

Process

(

target

=

doubler

,

args

=

(

number

,))


procs

.

append

(

proc

)


proc

.

start

()



proc

=

Process

(

target

=

doubler

,

name

=

"Test"

,

args

=

(

2

,))


proc

.

start

()


procs

.

append

(

proc

)



for

proc

in

procs

:


proc

.

join

()




這一次,我們多導入了current_process。current_process基本上和threading模塊的current_thread是類似的東西。我們用它來獲取正在調用我們的函數的線程的名字。你將注意到我們沒有給前面的5個進程設置名字。然後我們將第6個進程的名字設置為「Test」。




讓我們看看我們將得到什麼樣的輸出結果:





5

doubled

to

10

by

:

Process

-

2


10

doubled

to

20

by

:

Process

-

3


15

doubled

to

30

by

:

Process

-

4


20

doubled

to

40

by

:

Process

-

5


25

doubled

to

50

by

:

Process

-

6


2

doubled

to

4

by

:

Test




輸出結果說明:默認情況下,multiprocessing模塊給每個進程分配了一個編號,而該編號被用來組成進程的名字的一部分。當然,如果我們給定了名字的話,並不會有編號被添加到名字中。







multiprocessing模塊支持鎖,它和threading模塊做的方式一樣。你需要做的只是導入Lock,獲取它,做一些事,釋放它。





from

multiprocessing

import

Process

,

Lock




def

printer

(

item

,

lock

)

:


"""


Prints out the item that was passed in


"""


lock

.

acquire

()


try

:


print

(

item

)


finally

:


lock

.

release

()



if

__name__

==

"__main__"

:


lock

=

Lock

()


items

=

[

"tango"

,

"foxtrot"

,

10

]


for

item

in

items

:


p

=

Process

(

target

=

printer

,

args

=

(

item

,

lock

))


p

.

start

()




我們在這裡創建了一個簡單的用於列印函數,你輸入什麼,它就輸出什麼。為了避免線程之間互相阻塞,我們使用Lock對象。代碼循環列表中的三個項並為它們各自都創建一個進程。每一個進程都將調用我們的函數,並且每次遍歷到的那一項作為參數傳入函數。因為我們現在使用了鎖,所以隊列中下一個進程將一直阻塞,直到之前的進程釋放鎖。




日誌




為進程創建日誌與為線程創建日誌有一些不同。它們存在不同是因為Python的logging包不使用共享鎖的進程,因此有可能以來自不同進程的信息作為結束的標誌。讓我們試著給前面的例子添加基本的日誌。代碼如下:





import

logging


import

multiprocessing



from

multiprocessing

import

Process

,

Lock



def

printer

(

item

,

lock

)

:


"""


Prints out the item that was passed in


"""


lock

.

acquire

()


try

:


print

(

item

)


finally

:


lock

.

release

()



if

__name__

==

"__main__"

:


lock

=

Lock

()


items

=

[

"tango"

,

"foxtrot"

,

10

]


multiprocessing

.

log_to_stderr

()


logger

=

multiprocessing

.

get_logger

()


logger

.

setLevel

(

logging

.

INFO

)


for

item

in

items

:


p

=

Process

(

target

=

printer

,

args

=

(

item

,

lock

))


p

.

start

()




最簡單的添加日誌的方法通過推送它到stderr實現。我們能通過調用thelog_to_stderr() 函數來實現該方法。然後我們調用get_logger 函數獲得一個logger實例,並將它的日誌等級設為INFO。之後的代碼是相同的。需要提示下這裡我並沒有調用join()方法。取而代之的:當它退出,父線程將自動調用join()方法。




當你這麼做了,你應該得到類似下面的輸出:





[

INFO

/

Process

-

1

]

child process calling

self

.

run

()


tango


[

INFO

/

Process

-

1

]

process shutting

down


[

INFO

/

Process

-

1

]

process exiting

with

exitcode

0


[

INFO

/

Process

-

2

]

child process calling

self

.

run

()


[

INFO

/

MainProcess

]

process shutting down


foxtrot


[

INFO

/

Process

-

2

]

process shutting

down


[

INFO

/

Process

-

3

]

child process calling

self

.

run

()


[

INFO

/

Process

-

2

]

process exiting

with

exitcode

0


10


[

INFO

/

MainProcess

]

calling join

()

for

process

Process

-

3


[

INFO

/

Process

-

3

]

process shutting

down


[

INFO

/

Process

-

3

]

process exiting

with

exitcode

0


[

INFO

/

MainProcess

]

calling join

()

for

process

Process

-

2




現在如果你想要保存日誌到硬碟中,那麼這件事就顯得有些棘手。你能在Python的logging Cookbook閱讀一些有關那類話題。




Pool類




Pool類被用來代表一個工作進程池。它有讓你將任務轉移到工作進程的方法。讓我們看下面一個非常簡單的例子。





from

multiprocessing

import

Pool



def

doubler

(

number

)

:


return

number

*

2



if

__name__

==

"__main__"

:


numbers

=

[

5

,

10

,

20

]


pool

=

Pool

(

processes

=

3

)


print

(

pool

.

map

(

doubler

,

numbers

))




基本上執行上述代碼之後,一個Pool的實例被創建,並且該實例創建了3個工作進程。然後我們使用map 方法將一個函數和一個可迭代對象映射到每個進程。最後我們列印出這個例子的結果:[10, 20, 40]。




你也能通過apply_async方法獲得池中進程的運行結果:





from

multiprocessing

import

Pool



def

doubler

(

number

)

:


return

number

*

2



if

__name__

==

"__main__"

:


pool

=

Pool

(

processes

=

3

)


result

=

pool

.

apply_async

(

doubler

,

(

25

,))


print

(

result

.

get

(

timeout

=

1

))




我們上面做的事實際上就是請求進程的運行結果。那就是get函數的用途。它嘗試去獲取我們的結果。你能夠注意到我們設置了timeout,這是為了預防我們調用的函數發生異常的情況。畢竟我們不想要它被無限期地阻塞。




進程通信




當遇到進程間通信的情況,multiprocessing 模塊提供了兩個主要的方法:Queues 和 Pipes。Queue 實現上既是線程安全的也是進程安全的。讓我們看一個相當簡單的並且基於 Queue的例子。代碼來自於我的文章(threading articles)。





from

multiprocessing

import

Process

,

Queue



sentinel

= -

1



def

creator

(

data

,

q

)

:


"""


Creates data to be consumed and waits for the consumer


to finish processing


"""


print

(

"Creating data and putting it on the queue"

)


for

item

in

data

:



q

.

put

(

item

)




def

my_consumer

(

q

)

:


"""


Consumes some data and works on it



In this case, all it does is double the input


"""


while

True

:


data

=

q

.

get

()


print

(

"data found to be processed: {}"

.

format

(

data

))


processed

=

data

*

2


print

(

processed

)



if

data

is

sentinel

:


break




if

__name__

==

"__main__"

:


q

=

Queue

()


data

=

[

5

,

10

,

13

,

-

1

]


process_one

=

Process

(

target

=

creator

,

args

=

(

data

,

q

))


process_two

=

Process

(

target

=

my_consumer

,

args

=

(

q

,))


process_one

.

start

()


process_two

.

start

()



q

.

close

()


q

.

join_thread

()



process_one

.

join

()


process_two

.

join

()




在這裡我們只需要導入Queue和Process。Queue用來創建數據和添加數據到隊列中,Process用來消耗數據並執行它。通過使用Queue的put()和get()方法,我們就能添加數據到Queue、從Queue獲取數據。代碼的最後一塊只是創建了Queue 對象以及兩個Process對象,並且運行它們。你能注意到我們在進程對象上調用join()方法,而不是在Queue本身上調用。




總結




我們這裡有大量的資料。你已經學習如何使用multiprocessing模塊指定不變的函數、使用Queues在進程間通信、給進程命名等很多事。在Python文檔中也有很多本文沒有接觸到的知識點,因此也務必深入了解下文檔。與此同時,你現在知道如何用Python利用你電腦所有的處理能力了!




相關閱讀






  • 有關multiprocessing模塊的Python文檔(multiprocessing module)



  • Python模塊周刊:multiprocessing



  • Python的並發–Porting a Queue to multiprocessing




看完本文有收穫?請轉

發分享給更多人


關注「P

ython開發者」,提升Python技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python 的精彩文章:

Python 模擬登陸百度雲盤實戰教程
Houdini中Expressions,HScript,Python,VEX區別
如何用 Python 檢測偽造的視頻
為什麼數據科學家偏愛Python
超詳細的Python實現新浪微博模擬登陸

TAG:Python |

您可能感興趣

Python多進程編程
深入Python多進程編程基礎
入門Python多線程/多進程編程
Python學習之進程和線程
Python網路編程——進程
Python學習之多進程詳解
Android 進程和線程
Python的分散式進程
python 進程實現多任務
用 Python 管理系統進程
python threading中處理主進程和子線程的關係
Python 並發編程之線程池/進程池
深入Python多進程通信原理與實戰——圖文
使用 shell 構建多進程的 CommandlineFu 爬蟲
Linux進程基礎
Linux進程管理
Linux查殺stopped進程
人性化的子進程:Delegator.py
一篇文章學會使用 Android IPC 多進程
python logging 日誌模塊以及多進程日誌