在上一課中我們說過,由於 GIL 的存在,CPython 中的多執行緒並不能發揮 CPU 的多核優勢,如果希望突破 GIL 的限制,可以考慮使用多程序。對於多程序的程式,每個程序都有一個屬於自己的 GIL,所以多程序不會受到 GIL 的影響。那麼,我們應該如何在 Python 程式中建立和使用多程序呢?
###建立程序
在 Python 中可以基於Process
類來建立程序,雖然程序和執行緒有著本質的差別,但是Process
類和Thread
類的用法卻非常類似。在使用Process
類的構造器建立物件時,也是透過target
引數傳入一個函式來指定程序要執行的程式碼,而args
和kwargs
引數可以指定該函式使用的引數值。
from multiprocessing import Process, current_process
from time import sleep
def sub_task(content, nums):
# 透過current_process函式獲取當前程序物件
# 透過程序物件的pid和name屬性獲取程序的ID號和名字
print(f'PID: {current_process().pid}')
print(f'Name: {current_process().name}')
# 透過下面的輸出不難發現,每個程序都有自己的nums列表,程序之間本就不共享記憶體
# 在建立子程序時複製了父程序的資料結構,三個程序從列表中pop(0)得到的值都是20
counter, total = 0, nums.pop(0)
print(f'Loop count: {total}')
sleep(0.5)
while counter < total:
counter += 1
print(f'{counter}: {content}')
sleep(0.01)
def main():
nums = [20, 30, 40]
# 建立並啟動程序來執行指定的函式
Process(target=sub_task, args=('Ping', nums)).start()
Process(target=sub_task, args=('Pong', nums)).start()
# 在主程序中執行sub_task函式
sub_task('Good', nums)
if __name__ == '__main__':
main()
說明:上面的程式碼透過
current_process
函式獲取當前程序物件,再透過程序物件的pid
屬性獲取程序ID。在 Python 中,使用os
模組的getpid
函式也可以達到同樣的效果。
如果願意,也可以使用os
模組的fork
函式來建立程序,呼叫該函式時,作業系統自動把當前程序(父程序)複製一份(子程序),父程序的fork
函式會返回子程序的ID,而子程序中的fork
函式會返回0
,也就是說這個函式呼叫一次會在父程序和子程序中得到兩個不同的返回值。需要注意的是,Windows 系統並不支援fork
函式,如果你使用的是 Linux 或 macOS 系統,可以試試下面的程式碼。
import os
print(f'PID: {os.getpid()}')
pid = os.fork()
if pid == 0:
print(f'子程序 - PID: {os.getpid()}')
print('Todo: 在子程序中執行的程式碼')
else:
print(f'父程序 - PID: {os.getpid()}')
print('Todo: 在父程序中執行的程式碼')
簡而言之,我們還是推薦大家透過直接使用Process
類、繼承Process
類和使用程序池(ProcessPoolExecutor
)這三種方式來建立和使用多程序,這三種方式不同於上面的fork
函式,能夠保證程式碼的相容性和可移植性。具體的做法跟之前講過的建立和使用多執行緒的方式比較接近,此處不再進行贅述。
對於爬蟲這類 I/O 密集型任務來說,使用多程序並沒有什麼優勢;但是對於計算密集型任務來說,多程序相比多執行緒,在效率上會有顯著的提升,我們可以透過下面的程式碼來加以證明。下面的程式碼會透過多執行緒和多程序兩種方式來判斷一組大整數是不是質數,很顯然這是一個計算密集型任務,我們將任務分別放到多個執行緒和多個程序中來加速程式碼的執行,讓我們看看多執行緒和多程序的程式碼具體表現有何不同。
我們先實現一個多執行緒的版本,程式碼如下所示。
import concurrent.futures
PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5
def is_prime(n):
"""判斷素數"""
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return n != 1
def main():
"""主函式"""
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
假設上面的程式碼儲存在名為example.py
的檔案中,在 Linux 或 macOS 系統上,可以使用time python example.py
命令執行程式並獲得作業系統關於執行時間的統計,在我的 macOS 上,某次的執行結果的最後一行輸出如下所示。
python example09.py 38.69s user 1.01s system 101% cpu 39.213 total
從執行結果可以看出,多執行緒的程式碼只能讓 CPU 利用率達到100%,這其實已經證明了多執行緒的程式碼無法利用 CPU 多核特性來加速程式碼的執行,我們再看看多程序的版本,我們將上面程式碼中的執行緒池(ThreadPoolExecutor
)更換為程序池(ProcessPoolExecutor
)。
多程序的版本。
import concurrent.futures
PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5
def is_prime(n):
"""判斷素數"""
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return n != 1
def main():
"""主函式"""
with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
提示:執行上面的程式碼時,可以透過作業系統的任務管理器(資源監視器)來檢視是否啟動了多個 Python 直譯器程序。
我們仍然透過time python example.py
的方式來執行上述程式碼,執行結果的最後一行如下所示。
python example09.py 106.63s user 0.57s system 389% cpu 27.497 total
可以看出,多程序的版本在我使用的這臺電腦上,讓 CPU 的利用率達到了將近400%,而執行程式碼時使用者態耗費的 CPU 的時間(106.63秒)幾乎是程式碼執行總時間(27.497秒)的4倍,從這兩點都可以看出,我的電腦使用了一款4核的 CPU。當然,要知道自己的電腦有幾個 CPU 或幾個核,可以直接使用下面的程式碼。
import os
print(os.cpu_count())
綜上所述,多程序可以突破 GIL 的限制,充分利用 CPU 多核特性,對於計算密集型任務,這一點是相當重要的。常見的計算密集型任務包括科學計算、影象處理、音影片編解碼等,如果這些計算密集型任務本身是可以並行的,那麼使用多程序應該是更好的選擇。
在講解程序間通訊之前,先給大家一個任務:啟動兩個程序,一個輸出“Ping”,一個輸出“Pong”,兩個程序輸出的“Ping”和“Pong”加起來一共有50個時,就結束程式。聽起來是不是非常簡單,但是實際編寫程式碼時,由於多個程序之間不能夠像多個執行緒之間直接透過共享記憶體的方式交換資料,所以下面的程式碼是達不到我們想要的結果的。
from multiprocessing import Process
from time import sleep
counter = 0
def sub_task(string):
global counter
while counter < 50:
print(string, end='', flush=True)
counter += 1
sleep(0.01)
def main():
Process(target=sub_task, args=('Ping', )).start()
Process(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
main()
上面的程式碼看起來沒毛病,但是最後的結果是“Ping”和“Pong”各輸出了50個。再次提醒大家,當我們在程式中建立程序的時候,子程序會複製父程序及其所有的資料結構,每個子程序有自己獨立的記憶體空間,這也就意味著兩個子程序中各有一個counter
變數,它們都會從0
加到50
,所以結果就可想而知了。要解決這個問題比較簡單的辦法是使用multiprocessing
模組中的Queue
類,它是可以被多個程序共享的佇列,底層是透過作業系統底層的管道和訊號量(semaphore)機制來實現的,程式碼如下所示。
import time
from multiprocessing import Process, Queue
def sub_task(content, queue):
counter = queue.get()
while counter < 50:
print(content, end='', flush=True)
counter += 1
queue.put(counter)
time.sleep(0.01)
counter = queue.get()
def main():
queue = Queue()
queue.put(0)
p1 = Process(target=sub_task, args=('Ping', queue))
p1.start()
p2 = Process(target=sub_task, args=('Pong', queue))
p2.start()
while p1.is_alive() and p2.is_alive():
pass
queue.put(50)
if __name__ == '__main__':
main()
提示:
multiprocessing.Queue
物件的get
方法預設在佇列為空時是會阻塞的,直到獲取到資料才會返回。如果不希望該方法阻塞以及需要指定阻塞的超時時間,可以透過指定block
和timeout
引數進行設定。
上面的程式碼透過Queue
類的get
和put
方法讓三個程序(p1
、p2
和主程序)實現了資料的共享,這就是所謂的程序間的通訊,透過這種方式,當Queue
中取出的值已經大於等於50
時,p1
和p2
就會跳出while
迴圈,從而終止程序的執行。程式碼第22行的迴圈是為了等待p1
和p2
兩個程序中的一個結束,這時候主程序還需要向Queue
中放置一個大於等於50
的值,這樣另一個尚未結束的程序也會因為讀到這個大於等於50
的值而終止。
程序間通訊的方式還有很多,比如使用套接字也可以實現兩個程序的通訊,甚至於這兩個程序並不在同一臺主機上,有興趣的讀者可以自行了解。
在 Python 中,我們還可以透過subprocess
模組的call
函式執行其他的命令來建立子程序,相當於就是在我們的程式中呼叫其他程式,這裡我們暫不探討這些知識,有興趣的讀者可以自行研究。
對於Python開發者來說,以下情況需要考慮使用多執行緒:
- 程式需要維護許多共享的狀態(尤其是可變狀態),Python 中的列表、字典、集合都是執行緒安全的(多個執行緒同時操作同一個列表、字典或集合,不會引發錯誤和資料問題),所以使用執行緒而不是程序維護共享狀態的代價相對較小。
- 程式會花費大量時間在 I/O 操作上,沒有太多平行計算的需求且不需佔用太多的記憶體。
那麼在遇到下列情況時,應該考慮使用多程序:
- 程式執行計算密集型任務(如:音影片編解碼、資料壓縮、科學計算等)。
- 程式的輸入可以並行的分成塊,並且可以將運算結果合併。
- 程式在記憶體使用方面沒有任何限制且不強依賴於 I/O 操作(如讀寫檔案、套接字等)。