PYTHON CONCURRENCY
CÁC CHỦ ĐỀ
BÀI MỚI NHẤT
MỚI CẬP NHẬT

Thông báo: Download 4 khóa học Python từ cơ bản đến nâng cao tại đây.

Sử dụng lớp ThreadPoolExecutor trong Python

Trong lập trình, việc xử lý đồng thời và song song có thể giúp tăng hiệu suất và hiệu quả của chương trình. Tuy nhiên, việc quản lý thủ công các luồng (threads) có thể trở nên phức tạp và tốn kém tài nguyên. Để giải quyết vấn đề này, Python cung cấp một công cụ mạnh mẽ gọi là ThreadPoolExecutor trong module concurrent.futures. ThreadPoolExecutor giúp bạn tạo và quản lý một nhóm các luồng (thread pool) một cách dễ dàng và hiệu quả. Trong bài viết này, mình sẽ tìm hiểu về cách sử dụng ThreadPoolExecutor để phát triển các chương trình đa luồng (multi-threaded) trong Python, cũng như những lợi ích mà nó mang lại trong việc tối ưu hóa hiệu suất chương trình.

test php

banquyen png
Bài viết này được đăng tại freetuts.net, không được copy dưới mọi hình thức.

Giới thiệu về lớp ThreadPoolExecutor trong Python

Trong hướng dẫn về đa luồng (multithreading), bạn đã học cách quản lý nhiều luồng trong một chương trình bằng cách sử dụng lớp Thread của module threading. Lớp Thread rất hữu ích khi bạn muốn tạo luồng một cách thủ công.

Tuy nhiên, việc quản lý luồng thủ công không hiệu quả vì việc tạo và hủy nhiều luồng thường xuyên sẽ tốn kém về chi phí tính toán.

Thay vì làm như vậy, bạn có thể muốn tái sử dụng các luồng nếu bạn dự định chạy nhiều tác vụ bất chợt trong chương trình. Một pool luồng cho phép bạn đạt được điều này.

Bài viết này được đăng tại [free tuts .net]

Pool luồng

Một pool luồng là một mẫu để đạt được sự đồng thời trong việc thực thi trong một chương trình. Một pool luồng cho phép bạn quản lý một cách tự động một nhóm các luồng một cách hiệu quả:

Mỗi luồng trong pool được gọi là một luồng công nhân (worker thread) hoặc công nhân (worker). Một pool luồng cho phép bạn tái sử dụng các worker thread sau khi các tác vụ hoàn thành. Nó cũng bảo vệ chống lại các lỗi bất ngờ như ngoại lệ.

Thông thường, một pool luồng cho phép bạn cấu hình số lượng worker thread và cung cấp một quy ước đặt tên cụ thể cho mỗi worker thread.

Để tạo một pool luồng, bạn sử dụng lớp ThreadPoolExecutor từ module concurrent.futures.

ThreadPoolExecutor

Lớp ThreadPoolExecutor mở rộng lớp Executor và trả về một đối tượng Future.

Executor

Lớp Executor có ba phương thức để kiểm soát pool luồng:

  • submit(): Gửi một hàm để thực thi và trả về một đối tượng Future. Phương thức submit() nhận một hàm và thực thi nó không đồng bộ.
  • map(): Thực thi một hàm không đồng bộ cho mỗi phần tử trong một iterable.
  • shutdown(): Tắt executor.

Khi bạn tạo một instance mới của lớp ThreadPoolExecutor, Python sẽ bắt đầu Executor.

Sau khi hoàn thành công việc với executor, bạn phải gọi phương thức shutdown() để giải phóng tài nguyên được giữ bởi executor. Để tránh việc phải gọi phương thức shutdown() một cách thủ công, bạn có thể sử dụng context manager.

Đối tượng Future

Một Future là một đối tượng đại diện cho kết quả cuối cùng của một hoạt động không đồng bộ. Lớp Future có hai phương thức hữu ích:

  • result(): Trả về kết quả của một hoạt động không đồng bộ.
  • exception(): Trả về ngoại lệ của một hoạt động không đồng bộ trong trường hợp có ngoại lệ xảy ra.

Các ví dụ về ThreadPoolExecutor trong Python

Ví dụ sử dụng một luồng trong Python

Chương trình sau sử dụng một luồng đơn:

from time import sleep, perf_counter

def task(id):
    print(f'Bắt đầu tác vụ {id}...')
    sleep(1)
    return f'Hoàn thành tác vụ {id}'

start = perf_counter()

print(task(1))
print(task(2))

finish = perf_counter()

print(f"Chương trình chạy mất {finish-start} giây.")

Kết quả:

Bắt đầu tác vụ 1...
Hoàn thành tác vụ 1
Bắt đầu tác vụ 2...
Hoàn thành tác vụ 2
Chương trình chạy mất 2.0144479 giây.

Cách hoạt động:

  • Đầu tiên, định nghĩa hàm task() mất khoảng một giây để hoàn thành.
  • Thứ hai, gọi hàm task() hai lần và in kết quả.
  • Thứ ba, in thời gian chương trình chạy.

Vì hàm task() mất một giây, gọi nó hai lần sẽ mất khoảng 2 giây.

Sử dụng phương thức submit() trong Python

Để chạy hàm task() một cách đồng thời, bạn có thể sử dụng lớp ThreadPoolExecutor:

from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor

def task(id):
    print(f'Bắt đầu tác vụ {id}...')
    sleep(1)
    return f'Hoàn thành tác vụ {id}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(task, 1)
    f2 = executor.submit(task, 2)

    print(f1.result())
    print(f2.result())    

finish = perf_counter()

print(f"Chương trình chạy mất {finish-start} giây.")

Kết quả

Bắt đầu tác vụ 1...
Bắt đầu tác vụ 2...
Hoàn thành tác vụ 1
Hoàn thành tác vụ 2
Chương trình chạy mất 1.0177214 giây.

Chương trình chạy mất khoảng 1 giây để hoàn thành.

Cách hoạt động:

  • Đầu tiên, import lớp ThreadPoolExecutor từ module concurrent.futures.
  • Thứ hai, tạo một pool luồng sử dụng context manager.
  • Thứ ba, gọi hàm task() hai lần bằng cách gửi nó đến phương thức submit() của executor.

Phương thức submit() trả về một đối tượng Future. Trong ví dụ này, chúng ta có hai đối tượng Futuref1f2. Để lấy kết quả từ đối tượng Future, chúng ta gọi phương thức result() của nó.

Sử dụng phương thức map() trong Python

Chương trình sau sử dụng lớp ThreadPoolExecutor. Tuy nhiên, thay vì sử dụng phương thức

from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor

def task(id):
    print(f'Bắt đầu tác vụ {id}...')
    sleep(1)
    return f'Hoàn thành tác vụ {id}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    results = executor.map(task, [1,2])
    for result in results:
        print(result)

finish = perf_counter()

print(f"Chương trình chạy mất {finish-start} giây.")

Cách hoạt động:

  • Đầu tiên, gọi phương thức map() của đối tượng executor để chạy hàm task cho mỗi id trong danh sách [1,2]. Phương thức map() trả về một iterator chứa kết quả của các lần gọi hàm.
  • Thứ hai, lặp qua các kết quả và in chúng ra.

Ví dụ thực tế về ThreadPoolExecutor trong Python

Chương trình sau tải xuống nhiều hình ảnh từ Wikipedia sử dụng một pool luồng:

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
import time
import os

def download_image(url):
    image_data = None
    with urlopen(url) as f:
        image_data = f.read()

    if not image_data:
        raise Exception(f"Error: không thể tải ảnh từ {url}")

    filename = os.path.basename(url)
    with open(filename, 'wb') as image_file:
        image_file.write(image_data)
        print(f'{filename} đã được tải xuống...')

start = time.perf_counter()

urls = [
    'https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg',
    'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg',
    'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg',
    'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG',
    'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg'
]

with ThreadPoolExecutor() as executor:
    executor.map(download_image, urls)

finish = time.perf_counter()    

print(f'Chương trình chạy mất {finish-start} giây.')

Cách hoạt động:

  • Đầu tiên, định nghĩa hàm download_image() tải xuống một hình ảnh từ một URL và lưu vào một file.
  • Thứ hai, thực thi hàm download_image() sử dụng một pool luồng bằng cách gọi phương thức map() của đối tượng ThreadPoolExecutor.

Kết bài

Qua bài viết này, mình đã tìm hiểu cách sử dụng ThreadPoolExecutor để quản lý và tối ưu hóa việc xử lý đa luồng trong Python. Bằng cách sử dụng ThreadPoolExecutor, chúng ta có thể dễ dàng tạo và quản lý một nhóm các luồng (thread pool), giúp cải thiện hiệu suất và giảm thiểu tài nguyên hệ thống. Việc áp dụng các phương pháp như submit()map() của ThreadPoolExecutor không chỉ giúp mình xử lý các tác vụ một cách hiệu quả mà còn đơn giản hóa quá trình quản lý luồng trong các chương trình phức tạp. Hy vọng rằng với những kiến thức này, bạn có thể áp dụng chúng vào các dự án thực tế để tối ưu hóa hiệu suất và hiệu quả của các chương trình Python của mình.

Cùng chuyên mục:

Cách lưu trữ và tải lại Models trong PyTorch

Cách lưu trữ và tải lại Models trong PyTorch

Tìm hiểu về TensorBoard với PyTorch

Tìm hiểu về TensorBoard với PyTorch

Học chuyển giao (Transfer Learning) trong PyTorch Beginner

Học chuyển giao (Transfer Learning) trong PyTorch Beginner

Hướng dẫn cơ bản mạng Nơ-ron Tích Chập (CNN) trong PyTorch

Hướng dẫn cơ bản mạng Nơ-ron Tích Chập (CNN) trong PyTorch

Mạng Nơ-Ron truyền thẳng (Feed Forward Neural Network) trong PyTorch

Mạng Nơ-Ron truyền thẳng (Feed Forward Neural Network) trong PyTorch

Tìm hiểu Activation Functions trong PyTorch

Tìm hiểu Activation Functions trong PyTorch

Softmax và Cross Entropy trong PyTorch Beginner

Softmax và Cross Entropy trong PyTorch Beginner

Dataset Transforms trong PyTorch Beginner

Dataset Transforms trong PyTorch Beginner

Dataset và DataLoader trong PyTorch Beginner

Dataset và DataLoader trong PyTorch Beginner

Hồi quy Logistic trong PyTorch Beginner

Hồi quy Logistic trong PyTorch Beginner

Hồi quy tuyến tính trong PyTorch Beginner

Hồi quy tuyến tính trong PyTorch Beginner

Training Pipeline trong PyTorch Beginner

Training Pipeline trong PyTorch Beginner

Sử dụng Gradient Descent với Autograd trong PyTorch

Sử dụng Gradient Descent với Autograd trong PyTorch

Hướng dẫn về Tensor cơ bản trong PyTorch

Hướng dẫn về Tensor cơ bản trong PyTorch

Hướng dẫn cài đặt PyTorch với Deep Learning

Hướng dẫn cài đặt PyTorch với Deep Learning

LDA (Linear Discriminant Analysis) trong Python

LDA (Linear Discriminant Analysis) trong Python

Thuật toán AdaBoost trong Python

Thuật toán AdaBoost trong Python

Thuật toán K-Means Clustering trong Python

Thuật toán K-Means Clustering trong Python

Triển khai PCA bằng Python

Triển khai PCA bằng Python

Triển khai thuật toán Random Forest bằng Python

Triển khai thuật toán Random Forest bằng Python

Top