Skip to content

Latest commit

 

History

History
382 lines (269 loc) · 19.2 KB

63.Python中的并发编程-1.md

File metadata and controls

382 lines (269 loc) · 19.2 KB

Python中的併發程式設計-1

現如今,我們使用的計算機早已是多 CPU 或多核的計算機,而我們使用的作業系統基本都支援“多工”,這使得我們可以同時執行多個程式,也可以將一個程式分解為若干個相對獨立的子任務,讓多個子任務“並行”或“併發”的執行,從而縮短程式的執行時間,同時也讓使用者獲得更好的體驗。因此當下,不管用什麼程式語言進行開發,實現“並行”或“併發”程式設計已經成為了程式設計師的標配技能。為了講述如何在 Python 程式中實現“並行”或“併發”,我們需要先了解兩個重要的概念:程序和執行緒。

執行緒和程序

我們透過作業系統執行一個程式會創建出一個或多個程序,程序是具有一定獨立功能的程式關於某個資料集合上的一次執行活動。簡單的說,程序是作業系統分配儲存空間的基本單位,每個程序都有自己的地址空間、資料棧以及其他用於跟蹤程序執行的輔助資料;作業系統管理所有程序的執行,為它們合理的分配資源。一個程序可以透過 fork 或 spawn 的方式建立新的程序來執行其他的任務,不過新的程序也有自己獨立的記憶體空間,因此兩個程序如果要共享資料,必須透過程序間通訊機制來實現,具體的方式包括管道、訊號、套接字等。

一個程序還可以擁有多個執行線索,簡單的說就是擁有多個可以獲得 CPU 排程的執行單元,這就是所謂的執行緒。由於執行緒在同一個程序下,它們可以共享相同的上下文,因此相對於程序而言,執行緒間的資訊共享和通訊更加容易。當然在單核 CPU 系統中,多個執行緒不可能同時執行,因為在某個時刻只有一個執行緒能夠獲得 CPU,多個執行緒透過共享 CPU 執行時間的方式來達到併發的效果。

在程式中使用多執行緒技術通常都會帶來不言而喻的好處,最主要的體現在提升程式的效能和改善使用者體驗,今天我們使用的軟體幾乎都用到了多執行緒技術,這一點可以利用系統自帶的程序監控工具(如 macOS 中的“活動監視器”、Windows 中的“任務管理器”)來證實,如下圖所示。

這裡,我們還需要跟大家再次強調兩個概念:併發(concurrency)和並行(parallel)。併發通常是指同一時刻只能有一條指令執行,但是多個執行緒對應的指令被快速輪換地執行。比如一個處理器,它先執行執行緒 A 的指令一段時間,再執行執行緒 B 的指令一段時間,再切回到執行緒 A 執行一段時間。由於處理器執行指令的速度和切換的速度極快,人們完全感知不到計算機在這個過程中有多個執行緒切換上下文執行的操作,這就使得宏觀上看起來多個執行緒在同時執行,但微觀上其實只有一個執行緒在執行。並行是指同一時刻,有多條指令在多個處理器上同時執行,並行必須要依賴於多個處理器,不論是從宏觀上還是微觀上,多個執行緒可以在同一時刻一起執行的。很多時候,我們並不用嚴格區分併發和並行兩個詞,所以我們有時候也把 Python 中的多執行緒、多程序以及非同步 I/O 都視為實現併發程式設計的手段,但實際上前面兩者也可以實現並行程式設計,當然這裡還有一個全域性直譯器鎖(GIL)的問題,我們稍後討論。

多執行緒程式設計

Python 標準庫中threading模組的Thread類可以幫助我們非常輕鬆的實現多執行緒程式設計。我們用一個聯網下載檔案的例子來對比使用多執行緒和不使用多執行緒到底有什麼區別,程式碼如下所示。

不使用多執行緒的下載。

import random
import time


def download(*, filename):
    start = time.time()
    print(f'開始下載 {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} 下載完成.')
    end = time.time()
    print(f'下載耗時: {end - start:.3f}秒.')


def main():
    start = time.time()
    download(filename='Python從入門到住院.pdf')
    download(filename='MySQL從刪庫到跑路.avi')
    download(filename='Linux從精通到放棄.mp4')
    end = time.time()
    print(f'總耗時: {end - start:.3f}秒.')


if __name__ == '__main__':
    main()

說明:上面的程式碼並沒有真正實現聯網下載的功能,而是透過time.sleep()休眠一段時間來模擬下載檔案需要一些時間上的開銷,跟實際下載的狀況比較類似。

執行上面的程式碼,可以得到如下所示的執行結果。可以看出,當我們的程式只有一個工作執行緒時,每個下載任務都需要等待上一個下載任務執行結束才能開始,所以程式執行的總耗時是三個下載任務各自執行時間的總和。

開始下載Python從入門到住院.pdf.
Python從入門到住院.pdf下載完成.
下載耗時: 3.005秒.
開始下載MySQL從刪庫到跑路.avi.
MySQL從刪庫到跑路.avi下載完成.
下載耗時: 5.006秒.
開始下載Linux從精通到放棄.mp4.
Linux從精通到放棄.mp3下載完成.
下載耗時: 6.007秒.
總耗時: 14.018秒.

事實上,上面的三個下載任務之間並沒有邏輯上的因果關係,三者是可以“併發”的,下一個下載任務沒有必要等待上一個下載任務結束,為此,我們可以使用多執行緒程式設計來改寫上面的程式碼。

import random
import time
from threading import Thread


def download(*, filename):
    start = time.time()
    print(f'開始下載 {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} 下載完成.')
    end = time.time()
    print(f'下載耗時: {end - start:.3f}秒.')


def main():
    threads = [
        Thread(target=download, kwargs={'filename': 'Python從入門到住院.pdf'}),
        Thread(target=download, kwargs={'filename': 'MySQL從刪庫到跑路.avi'}),
        Thread(target=download, kwargs={'filename': 'Linux從精通到放棄.mp4'})
    ]
    start = time.time()
    # 啟動三個執行緒
    for thread in threads:
        thread.start()
    # 等待執行緒結束
    for thread in threads:
        thread.join()
    end = time.time()
    print(f'總耗時: {end - start:.3f}秒.')


if __name__ == '__main__':
    main()

某次的執行結果如下所示。

開始下載 Python從入門到住院.pdf.
開始下載 MySQL從刪庫到跑路.avi.
開始下載 Linux從精通到放棄.mp4.
MySQL從刪庫到跑路.avi 下載完成.
下載耗時: 3.005秒.
Python從入門到住院.pdf 下載完成.
下載耗時: 5.006秒.
Linux從精通到放棄.mp4 下載完成.
下載耗時: 6.003秒.
總耗時: 6.004秒.

透過上面的執行結果可以發現,整個程式的執行時間幾乎等於耗時最長的一個下載任務的執行時間,這也就意味著,三個下載任務是併發執行的,不存在一個等待另一個的情況,這樣做很顯然提高了程式的執行效率。簡單的說,如果程式中有非常耗時的執行單元,而這些耗時的執行單元之間又沒有邏輯上的因果關係,即 B 單元的執行不依賴於 A 單元的執行結果,那麼 A 和 B 兩個單元就可以放到兩個不同的執行緒中,讓他們併發的執行。這樣做的好處除了減少程式執行的等待時間,還可以帶來更好的使用者體驗,因為一個單元的阻塞不會造成程式的“假死”,因為程式中還有其他的單元是可以運轉的。

使用 Thread 類建立執行緒物件

透過上面的程式碼可以看出,直接使用Thread類的構造器就可以建立執行緒物件,而執行緒物件的start()方法可以啟動一個執行緒。執行緒啟動後會執行target引數指定的函式,當然前提是獲得 CPU 的排程;如果target指定的執行緒要執行的目標函式有引數,需要透過args引數為其進行指定,對於關鍵字引數,可以透過kwargs引數進行傳入。Thread類的構造器還有很多其他的引數,我們遇到的時候再為大家進行講解,目前需要大家掌握的,就是targetargskwargs

繼承 Thread 類自定義執行緒

除了上面的程式碼展示的建立執行緒的方式外,還可以透過繼承Thread類並重寫run()方法的方式來自定義執行緒,具體的程式碼如下所示。

import random
import time
from threading import Thread


class DownloadThread(Thread):

    def __init__(self, filename):
        self.filename = filename
        super().__init__()

    def run(self):
        start = time.time()
        print(f'開始下載 {self.filename}.')
        time.sleep(random.randint(3, 6))
        print(f'{self.filename} 下載完成.')
        end = time.time()
        print(f'下載耗時: {end - start:.3f}秒.')


def main():
    threads = [
        DownloadThread('Python從入門到住院.pdf'),
        DownloadThread('MySQL從刪庫到跑路.avi'),
        DownloadThread('Linux從精通到放棄.mp4')
    ]
    start = time.time()
    # 啟動三個執行緒
    for thread in threads:
        thread.start()
    # 等待執行緒結束
    for thread in threads:
        thread.join()
    end = time.time()
    print(f'總耗時: {end - start:.3f}秒.')


if __name__ == '__main__':
    main()

使用執行緒池

我們還可以透過執行緒池的方式將任務放到多個執行緒中去執行,透過執行緒池來使用執行緒應該是多執行緒程式設計最理想的選擇。事實上,執行緒的建立和釋放都會帶來較大的開銷,頻繁的建立和釋放執行緒通常都不是很好的選擇。利用執行緒池,可以提前準備好若干個執行緒,在使用的過程中不需要再透過自定義的程式碼建立和釋放執行緒,而是直接複用執行緒池中的執行緒。Python 內建的concurrent.futures模組提供了對執行緒池的支援,程式碼如下所示。

import random
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread


def download(*, filename):
    start = time.time()
    print(f'開始下載 {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} 下載完成.')
    end = time.time()
    print(f'下載耗時: {end - start:.3f}秒.')


def main():
    with ThreadPoolExecutor(max_workers=4) as pool:
        filenames = ['Python從入門到住院.pdf', 'MySQL從刪庫到跑路.avi', 'Linux從精通到放棄.mp4']
        start = time.time()
        for filename in filenames:
            pool.submit(download, filename=filename)
    end = time.time()
    print(f'總耗時: {end - start:.3f}秒.')


if __name__ == '__main__':
    main()

守護執行緒

所謂“守護執行緒”就是在主執行緒結束的時候,不值得再保留的執行執行緒。這裡的不值得保留指的是守護執行緒會在其他非守護執行緒全部執行結束之後被銷燬,它守護的是當前程序內所有的非守護執行緒。簡單的說,守護執行緒會跟隨主執行緒一起掛掉,而主執行緒的生命週期就是一個程序的生命週期。如果不理解,我們可以看一段簡單的程式碼。

import time
from threading import Thread


def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)


def main():
    Thread(target=display, args=('Ping', )).start()
    Thread(target=display, args=('Pong', )).start()


if __name__ == '__main__':
    main()

說明:上面的程式碼中,我們將print函式的引數flush設定為True,這是因為flush引數的值如果為False,而print又沒有做換行處理,就會導致每次print輸出的內容被放到作業系統的輸出緩衝區,直到緩衝區被輸出的內容塞滿,才會清空緩衝區產生一次輸出。上述現象是作業系統為了減少 I/O 中斷,提升 CPU 利用率做出的設定,為了讓程式碼產生直觀互動,我們才將flush引數設定為True,強制每次輸出都清空輸出緩衝區。

上面的程式碼執行起來之後是不會停止的,因為兩個子執行緒中都有死迴圈,除非你手動中斷程式碼的執行。但是,如果在建立執行緒物件時,將名為daemon的引數設定為True,這兩個執行緒就會變成守護執行緒,那麼在其他執行緒結束時,即便有死迴圈,兩個守護執行緒也會掛掉,不會再繼續執行下去,程式碼如下所示。

import time
from threading import Thread


def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)


def main():
    Thread(target=display, args=('Ping', ), daemon=True).start()
    Thread(target=display, args=('Pong', ), daemon=True).start()
    time.sleep(5)


if __name__ == '__main__':
    main()

上面的程式碼,我們在主執行緒中添加了一行time.sleep(5)讓主執行緒休眠5秒,在這個過程中,輸出PingPong的守護執行緒會持續運轉,直到主執行緒在5秒後結束,這兩個守護執行緒也被銷燬,不再繼續執行。

思考:如果將上面程式碼第12行的daemon=True去掉,程式碼會怎樣執行?有興趣的讀者可以嘗試一下,並看看實際執行的結果跟你想象的是否一致。

資源競爭

在編寫多執行緒程式碼時,不可避免的會遇到多個執行緒競爭同一個資源(物件)的情況。在這種情況下,如果沒有合理的機制來保護被競爭的資源,那麼就有可能出現非預期的狀況。下面的程式碼建立了100個執行緒向同一個銀行賬戶(初始餘額為0元)轉賬,每個執行緒轉賬金額為1元。在正常的情況下,我們的銀行賬戶最終的餘額應該是100元,但是執行下面的程式碼我們並不能得到100元這個結果。

import time

from concurrent.futures import ThreadPoolExecutor


class Account(object):
    """銀行賬戶"""

    def __init__(self):
        self.balance = 0.0

    def deposit(self, money):
        """存錢"""
        new_balance = self.balance + money
        time.sleep(0.01)
        self.balance = new_balance


def main():
    """主函式"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()

上面程式碼中的Account類代表了銀行賬戶,它的deposit方法代表存款行為,引數money代表存入的金額,該方法透過time.sleep函式模擬受理存款需要一段時間。我們透過執行緒池的方式啟動了100個執行緒向一個賬戶轉賬,但是上面的程式碼並不能執行出100這個我們期望的結果,這就是在多個執行緒競爭一個資源的時候,可能會遇到的資料不一致的問題。注意上面程式碼的第14行,當多個執行緒都執行到這行程式碼時,它們會在相同的餘額上執行加上存入金額的操作,這就會造成“丟失更新”現象,即之前修改資料的成果被後續的修改給覆蓋掉了,所以才得不到正確的結果。

要解決上面的問題,可以使用鎖機制,透過鎖對操作資料的關鍵程式碼加以保護。Python 標準庫的threading模組提供了LockRLock類來支援鎖機制,這裡我們不去深究二者的區別,建議大家直接使用RLock。接下來,我們給銀行賬戶新增一個鎖物件,透過鎖物件來解決剛才存款時發生“丟失更新”的問題,程式碼如下所示。

import time

from concurrent.futures import ThreadPoolExecutor
from threading import RLock


class Account(object):
    """銀行賬戶"""

    def __init__(self):
        self.balance = 0.0
        self.lock = RLock()

    def deposit(self, money):
        # 獲得鎖
        self.lock.acquire()
        try:
            new_balance = self.balance + money
            time.sleep(0.01)
            self.balance = new_balance
        finally:
            # 釋放鎖
            self.lock.release()


def main():
    """主函式"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()

上面程式碼中,獲得鎖和釋放鎖的操作也可以透過上下文語法來實現,使用上下文語法會讓程式碼更加簡單優雅,這也是我們推薦大家使用的方式。

import time

from concurrent.futures import ThreadPoolExecutor
from threading import RLock


class Account(object):
    """銀行賬戶"""

    def __init__(self):
        self.balance = 0.0
        self.lock = RLock()

    def deposit(self, money):
        # 透過上下文語法獲得鎖和釋放鎖
        with self.lock:
            new_balance = self.balance + money
            time.sleep(0.01)
            self.balance = new_balance


def main():
    """主函式"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()

思考:將上面的程式碼修改為5個執行緒向銀行賬戶存錢,5個執行緒從銀行賬戶取錢,取錢的執行緒在銀行賬戶餘額不足時,需要停下來等待存錢的執行緒將錢存入後再嘗試取錢。這裡需要用到執行緒排程的知識,大家可以自行研究下threading模組中的Condition類,看看是否能夠完成這個任務。

GIL問題

如果使用官方的 Python 直譯器(通常稱之為 CPython)執行 Python 程式,我們並不能透過使用多執行緒的方式將 CPU 的利用率提升到逼近400%(對於4核 CPU)或逼近800%(對於8核 CPU)這樣的水平,因為 CPython 在執行程式碼時,會受到 GIL(全域性直譯器鎖)的限制。具體的說,CPython 在執行任何程式碼時,都需要對應的執行緒先獲得 GIL,然後每執行100條(位元組碼)指令,CPython 就會讓獲得 GIL 的執行緒主動釋放 GIL,這樣別的執行緒才有機會執行。因為 GIL 的存在,無論你的 CPU 有多少個核,我們編寫的 Python 程式碼也沒有機會真正並行的執行。

GIL 是官方 Python 直譯器在設計上的歷史遺留問題,要解決這個問題,讓多執行緒能夠發揮 CPU 的多核優勢,需要重新實現一個不帶 GIL 的 Python 直譯器。這個問題按照官方的說法,在 Python 釋出4.0版本時會得到解決,就讓我們拭目以待吧。當下,對於 CPython 而言,如果希望充分發揮 CPU 的多核優勢,可以考慮使用多程序,因為每個程序都對應一個 Python 直譯器,因此每個程序都有自己獨立的 GIL,這樣就可以突破 GIL 的限制。在下一個章節中,我們會為大家介紹關於多程序的相關知識,並對多執行緒和多程序的程式碼及其執行效果進行比較。