藉助 Queue 實現多線程間的協同
Pipeline
並行地執行多個任務的 Python 程序通常都需要一種協作機制,使得多個線程負責的各部分之間的工作能夠相互協同。
管線 ( pipeline )。pipeline 的工作方式類似於工廠里的流水線,分為串行排列的多道工序( phase )。每道工序都由特定的函數處理,函數之間可以並行地執行。
比如需要創建這樣一個系統,可以從相機接收持續的圖片流,再將收到的圖片更改尺寸,最後上傳到線上的圖片庫中。
這樣的系統就可以分為三道工序,分別用 download 、 resize 、 upload 三個函數去處理。此外還需要一個在各道工序間傳遞任務對象的媒介,這個可以通過線程安全的 producer-consumer 隊列去實現。
具體的示例代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import time
from threading import Thread
from collections import deque
from threading import lock
def upload(item):
pass
def download(item):
pass
def resize(item):
pass
class MyQueue:
def __init__(self) -> None:
self.items = deque()
self.lock = Lock()
def put(self, item):
with self.lock:
self.items.append(item)
def get(self):
with self.lock:
return self.items.popleft()
class Worker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
self.polled_count = 0
# self.work_done = 0
def run(self):
while True:
self.polled_count += 1
try:
item = self.in_queue.get()
except IndexError:
time.sleep(0.01)
else:
result = self.func(item)
self.out_queue.put(result)
# self.work_done += 1
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
Worker(download, download_queue, resize_queue),
Worker(resize, resize_queue, upload_queue),
Worker(upload, upload_queue, done_queue),
]
for thread in threads:
thread.start()
for i in range(100):
download_queue.put(object())
while len(done_queue.items) < 100:
pass
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'Processed {processed} items after '
f'polling {polled} times')
# Processed 100 items after polling 308 times
上述實現雖然能夠處理完成輸入的所有任務,但仍存在很多問題。
首先是 polled_count 值遠大於任務的數量。即工作線程的 run 方法中定義的從隊列中取項目的動作執行了太多次。
各個工作函數的執行速度其實是不一致的,前置位的工作函數(比如 download )運行緩慢,會導致後一道工序(比如 resize )上的函數持續不斷地向其隊列請求新的任務,然而隊列為空導致不斷地觸發 IndexError 錯誤,最終導致 CPU 時間的浪費。
其次,確認所有任務是否全部完成,需要一個 while 循環不斷地檢查 done_queue 隊列中元素的數量。
再次,工作線程中的 run 方法會一直處於 while True 的循環當中,沒有一種明顯的方法可以向該工作線程發送任務完成可以退出的消息。
最後,當第一道工序執行很快而第二道工序執行很慢時,處於兩道工序之間的隊列中的元素數量會持續增長。如果有足夠多的任務和足夠長的時間,程序最終會耗盡內存並崩潰。
Queue
內置的 queue 模塊中的 Queue 類可以解決上述問題。
Queue 類中的 get 方法是阻塞的,即在有新的項目放置到隊列中以前, get 會一直處於等待狀態,直到獲取到某個項目。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from queue import Queue
from threading import Thread
my_queue = Queue()
def consumer():
print('Consumer waiting')
my_queue.get()
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
time.sleep(1)
print('Producer putting')
my_queue.put(object())
print('Producer done')
thread.join()
# Consumer waiting
# Producer putting
# Producer done
# Consumer done
即便線程先於主程序運行,它也會先處於等待狀態,直到一個新的項目被放置到隊列中,能夠被 get
獲取到。
這可以解決前面的程序中 polled_count 值過大的問題。
Queue 類可以指定 buffer size,從而限制了兩道工序間 pending
的任務的最大數量。即隊列中的元素數量達到最大值後,向隊列中放入新元素的 put 方法會阻塞,等待隊列中某個元素被消耗從而為新元素騰出空間。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
from threading import Thread
from queue import Queue
my_queue = Queue(1)
def consumer():
time.sleep(1)
my_queue.get()
print('Consumer got 1')
my_queue.get()
print('Consumer got 2')
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
my_queue.put(object())
print('Producer put 1')
my_queue.put(object())
print('Producer put 2')
print('Producer done')
thread.join()
# Producer put 1
# Consumer got 1
# Producer put 2
# Producer done
# Consumer got 2
# Consumer done
Consumer 線程中的 sleep 應該使得主程序有足夠的時間將兩個對象都放置到隊列中。但隊列的大小是 1,這就導致隊列中先放入的元素必須通過 get 方法取出之後,才能繼續使用 put 方法放置新的元素進去。
即 Producer 會等待 Consumer 線程把放置到隊列中的舊元素消耗掉,才能繼續向隊列中添加新的元素。
task_done
Queue 類可以使用其 task_done 方法來追蹤任務的進度,使得程序可以確保在某個特定的時間點,隊列中的所有任務都已經被處理完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from queue import Queue
from threading import Thread
import time
in_queue = Queue()
def consumer():
print('Consumer waiting')
work = in_queue.get()
print('Consumer working')
print('Consumer done')
in_queue.task_done()
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
in_queue.put(object())
print('Producer waiting')
in_queue.join()
print('Producer done')
thread.join()
# Consumer waiting
# Producer putting
# Producer waiting
# Consumer working
# Consumer done
# Producer done
在代碼中調用 in_queue.join() 後,只有隊列 in_queue 中的所有元素都執行了一遍 task_done (即有幾個元素就需要幾條 task_done ), in_queue.join()
之後的代碼才會執行。否則就繼續等待,直到 Consumer 調用了足夠次數的 task_done 。
結合前面提到的特性,可以創建一個新的 Queue 類,它能夠告知工作線程什麼時候該停止執行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # Cause the thread to exit
yield item
finally:
self.task_done()
更新後的完整代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import time
from queue import Queue
from threading import Thread
from collections import deque
from threading import Lock
def upload(item):
pass
def download(item):
pass
def resize(item):
pass
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # Cause the thread to exit
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, done_queue),
]
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
# 1000 items finished
邏輯上就是給 Queue 類加了一個 SENTINEL 對象,用來作為隊列結束的標誌。工作線程通過循環讀取輸入隊列中的任務,這些任務對象經過特定函數處理後放置到輸出隊列中。若讀取到的任務是 SENTINEL 對象,則線程結束運行。
task_done 方法和主程序中的 xxx_queue.join 用於確保某個隊列中的所有任務都已經處理完成,轉移到了下一個隊列中。後面再調用下一個隊列的 close 方法在該隊列尾部添加一個 SENTINEL 對象,作為隊列的結束標誌。
上述實現的好處在於,工作線程會在讀取到 SENTINEL 對象時自動結束運行;主程序中 upload_queue.join() 執行結束後就能保證三個階段的所有任務都被處理完了,而不再需要頻繁地去檢查 done_queue 中的元素數量。
最終實現
當需要對不同的階段( download 、 resize 、 upload )都分別綁定多個線程去處理時,只稍微修改下代碼就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import time
from queue import Queue
from threading import Thread
from collections import deque
from threading import Lock
def upload(item):
pass
def download(item):
pass
def resize(item):
pass
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # Cause the thread to exit
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
def start_threads(count, *args):
threads = [StoppableWorker(*args) for _ in range(count)]
for thread in threads:
thread.start()
return threads
def stop_threads(closable_queue, threads):
for _ in threads:
closable_queue.close()
closable_queue.join()
for thread in threads:
thread.join()
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
download_threads = start_threads(
3, download, download_queue, resize_queue)
resize_threads = start_threads(
4, resize, resize_queue, upload_queue)
upload_threads = start_threads(
5, upload, upload_queue, done_queue)
for _ in range(1000):
download_queue.put(object())
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
print(done_queue.qsize(), 'items finished')
# 1000 items finished
要點
Pipeline 可以很好地組織流水線類型的工作,尤其是 IO 相關的 Python 多線程程序
需要特別注意構建 pipeline 時的隱藏問題:怎樣告訴工作線程終止運行、busy waiting 以及潛在的內存爆炸等
Queue 類具備構建健壯的 pipeline 所需的特性,如阻塞式操作、buffer size 和 joining 等。
原文連結:https://www.starky.ltd/2021/10/08/effective-python-notes-concurrency-and-queue/