PYTHON CONCURRENCY
CÁC CHỦ ĐỀ
BÀI MỚI NHẤT
MỚI CẬP NHẬT

Thông báo: Download 4 khóa học Python từ cơ bản đến nâng cao tại đây.

Sử dụng asyncio.wait_for() trong Python

Trong bài hướng dẫn này, bạn sẽ học cách sử dụng hàm asyncio.wait_for() để chờ một coroutine hoàn thành trong một khoảng thời gian nhất định. Điều này rất hữu ích khi bạn cần đảm bảo rằng một tác vụ không kéo dài vô tận và có thể kiểm soát được thời gian chờ của nó. Hãy cùng khám phá cách sử dụng hàm asyncio.wait_for() để quản lý các tác vụ bất đồng bộ một cách hiệu quả và bảo vệ chương trình của bạn khỏi việc bị treo do các tác vụ kéo dài.

test php

banquyen png
Bài viết này được đăng tại freetuts.net, không được copy dưới mọi hình thức.

Giới thiệu về hàm asyncio.wait_for() trong Python

Trong bài viết trước, bạn đã học cách hủy bỏ một tác vụ đang chạy bằng cách sử dụng phương thức cancel() của đối tượng Task.

Để chờ một tác vụ hoàn thành trong một khoảng thời gian nhất định, bạn có thể sử dụng hàm asyncio.wait_for(). Hàm asyncio.wait_for() sẽ chờ một tác vụ duy nhất hoàn thành trong khoảng thời gian đã định.

Khi hết thời gian chờ, hàm asyncio.wait_for() sẽ hủy bỏ tác vụ và ném ra ngoại lệ TimeoutError. Nếu không, nó sẽ trả về kết quả của tác vụ. Ví dụ:

Bài viết này được đăng tại [free tuts .net]

import asyncio
from asyncio.exceptions import TimeoutError

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )

    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task was cancelled due to a timeout')

asyncio.run(main())

Kết quả:

Calling API...
The task was cancelled due to a timeout

Cách hoạt động

Đầu tiên, định nghĩa một coroutine call_api() mà mất 3 giây để hoàn thành theo mặc định:

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

Thứ hai, tạo một tác vụ bọc lấy coroutine call_api và mất 5 giây để hoàn thành:

task = asyncio.create_task(
    call_api('Calling API...', result=2000, delay=5)
)

Thứ ba, sử dụng hàm asyncio.wait_for() để chờ tác vụ hoàn thành trong 3 giây. Vì tác vụ mất 5 giây để hoàn thành, nên sẽ xảy ra timeout và ngoại lệ TimeoutError sẽ được ném ra:

MAX_TIMEOUT = 3
try:
    await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
except TimeoutError:
    print('The task was cancelled due to a timeout')

Bảo vệ một tác vụ khỏi bị hủy trong Python

Đôi khi, bạn có thể muốn thông báo cho người dùng rằng một tác vụ đang mất nhiều thời gian hơn dự kiến nhưng không hủy tác vụ khi hết thời gian chờ.

Để làm điều đó, bạn có thể bọc tác vụ bằng hàm asyncio.shield(). Hàm asyncio.shield() ngăn chặn việc hủy bỏ một tác vụ. Ví dụ:

import asyncio
from asyncio.exceptions import TimeoutError

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )

    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task took more than expected and will complete soon.')
        result = await task
        print(result)

asyncio.run(main())

Kết quả:

Calling API...
The task took more than expected and will complete soon.
2000

Trong ví dụ này, tác vụ mất 5 giây để hoàn thành. Một khi thời gian chờ là 3 giây, ngoại lệ TimeoutError được ném ra. Tuy nhiên, tác vụ không bị hủy do hàm asyncio.shield().

Trong phần xử lý ngoại lệ, chúng ta chờ tác vụ hoàn thành và in ra kết quả.

Kết bài

Sử dụng hàm asyncio.wait_for() để chờ một tác vụ với thời gian chờ giúp bạn quản lý hiệu quả các tác vụ bất đồng bộ và ngăn chặn chúng kéo dài vô tận. Khi cần đảm bảo rằng một tác vụ sẽ hoàn thành ngay cả khi đã vượt quá thời gian chờ, bạn có thể sử dụng hàm asyncio.shield(). Hàm này sẽ bảo vệ tác vụ khỏi bị hủy bỏ, cho phép nó tiếp tục chạy và hoàn thành. Bằng cách kết hợp hai hàm này, bạn có thể kiểm soát tốt hơn thời gian và tiến trình của các tác vụ bất đồng bộ trong chương trình của mình.

Cùng chuyên mục:

Cách sử dụng lớp QTreeWidget của PyQt

Cách sử dụng lớp QTreeWidget của PyQt

Cách sử dụng lớp QTableWidget để tạo một bảng

Cách sử dụng lớp QTableWidget để tạo một bảng

Cách sử dụng lớp QListWidget trong Python

Cách sử dụng lớp QListWidget trong Python

Cách dùng lớp QStatusBar trong PyQt để tạo thanh status bar

Cách dùng lớp QStatusBar trong PyQt để tạo thanh status bar

Cách dùng lớp QDockWidget của PyQt để tạo một widget

Cách dùng lớp QDockWidget của PyQt để tạo một widget

Cách dùng lớp PyQt QToolBar để tạo các widget toolbar

Cách dùng lớp PyQt QToolBar để tạo các widget toolbar

Cách sử dụng lớp PyQt QMenu để tạo menu

Cách sử dụng lớp PyQt QMenu để tạo menu

Cách sử dụng lớp QMainWindow của PyQt để tạo cửa sổ

Cách sử dụng lớp QMainWindow của PyQt để tạo cửa sổ

Cách dùng lớp PyQt QFileDialog để tạo hộp thoại chọn file

Cách dùng lớp PyQt QFileDialog để tạo hộp thoại chọn file

Cách dùng lớp PyQt QInputDialog để tạo một hộp thoại nhập liệu

Cách dùng lớp PyQt QInputDialog để tạo một hộp thoại nhập liệu

Cách sử dụng lớp PyQt QMessageBox để tạo một hộp thoại

Cách sử dụng lớp PyQt QMessageBox để tạo một hộp thoại

Cách sử dụng lớp PyQt QProgressBar để tạo một widget progress bar

Cách sử dụng lớp PyQt QProgressBar để tạo một widget progress bar

Cách dùng lớp PyQt QTextEdit để tạo một widget cho phép chỉnh sửa

Cách dùng lớp PyQt QTextEdit để tạo một widget cho phép chỉnh sửa

Cách dùng lớp PyQt QGroupBox để tạo một khung nhóm với tiêu đề

Cách dùng lớp PyQt QGroupBox để tạo một khung nhóm với tiêu đề

Cách dùng lớp PyQt QTabWidget để tạo một widget dạng tab

Cách dùng lớp PyQt QTabWidget để tạo một widget dạng tab

Cách dùng PyQt QWidget để làm container chứa các widget khác.

Cách dùng PyQt QWidget để làm container chứa các widget khác.

Cách sử dụng lớp PyQt QSlider để tạo một widget thanh trượt (slider).

Cách sử dụng lớp PyQt QSlider để tạo một widget thanh trượt (slider).

Cách tạo một widget nhập ngày và giờ sử dụng PyQt QDateTimeEdit

Cách tạo một widget nhập ngày và giờ sử dụng PyQt QDateTimeEdit

Cách tạo một widget nhập giờ sử dụng lớp PyQt QTimeEdit

Cách tạo một widget nhập giờ sử dụng lớp PyQt QTimeEdit

Cách tạo một widget nhập ngày sử dụng lớp PyQt QDateEdit

Cách tạo một widget nhập ngày sử dụng lớp PyQt QDateEdit

Top