Thông báo: Download 4 khóa học Python từ cơ bản đến nâng cao tại đây.
Cách sử dụng Thread-safe Queue trong Python
Trong bài viết này, bạn sẽ học cách sử dụng hàng đợi (queue) an toàn với luồng (thread-safe queue) trong Python. Việc trao đổi dữ liệu giữa các luồng trong lập trình đa luồng (multithreading) đòi hỏi sự đồng bộ hóa để tránh xung đột và bảo đảm tính toàn vẹn của dữ liệu. Sử dụng hàng đợi an toàn với luồng giúp bạn giải quyết vấn đề này một cách hiệu quả, cho phép các luồng trao đổi dữ liệu một cách an toàn và dễ dàng.
Giới thiệu về hàng đợi an toàn với luồng trong Python
Mô-đun queue
tích hợp sẵn trong Python cho phép bạn trao đổi dữ liệu một cách an toàn giữa nhiều luồng. Lớp Queue
trong mô-đun queue
đã triển khai đầy đủ các cơ chế khóa cần thiết.
Tạo một hàng đợi mới
Để tạo một hàng đợi mới, bạn cần import lớp Queue
từ mô-đun queue
:
from queue import Queue
và sử dụng hàm dựng Queue
như sau:
Bài viết này được đăng tại [free tuts .net]
queue = Queue()
Để tạo một hàng đợi có giới hạn kích thước, bạn có thể sử dụng tham số maxsize
. Ví dụ, đoạn mã sau tạo một hàng đợi có thể chứa tối đa 10 mục:
queue = Queue(maxsize=10)
Thêm một mục vào hàng đợi
Để thêm một mục vào hàng đợi, bạn sử dụng phương thức put()
như sau:
queue.put(item)
Khi hàng đợi đã đầy, bạn sẽ không thể thêm mục vào. Lúc này, lệnh gọi put()
sẽ bị chặn cho đến khi có chỗ trống trong hàng đợi.
Nếu bạn không muốn phương thức put()
bị chặn khi hàng đợi đầy, bạn có thể đặt tham số block
là False
:
queue.put(item, block=False)
Trong trường hợp này, phương thức put()
sẽ ném ra ngoại lệ queue.Full
nếu hàng đợi đầy:
try: queue.put(item, block=False) except queue.Full as e: # xử lý ngoại lệ
Để thêm một mục vào hàng đợi có giới hạn kích thước và chờ một khoảng thời gian, bạn có thể sử dụng tham số timeout
như sau:
try: queue.put(item, timeout=3) except queue.Full as e: # xử lý ngoại lệ
Lấy một mục từ hàng đợi
Để lấy một mục từ hàng đợi, bạn có thể sử dụng phương thức get()
:
item = queue.get()
Phương thức get()
sẽ bị chặn cho đến khi có mục để lấy ra từ hàng đợi.
Để lấy một mục từ hàng đợi mà không bị chặn, bạn có thể đặt tham số block
là False
:
try: item = queue.get(block=False) except queue.Empty: # xử lý ngoại lệ
Để lấy một mục từ hàng đợi và chờ một khoảng thời gian, bạn có thể sử dụng phương thức get()
với tham số timeout
:
try: item = queue.get(timeout=10) except queue.Empty: # xử lý ngoại lệ
Lấy kích thước của hàng đợi
Phương thức qsize()
trả về số lượng mục trong hàng đợi:
size = queue.qsize()
Ngoài ra, phương thức empty()
trả về True
nếu hàng đợi rỗng hoặc False
ngược lại. Phương thức full()
trả về True
nếu hàng đợi đầy hoặc False
ngược lại.
Đánh dấu một nhiệm vụ đã hoàn thành
Một mục mà bạn thêm vào hàng đợi đại diện cho một đơn vị công việc hoặc một nhiệm vụ.
Khi một luồng gọi phương thức get()
để lấy mục từ hàng đợi, nó có thể cần xử lý mục đó trước khi nhiệm vụ được coi là hoàn thành.
Khi hoàn thành, luồng có thể gọi phương thức task_done()
của hàng đợi để chỉ ra rằng nó đã xử lý xong nhiệm vụ:
item = queue.get() # xử lý mục # ... # đánh dấu mục đã hoàn thành queue.task_done()
Chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành
Để chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành, bạn có thể gọi phương thức join()
trên đối tượng hàng đợi:
queue.join()
Ví dụ về hàng đợi an toàn với luồng trong Python
Ví dụ sau minh họa cách sử dụng hàng đợi an toàn với luồng để trao đổi dữ liệu giữa hai luồng:
import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Inserting item {i} into the queue') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item {item}') time.sleep(2) queue.task_done() def main(): queue = Queue() # tạo một luồng producer và bắt đầu nó producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # tạo một luồng consumer và bắt đầu nó consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # chờ tất cả các nhiệm vụ được thêm vào hàng đợi producer_thread.join() # chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành queue.join() if __name__ == '__main__': main()
Giải thích cách hoạt động của chương trình
Đầu tiên, định nghĩa hàm producer()
để thêm các số từ 1 đến 5 vào hàng đợi. Nó sẽ dừng lại một giây sau mỗi lần lặp:
def producer(queue): for i in range(1, 6): print(f'Inserting item {i} into the queue') time.sleep(1) queue.put(i)
Tiếp theo, định nghĩa hàm consumer()
để lấy một mục từ hàng đợi và xử lý nó. Nó sẽ dừng lại hai giây sau khi xử lý mỗi mục trong hàng đợi:
def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item {item}') time.sleep(2) queue.task_done()
Hàm queue.task_done()
chỉ ra rằng hàm đã xử lý xong mục trong hàng đợi.
Thứ ba, định nghĩa hàm main()
để tạo hai luồng: một luồng thêm một số vào hàng đợi mỗi giây, trong khi luồng khác xử lý một mục từ hàng đợi mỗi hai giây:
def main(): queue = Queue() # tạo một luồng producer và bắt đầu nó producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # tạo một luồng consumer và bắt đầu nó consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # chờ tất cả các nhiệm vụ được thêm vào hàng đợi producer_thread.join() # chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành queue.join()
Kết quả
Inserting item 1 into the queue Inserting item 2 into the queue Processing item 1 Inserting item 3 into the queue Processing item 2 Inserting item 4 into the queue Inserting item 5 into the queue Processing item 3 Processing item 4 Processing item 5
Trong hàm main()
:
- Tạo một hàng đợi mới bằng cách gọi hàm dựng
Queue()
. - Tạo một luồng
producer_thread
và bắt đầu nó ngay lập tức. - Tạo một luồng daemon
consumer_thread
và bắt đầu nó ngay lập tức. - Chờ tất cả các số được thêm vào hàng đợi bằng cách sử dụng phương thức
join()
của luồng. - Chờ tất cả các nhiệm vụ trong hàng đợi được hoàn thành bằng cách gọi phương thức
join()
của hàng đợi.
Luồng producer thêm một số vào hàng đợi mỗi giây, và luồng consumer xử lý một số từ hàng đợi mỗi hai giây, hiển thị các số trong hàng đợi mỗi giây.
Kết bài
Sử dụng hàng đợi an toàn với luồng trong Python giúp đảm bảo việc trao đổi dữ liệu giữa các luồng diễn ra một cách an toàn và hiệu quả. Bằng cách sử dụng các phương thức của lớp Queue như put()
, get()
, và task_done()
, bạn có thể dễ dàng quản lý các tác vụ và đồng bộ hóa hoạt động của các luồng. Việc hiểu và áp dụng hàng đợi an toàn với luồng sẽ giúp bạn xây dựng các ứng dụng đa luồng mạnh mẽ và ổn định hơn. Hy vọng rằng bài hướng dẫn này đã cung cấp cho bạn những kiến thức cần thiết để làm việc với hàng đợi trong Python một cách hiệu quả.