PYTHON CONCURRENCY
CÁC CHỦ ĐỀ
BÀI MỚI NHẤT
MỚI CẬP NHẬT

Thông báo: Download 4 khóa học Python từ cơ bản đến nâng cao tại đây.

Hủy bỏ tác vụ trong Python

Trong bài viết này, bạn sẽ học cách hủy bỏ một tác vụ bất đồng bộ kéo dài mà có thể không bao giờ hoàn thành. Bằng cách sử dụng phương thức cancel() của đối tượng Task trong thư viện asyncio, bạn có thể dễ dàng quản lý và kiểm soát các tác vụ trong Python, giúp chương trình của bạn trở nên linh hoạt và hiệu quả hơn.

test php

banquyen png
Bài viết này được đăng tại freetuts.net, không được copy dưới mọi hình thức.

Giới thiệu về hủy bỏ tác vụ trong Python

Dòng lệnh sau sử dụng từ khóa await để đợi một tác vụ hoàn thành:

task = asyncio.create_task(coroutine())
result = await task

Tuy nhiên, nếu coroutine() mất quá nhiều thời gian, bạn sẽ bị kẹt lại trong quá trình chờ đợi mà không thu được kết quả nào. Thêm vào đó, bạn sẽ không có cách nào để dừng lại nếu muốn.

Để giải quyết vấn đề này, bạn có thể hủy bỏ tác vụ bằng cách sử dụng phương thức cancel() của đối tượng Task. Nếu bạn hủy bỏ một tác vụ, nó sẽ gây ra ngoại lệ CancelledError khi bạn await nó. Ví dụ:

Bài viết này được đăng tại [free tuts .net]

import asyncio
from asyncio import CancelledError

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )

    if not task.done():
        print('Cancelling the task...')
        task.cancel()

    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')

asyncio.run(main())

Kết quả:

Cancelling the task...
Task has been cancelled

Cách hoạt động

Coroutine call_api() in ra một thông báo, chờ 3 giây và trả về kết quả.

Tạo một tác vụ mới sử dụng hàm create_task() và truyền vào coroutine call_api(). Tác vụ này sẽ mất 5 giây để hoàn thành:

task = asyncio.create_task(
    call_api('Calling API...', result=2000, delay=5)
)

Kiểm tra nếu tác vụ chưa hoàn thành bằng cách gọi phương thức done() và hủy bỏ tác vụ bằng cách sử dụng phương thức cancel():

if not task.done():
    print('Cancelling the task...')
    task.cancel()

Đợi tác vụ hoàn thành bằng từ khóa await. Vì tác vụ đã bị hủy, ngoại lệ CancelledError sẽ được ném ra:

try:
    await task
except CancelledError:
    print('Task has been cancelled.')

Nếu bạn muốn kiểm tra mỗi giây xem một tác vụ đã hoàn thành chưa và hủy nó nếu một khoảng thời gian đã trôi qua, bạn có thể sử dụng vòng lặp while:

import asyncio
from asyncio import CancelledError

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )

    time_elapsed = 0
    while not task.done():
        time_elapsed += 1
        await asyncio.sleep(1)
        print('Task has not completed, checking again in a second')
        if time_elapsed == 3:
            print('Cancelling the task...')
            task.cancel()
            break

    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')

asyncio.run(main())

Trong ví dụ này, vòng lặp while kiểm tra nếu tác vụ đã hoàn thành mỗi giây và hủy bỏ tác vụ khi thời gian đã trôi qua đạt đến 3 giây.

Kết bài

Sử dụng phương thức cancel() của đối tượng Task để hủy bỏ một tác vụ là cách hiệu quả để quản lý các tác vụ bất đồng bộ kéo dài trong Python. Khi bạn await một tác vụ đã bị hủy, chương trình sẽ gặp ngoại lệ CancelledError, giúp bạn xác định và xử lý các tác vụ bị hủy một cách dễ dàng và rõ ràng hơn. Việc này giúp bạn kiểm soát tốt hơn luồng xử lý và cải thiện hiệu suất của ứng dụng.

Cùng chuyên mục:

Cách thêm Progress Bar trong Python với chỉ một dòng Code

Cách thêm Progress Bar trong Python với chỉ một dòng Code

Toán tử Walrus Operator- Tính năng mới trong Python 3.8

Toán tử Walrus Operator- Tính năng mới trong Python 3.8

Cách nạp dữ liệu Machine Learning từ File trong Python

Cách nạp dữ liệu Machine Learning từ File trong Python

Hướng dẫn sử dụng Google Sheets API với Python

Hướng dẫn sử dụng Google Sheets API với Python

Xây dựng  web Python tự động hóa Twitter | Flask, Heroku, Twitter API & Google Sheets API

Xây dựng web Python tự động hóa Twitter | Flask, Heroku, Twitter API & Google Sheets API

Xây dựng Web Machine Learning đẹp mắt với Streamlit và Scikit-learn trong Python

Xây dựng Web Machine Learning đẹp mắt với Streamlit và Scikit-learn trong Python

Hướng dẫn tạo Chatbot đơn giản bằng PyTorch

Hướng dẫn tạo Chatbot đơn giản bằng PyTorch

11 mẹo và thủ thuật để viết Code Python hiệu quả hơn

11 mẹo và thủ thuật để viết Code Python hiệu quả hơn

Hướng dẫn làm ứng dụng TODO với Flask dành cho người mới bắt đầu trong Python

Hướng dẫn làm ứng dụng TODO với Flask dành cho người mới bắt đầu trong Python

Hướng dẫn viết Snake Game bằng Python

Hướng dẫn viết Snake Game bằng Python

Cách sử dụng chế độ interactive trong Python

Cách sử dụng chế độ interactive trong Python

Cách sử dụng Python Debugger với hàm breakpoint()

Cách sử dụng Python Debugger với hàm breakpoint()

Xây dựng ứng dụng Web Style Transfer với PyTorch và Streamlit

Xây dựng ứng dụng Web Style Transfer với PyTorch và Streamlit

Cách cài đặt Jupyter Notebook trong môi trường Conda và thêm Kernel

Cách cài đặt Jupyter Notebook trong môi trường Conda và thêm Kernel

Hướng dẫn xây dựng ứng dụng dự đoán giá cổ phiếu bằng Python

Hướng dẫn xây dựng ứng dụng dự đoán giá cổ phiếu bằng Python

Hướng dẫn tạo ứng dụng AI hội thoại với NVIDIA Jarvis trong Python

Hướng dẫn tạo ứng dụng AI hội thoại với NVIDIA Jarvis trong Python

Hỗ trợ Async trong Django 3.1

Hỗ trợ Async trong Django 3.1

8 mẹo tái cấu trúc Python giúp mã sạch hơn và Pythonic

8 mẹo tái cấu trúc Python giúp mã sạch hơn và Pythonic

Ý nghĩa của if __name__ ==

Ý nghĩa của if __name__ == "__main__" trong Python

Cách xóa phần tử trong danh sách Python

Cách xóa phần tử trong danh sách Python

Top