1-多執行緒

本篇介紹如何在 Python 中使用 threading 模組,撰寫多執行緒的平行計算程式,利用多顆 CPU 核心加速運算。

現在電腦的 CPU 都有許多的核心,若想要讓程式可以運用多顆 CPU 核心,充分發揮硬體的運算能力,就必須考慮使用多執行緒(multithreading)或多行程(multiprocessing)等平行化的技術,以下介紹 Python 的多執行緒的程式設計方法與技巧,並提供詳細的範例程式碼。

由於 CPython 的 GIL(Global Interpreter Lock)限制,可能會造成大部分的 Python 程式無法以多執行緒發揮多核心 CPU 的效能,若遇到這樣的狀況,可以考慮改用多行程的方式來設計程式。

threading 模組

在 Python 中若要撰寫多執行緒(multithreading)的平行化程式,最基本的方式是使用 threading 這個模組來建立子執行緒。

threading 是 Python 標準函式庫裡面的模組,所以不用特別安裝即可使用,雖然功能不是很多,但是基本多執行緒程式設計常用的功能它都有,對於比較單純的平行化工作來說,還算滿實用了。

建立子執行緒

以下是使用 threading 模組建立子執行緒的範例:

import threading import time # 子執行緒的工作函數def job(): for i in range(5): print("Child thread:", i) time.sleep(1) # 建立一個子執行緒 t = threading.Thread(target = job) # 執行該子執行緒 t.start() # 主執行緒繼續執行自己的工作for i in range(3): print("Main thread:", i) time.sleep(1) # 等待 t 這個子執行緒結束 t.join() print("Done.")

在這個例子中,我們先定義一個要讓子執行緒執行的 job 函數,接著使用 threading.Thread 建立一個新的子執行緒,其 target 參數就指定為要讓子執行緒執行的函數(也就是 job)。

建立好新的執行緒之後,即可呼叫執行緒的 start 函數,讓它開始執行,在子執行緒執行的同時,我們還是可以在主程式中繼續處理其他的工作。

如果有些工作是要等待子執行緒執行完成後才能處理的話,可以使用執行緒的 join 函數,等待該執行緒執行結束,也就是說放在 join 之後的程式碼就會等到子執行緒執行完成後,才會接著執行。

執行後的輸出會類似這樣:

Child thread: 0 Main thread: 0 Main thread: 1 Child thread: 1 Main thread: 2 Child thread: 2 Child thread: 3 Child thread: 4 Done.

這裡的子執行緒會執行 5 秒,但是主程式中的迴圈只要 3 秒就結束了,所以主程式會在 join 的地方等待 2 秒鐘,等到子執行緒結束之後,才會輸出 Done. 這一行訊息。

多個子執行緒與參數

通常我們在撰寫平行化的程式時,都會使用多個子執行緒,並且傳入不同的參數,讓個子執行緒各自負責不同的工作,這時候就可以在建立子執行緒時,使用 args 參數指定要傳數的參數。以下是一個簡單的範例:

import threading import time # 子執行緒的工作函數def job(num): print("Thread", num) time.sleep(1) # 建立 5 個子執行緒 threads = [] for i in range(5): threads.append(threading.Thread(target = job, args = (i,))) threads[i].start() # 主執行緒繼續執行自己的工作# ...# 等待所有子執行緒結束for i in range(5): threads[i].join() print("Done.")

在這個例子中,我們讓子執行緒執行的 job 函數會接受一個 num 參數,依據這個參數來決定要處理什麼工作,然後在呼叫 threading.Thread 建立子執行緒時,將要傳入的參數放在 args 參數中,這樣就可以把資料傳進子執行緒的 job 函數中了。執行之後的結果如下:

Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.

物件導向

我們也可以使用 Python 物件導向的方式來改寫 threading 的多執行緒程式,以下是一個簡單的範例:

import threading import time # 子執行緒類別class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): print("Thread", self.num) time.sleep(1) # 建立 5 個子執行緒 threads = [] for i in range(5): threads.append(MyThread(i)) threads[i].start() # 主執行緒繼續執行自己的工作# ...# 等待所有子執行緒結束for i in range(5): threads[i].join() print("Done.")

這個範例大致上的觀念都跟前面差不多,比較需要注意的地方就是 threading.Thread 在開始執行時,會呼叫它自己的 run 方法函數,這個方法函數預設會呼叫前面我們以 target 參數所指定的函數,在這裡我們在繼承 threading.Thread 類別之後,就直接把 run 覆寫成要執行的函數即可。

這個範例的執行結果跟上一個例子相同:

Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.

佇列(Queue)

如果我們有許多的工作要分給多個 CPU 核心做運算,最簡單的方式就是使用佇列的方式,讓多個 CPU 可從佇列中取得尚未處理的工作來處理:

import time import threading import queue # Worker 類別,負責處理資料class Worker(threading.Thread): def __init__(self, queue, num): threading.Thread.__init__(self) self.queue = queue self.num = num def run(self): while self.queue.qsize() > 0: # 取得新的資料 msg = self.queue.get() # 處理資料 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 建立佇列 my_queue = queue.Queue() # 將資料放入佇列for i in range(10): my_queue.put("Data %d" % i) # 建立兩個 Worker my_worker1 = Worker(my_queue, 1) my_worker2 = Worker(my_queue, 2) # 讓 Worker 開始處理資料 my_worker1.start() my_worker2.start() # 等待所有 Worker 結束 my_worker1.join() my_worker2.join() print("Done.")

這裡我們建立兩個 Worker,它們都會從佇列中取得尚未處理的資料,直到佇列清空為止。執行後的結果會像這樣:

Worker 1: Data 0 Worker 2: Data 1 Worker 1: Data 2 Worker 2: Data 3 Worker 2: Data 4 Worker 1: Data 5 Worker 1: Data 6 Worker 2: Data 7 Worker 2: Data 8 Worker 1: Data 9 Done.

鎖定(Lock)

在平行化的多執行緒程式中,每個執行緒都是同時在執行的,若遇到不可以讓多個執行緒同時進行的工作時(例如將資料寫入同一個檔案),就必須使用鎖定(lock)的方式,一次只讓一個執行緒處理這種工作。

在 Python 中,我們可以使用 threading 模組的 Lock 來處理多執行緒的鎖定問題,以下是一個簡單的使用範例:

import time import threading import queue class Worker(threading.Thread): def __init__(self, queue, num, lock): threading.Thread.__init__(self) self.queue = queue self.num = num self.lock = lock def run(self): while self.queue.qsize() > 0: msg = self.queue.get() # 取得 lock lock.acquire() print("Lock acquired by Worker %d" % self.num) # 不能讓多個執行緒同時進的工作 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 釋放 lock print("Lock released by Worker %d" % self.num) self.lock.release() my_queue = queue.Queue() for i in range(5): my_queue.put("Data %d" % i) # 建立 lock lock = threading.Lock() my_worker1 = Worker(my_queue, 1, lock) my_worker2 = Worker(my_queue, 2, lock) my_worker1.start() my_worker2.start() my_worker1.join() my_worker2.join() print("Done.")

在這個範例中,我們讓兩個 Worker 都從佇列中取得待處理的工作,但是我們使用一個 Lock 限制一次只允許一個 Worker 來處理工作。

當一個執行緒呼叫了 Lockacquire 時,代表取得了這個 Lock 的使用權,接著它就可以往下執行裡面的工作,若此時又有另外一個執行緒想要呼叫 acquire 取得使用權的話,就必須等待上一個執行緒執行完,並呼叫 release 釋放這個 Lock 之後,才能夠取得這個 Lock 的使用權,接著執行裡面的工作。

在這種狀況下雖然兩個 Worker 是同時執行的,但是由於 Lock 的互斥作用,因此可以確保被 Lockacquirerelease 包起來的這段程式碼不會被兩個執行緒同時執行。

執行的結果如下:

Lock acquired by Worker 1 Worker 1: Data 0 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 1 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 2 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 3 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 4 Lock released by Worker 1 Done.

旗標(Semaphore)

有時候因為系統資源有限的因素(例如考量 CPU 或記憶體的限制),在處理某些特別耗資源的工作時,僅允許有限個執行緒同時進行,這個狀況跟上面介紹的鎖定(lock)有點類似,但是鎖定的方式是僅允許一個執行緒進行某項工作,而這裡我們是允許多個執行緒同時執行的,但要限制同時執行的執行緒數量上限。

旗標(semaphore)的作用跟鎖定(lock)類似,但是它多了一個計數器的功能,當一個執行緒呼叫了 acquire 時,旗標內部的計數器就會遞減 1,而當執行緒呼叫了 release 時,計數器就會遞增 1,當計數器遞減到 0 的時候,後面來的執行緒就要等待其他執行緒release 後才能繼續。

以下是一個簡單的範例:

import time import threading import queue class Worker(threading.Thread): def __init__(self, queue, num, semaphore): threading.Thread.__init__(self) self.queue = queue self.num = num self.semaphore = semaphore def run(self): while self.queue.qsize() > 0: msg = self.queue.get() # 取得旗標 semaphore.acquire() print("Semaphore acquired by Worker %d" % self.num) # 僅允許有限個執行緒同時進的工作 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 釋放旗標 print("Semaphore released by Worker %d" % self.num) self.semaphore.release() my_queue = queue.Queue() for i in range(5): my_queue.put("Data %d" % i) # 建立旗標 semaphore = threading.Semaphore(2) my_worker1 = Worker(my_queue, 1, semaphore) my_worker2 = Worker(my_queue, 2, semaphore) my_worker3 = Worker(my_queue, 3, semaphore) my_worker1.start() my_worker2.start() my_worker3.start() my_worker1.join() my_worker2.join() my_worker3.join() print("Done.")

Semaphore acquired by Worker 1 Worker 1: Data 0 Semaphore acquired by Worker 2 Worker 2: Data 1 Semaphore released by Worker 1 Semaphore acquired by Worker 1 Worker 1: Data 3 Semaphore released by Worker 2 Semaphore acquired by Worker 2 Worker 2: Data 4 Semaphore released by Worker 1 Semaphore released by Worker 2 Semaphore acquired by Worker 3 Worker 3: Data 2 Semaphore released by Worker 3 Done.

重複鎖定(RLock)

RLock 是一個可重複取得使用權的鎖定功能,它跟普通的 Lock 類似,但是它可以允許同一個執行緒重複取得鎖定的使用權。

若以普通的 Lock 來說,如果同一個執行緒呼叫了兩次 acquire,則在呼叫第二次的時候,就會被擋住:

# 建立 Lock lock = threading.Lock() # 取得 Lock lock.acquire() # 重複取得 Lock 的時候,就被擋住! lock.acquire()

如果想要讓同一個執行緒可以重複取得鎖定,可以改用有重複鎖定的 RLock

# 建立 RLock rlock = threading.RLock() # 取得 rlock rlock.acquire() # 不能讓多個執行緒同時進的工作...# 重複取得 rlock rlock.acquire() # 不能讓多個執行緒同時進的工作...# 釋放 rlock self.rlock.release() # 不能讓多個執行緒同時進的工作...# 再次釋放 rlock self.rlock.release()

RLock 內部有一個計數器,當執行緒在每次呼叫 RLockacquire 的時候,計數器就會遞增 1,紀錄這個鎖定被取得了幾多少次,如果呼叫了 release 時,該計數器就會遞減 1,當計數器遞減至 0 得時候,才會真正釋放鎖定,讓其他的執行緒使用,而在 RLock 的計數器還處於大於 0 的狀態時,其它的執行緒都無法取得這個鎖定的使用權。