PYTHON CONCURRENCY
CÁC CHỦ ĐỀ
BÀI MỚI NHẤT
MỚI CẬP NHẬT

Thông báo: Download 4 khóa học Python từ cơ bản đến nâng cao tại đây.

Cách sử dụng Thread-safe Queue trong Python

Trong bài viết này, bạn sẽ học cách sử dụng hàng đợi (queue) an toàn với luồng (thread-safe queue) trong Python. Việc trao đổi dữ liệu giữa các luồng trong lập trình đa luồng (multithreading) đòi hỏi sự đồng bộ hóa để tránh xung đột và bảo đảm tính toàn vẹn của dữ liệu. Sử dụng hàng đợi an toàn với luồng giúp bạn giải quyết vấn đề này một cách hiệu quả, cho phép các luồng trao đổi dữ liệu một cách an toàn và dễ dàng.

test php

banquyen png
Bài viết này được đăng tại freetuts.net, không được copy dưới mọi hình thức.

Giới thiệu về hàng đợi an toàn với luồng trong Python

Mô-đun queue tích hợp sẵn trong Python cho phép bạn trao đổi dữ liệu một cách an toàn giữa nhiều luồng. Lớp Queue trong mô-đun queue đã triển khai đầy đủ các cơ chế khóa cần thiết.

Tạo một hàng đợi mới

Để tạo một hàng đợi mới, bạn cần import lớp Queue từ mô-đun queue:

from queue import Queue

và sử dụng hàm dựng Queue như sau:

Bài viết này được đăng tại [free tuts .net]

queue = Queue()

Để tạo một hàng đợi có giới hạn kích thước, bạn có thể sử dụng tham số maxsize. Ví dụ, đoạn mã sau tạo một hàng đợi có thể chứa tối đa 10 mục:

queue = Queue(maxsize=10)

Thêm một mục vào hàng đợi

Để thêm một mục vào hàng đợi, bạn sử dụng phương thức put() như sau:

queue.put(item)

Khi hàng đợi đã đầy, bạn sẽ không thể thêm mục vào. Lúc này, lệnh gọi put() sẽ bị chặn cho đến khi có chỗ trống trong hàng đợi.

Nếu bạn không muốn phương thức put() bị chặn khi hàng đợi đầy, bạn có thể đặt tham số blockFalse:

queue.put(item, block=False)

Trong trường hợp này, phương thức put() sẽ ném ra ngoại lệ queue.Full nếu hàng đợi đầy:

try:
   queue.put(item, block=False)
except queue.Full as e:
   # xử lý ngoại lệ

Để thêm một mục vào hàng đợi có giới hạn kích thước và chờ một khoảng thời gian, bạn có thể sử dụng tham số timeout như sau:

try:
   queue.put(item, timeout=3)
except queue.Full as e:
   # xử lý ngoại lệ

Lấy một mục từ hàng đợi

Để lấy một mục từ hàng đợi, bạn có thể sử dụng phương thức get():

item = queue.get()

Phương thức get() sẽ bị chặn cho đến khi có mục để lấy ra từ hàng đợi.

Để lấy một mục từ hàng đợi mà không bị chặn, bạn có thể đặt tham số blockFalse:

try:
   item = queue.get(block=False)
except queue.Empty:
   # xử lý ngoại lệ

Để lấy một mục từ hàng đợi và chờ một khoảng thời gian, bạn có thể sử dụng phương thức get() với tham số timeout:

try:
   item = queue.get(timeout=10)
except queue.Empty:
   # xử lý ngoại lệ

Lấy kích thước của hàng đợi

Phương thức qsize() trả về số lượng mục trong hàng đợi:

size = queue.qsize()

Ngoài ra, phương thức empty() trả về True nếu hàng đợi rỗng hoặc False ngược lại. Phương thức full() trả về True nếu hàng đợi đầy hoặc False ngược lại.

Đánh dấu một nhiệm vụ đã hoàn thành

Một mục mà bạn thêm vào hàng đợi đại diện cho một đơn vị công việc hoặc một nhiệm vụ.

Khi một luồng gọi phương thức get() để lấy mục từ hàng đợi, nó có thể cần xử lý mục đó trước khi nhiệm vụ được coi là hoàn thành.

Khi hoàn thành, luồng có thể gọi phương thức task_done() của hàng đợi để chỉ ra rằng nó đã xử lý xong nhiệm vụ:

item = queue.get()

# xử lý mục
# ...

# đánh dấu mục đã hoàn thành
queue.task_done()

Chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành

Để chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành, bạn có thể gọi phương thức join() trên đối tượng hàng đợi:

queue.join()

Ví dụ về hàng đợi an toàn với luồng trong Python

Ví dụ sau minh họa cách sử dụng hàng đợi an toàn với luồng để trao đổi dữ liệu giữa hai luồng:

import time
from queue import Empty, Queue
from threading import Thread

def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)

def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()

def main():
    queue = Queue()

    # tạo một luồng producer và bắt đầu nó
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # tạo một luồng consumer và bắt đầu nó
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # chờ tất cả các nhiệm vụ được thêm vào hàng đợi
    producer_thread.join()

    # chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành
    queue.join()

if __name__ == '__main__':
    main()

Giải thích cách hoạt động của chương trình

Đầu tiên, định nghĩa hàm producer() để thêm các số từ 1 đến 5 vào hàng đợi. Nó sẽ dừng lại một giây sau mỗi lần lặp:

def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)

Tiếp theo, định nghĩa hàm consumer() để lấy một mục từ hàng đợi và xử lý nó. Nó sẽ dừng lại hai giây sau khi xử lý mỗi mục trong hàng đợi:

def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()

Hàm queue.task_done() chỉ ra rằng hàm đã xử lý xong mục trong hàng đợi.

Thứ ba, định nghĩa hàm main() để tạo hai luồng: một luồng thêm một số vào hàng đợi mỗi giây, trong khi luồng khác xử lý một mục từ hàng đợi mỗi hai giây:

def main():
    queue = Queue()

    # tạo một luồng producer và bắt đầu nó
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # tạo một luồng consumer và bắt đầu nó
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # chờ tất cả các nhiệm vụ được thêm vào hàng đợi
    producer_thread.join()

    # chờ tất cả nhiệm vụ trong hàng đợi được hoàn thành
    queue.join()

Kết quả

Inserting item 1 into the queue
Inserting item 2 into the queue
Processing item 1
Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5

Trong hàm main():

  • Tạo một hàng đợi mới bằng cách gọi hàm dựng Queue().
  • Tạo một luồng producer_thread và bắt đầu nó ngay lập tức.
  • Tạo một luồng daemon consumer_thread và bắt đầu nó ngay lập tức.
  • Chờ tất cả các số được thêm vào hàng đợi bằng cách sử dụng phương thức join() của luồng.
  • Chờ tất cả các nhiệm vụ trong hàng đợi được hoàn thành bằng cách gọi phương thức join() của hàng đợi.

Luồng producer thêm một số vào hàng đợi mỗi giây, và luồng consumer xử lý một số từ hàng đợi mỗi hai giây, hiển thị các số trong hàng đợi mỗi giây.

Kết bài

Sử dụng hàng đợi an toàn với luồng trong Python giúp đảm bảo việc trao đổi dữ liệu giữa các luồng diễn ra một cách an toàn và hiệu quả. Bằng cách sử dụng các phương thức của lớp Queue như put(), get(), và task_done(), bạn có thể dễ dàng quản lý các tác vụ và đồng bộ hóa hoạt động của các luồng. Việc hiểu và áp dụng hàng đợi an toàn với luồng sẽ giúp bạn xây dựng các ứng dụng đa luồng mạnh mẽ và ổn định hơn. Hy vọng rằng bài hướng dẫn này đã cung cấp cho bạn những kiến thức cần thiết để làm việc với hàng đợi trong Python một cách hiệu quả.

Cùng chuyên mục:

Hướng dẫn xây dựng Command-Line Interface (CLI) bằng Quo trong Python

Hướng dẫn xây dựng Command-Line Interface (CLI) bằng Quo trong Python

Hướng dẫn toàn diện về module datetime trong Python

Hướng dẫn toàn diện về module datetime trong Python

Cách truy cập và thiết lập biến môi trường trong Python

Cách truy cập và thiết lập biến môi trường trong Python

Lớp dữ liệu (Data Classes) trong Python với decorator @dataclass

Lớp dữ liệu (Data Classes) trong Python với decorator @dataclass

Từ khóa yield trong Python

Từ khóa yield trong Python

Sự khác biệt giữa sort() và sorted() trong Python

Sự khác biệt giữa sort() và sorted() trong Python

Sử dụng Poetry để quản lý dependencies trong Python

Sử dụng Poetry để quản lý dependencies trong Python

Định dạng chuỗi Strings trong Python

Định dạng chuỗi Strings trong Python

Một tác vụ phổ biến khi làm việc với danh sách trong Python

Một tác vụ phổ biến khi làm việc với danh sách trong Python

Làm việc với các biến môi trường trong Python

Làm việc với các biến môi trường trong Python

Sự khác biệt giữa set() và frozenset() trong Python

Sự khác biệt giữa set() và frozenset() trong Python

Sự khác biệt giữa iterator và iterable trong Python

Sự khác biệt giữa iterator và iterable trong Python

Cách làm việc với file tarball/tar trong Python

Cách làm việc với file tarball/tar trong Python

Chuyển đổi kiểu dữ liệu trong Python

Chuyển đổi kiểu dữ liệu trong Python

Sự khác biệt giữa toán tử == và is trong Python

Sự khác biệt giữa toán tử == và is trong Python

Làm việc với file ZIP trong Python

Làm việc với file ZIP trong Python

Cách sử dụng ThreadPoolExecutor trong Python

Cách sử dụng ThreadPoolExecutor trong Python

Sự khác biệt giữa byte objects và string trong Python

Sự khác biệt giữa byte objects và string trong Python

Xử lý độ chính xác các hàm floor, ceil, round, trunc, format  trong Python

Xử lý độ chính xác các hàm floor, ceil, round, trunc, format trong Python

Cách lặp qua nhiều list với hàm zip() trong Python

Cách lặp qua nhiều list với hàm zip() trong Python

Top