Hướng dẫn tạo và sử dụng ThreadPool
Trong bài trước, các bạn đã được học về cách sử dụng sleep()
và join()
. Sang bài này, tôi sẽ hướng dẫn các bạn tìm hiểu cách tạo và sử dụng ThreadPool
. Các bạn theo dõi nhé!
1. ThreadPool là gì?
Trong Java, ThreadPool
được dùng để giới hạn số lượng Thread
được chạy bên trong ứng dụng của chúng ta trong cùng một thời điểm. Nếu chúng ta không có sự giới hạn này, mỗi khi có một Thread
mới được tạo ra và được cấp phát bộ nhớ bằng từ khóa new
thì sẽ có vấn đề về bộ nhớ và hiệu suất, có thể dẫn đến lỗi crash chương trình.
Ví dụ: Khi chúng ta viết chương trình tải các tập tin từ Internet, mỗi tập tin cần 1 Thread
để thực hiện quá trình tải, giả sử cần tải 1000 tệp hình ảnh thì chúng ta phải cần tới 1000 Thread
hoạt động cùng một thời điểm trong cùng một chương trình. Điều này sẽ dễ dẫn đến lỗi quá tải của chương trình, làm ảnh hưởng đến hiệu suất và chương trình sẽ rất dễ bị crash vì khó kiểm soát.
Vì vậy, để khắc phục hiện tượng này, Java cho phép chúng ta thay vì phải tạo mới Thread
cho mỗi nhiệm vụ, quá trình được thực hiện trong cùng một thời điểm thì các nhiệm vụ, quá trình đó có thể được đưa vào trong một ThreadPool
để khi trong ThreadPool
có bất kỳ Thread
nào đang không phải thực hiện một nhiệm vụ nào thì sẽ có nhiệm vụ gán vào một trong số các Thread
đó để thực thi. Điều này sẽ giúp khắc phục được sự tắc nghẽn và chương trình sẽ kiểm soát được các luồng thực thi.
Bài viết này được đăng tại [free tuts .net]
Bên trong ThreadPool
, các nhiệm vụ sẽ được chèn vào trong một Blocking Queue. Blocking Queue có thể hiểu là nơi chứa các nhiệm vụ mà các Thread
sẽ lấy chúng ra và thực thi lần lượt. Mỗi khi có một nhiệm vụ mới được thêm vào Queue và sau đó sẽ chỉ có một Thread
đang không phải thực hiện một nhiệm vụ nào vào Queue lấy nhiệm vụ đó ra, còn các Thread
còn lại phải chờ đợi cho đến khi Thread đó lấy nhiệm vụ ra thành công.
Ví dụ dưới đây sẽ minh họa cách tạo ThreadPool
bằng cách sử dụng ThreadPoolExecutor
:
package vidu; public class RunPool implements Runnable { int id; @Override public void run() { System.out.println("Đang xử lý tiến trình " + id); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Đã xử lý tiến trình " + id); } public RunPool(int id) { this.id = id; } }
package vidu; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MyThreadPool { public static void main(String[] args) { ArrayBlockingQueue<Runnable> hangDoi = new ArrayBlockingQueue<>(100); // khi số tiến trình của chúng ta vượt quá maxSize ở đây là 5 // ví dụ như đối số thứ nhất = 6 // thì tất cả những tiến trình mới mà chúng ta tạo ra sẽ được đưa vào hangDoi ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 12, TimeUnit.SECONDS, hangDoi); // dùng vòng lặp for để có thể chạy các Thread for (int i = 0; i < 10; i++) { // trong phương thức execute() thì đối số truyền vào phải là một Runnable // đó là lý do mà lớp RunPool phải implements từ interface Runnable threadPoolExecutor.execute(new RunPool(i)); } } }
Kết quả sau khi biên dịch chương trình:
Giải thích hoạt động của chương trình trên:
Trong dòng code khởi tạo ThreadPoolExecutor
: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, hangDoi);
chúng ta có 5 tham số:
- Đối số 1 (
corePoolSize
) là số lượngThread
tối thiểu trongThreadPool
, ở đâycorePoolSize = 5
. Khi khởi tạo, số lượngThread
có thể là 0. Khi nhiệm vụ được thêm vào thìThread
mới được tạo ra và kể từ đây, nếu số lượngThread
ít hơncorePoolSize
thì nhữngThread
mới sẽ được tạo ra đến khi sốThread
bằng giá trị củacorePoolSize
. - Đối số 2 (
maximumPoolSize
) là số lượng tối đa cácThread
trongThreadPool
. - Đối số 3 (
keepAliveTime
): khi sốThread
lớn hơncorePoolSize
thìkeepAliveTime
là thời gian tối đa mà 1Thread
"nhàn rỗi" chờ nhiệm vụ. Khi hết thời gian chờ màThread
đó chưa có nhiệm vụ thì nó sẽ bị hủy. - Đối số 4 (
unit
) là đơn vị thời gian củakeepAliveTime
. Trong ví dụ này thì unit của tôi làTimeUnit.SECONDS
. - Đối số 5 (
workQueue
) là hàng đợi dùng để chứa các nhiệm vụ mà cácThread
sẽ lấy chúng ra và thực thi lần lượt, ở đây tôi dùngArrayBlockingQueue
.
2. ExecutorService
Kể từ Java 5 trở đi, ThreadPool
đã được xây dựng sẵn trong gói java.util.concurrent
, vì vậy chúng ta không cần phải tạo một ThreadPool
mà thay vào đó chúng ta sẽ sử dụng các lớp có sẵn của gói này. Java cung cấp cho chúng ta lớp Executor
, interface của lớp Executor
là ExecutorService
.
Ta có thể tạo ThreadPool
thông qua Interface ExecutorService
, các nhiệm vụ sẽ được đưa vào ThreadPool
và được xử lý bằng một trong những phương thức có sẵn mà Executor
cung cấp như sau:
newSingleThreadExecutor()
: TrongThreadPool
chỉ có 1Thread
và các nhiêm vụ sẽ được xử lý một cách tuần tự.newCachedThreadPool()
: TrongThreadPool
sẽ có nhiềuThread
và các nhiệm vụ sẽ được xử lý một cách song song. CácThread
cũ sau khi xử lý xong sẽ được sử dụng lại cho nhiệm vụ mới. Mặc định nếu mộtThread
không được sử dụng trong vòng 60 giây thìThread
đó sẽ bị tắt.newFixedThreadPool()
: TrongThreadPool
sẽ được cố định cácThread
. Nếu một nhiệm vụ mới được đưa vào mà cácThread
đều đang "bận rộn" thì nhiệm vụ đó sẽ được gửi vào Blocking Queue và sau đó nếu có mộtThread
đã thực thi xong nhiệm vụ của nó thì nhiệm vụ đang ở trong Queue đó sẽ được push ra khỏi Queue và đượcThread
đó xử lý tiếp.newScheduledThreadPool()
: tương tự nhưnewCachedThreadPool()
nhưng sẽ có thời gian delay giữa cácThread
.newSingleThreadScheduledExecutor()
: tương tự nhưnewSingleThreadExecutor()
nhưng sẽ có khoảng thời gian delay giữa cácThread
.
Interface ExecutorService
đại diện cho cơ chế thực thi bất đồng bộ có khả năng thực thi các nhiệm vụ trong background (nền). ExecutorService
tương tự như một ThreadPool
và trong thực tế, việc triển khai ExecutorService
là một triển khai ThreadPool
.
Sau đây tôi sẽ đưa ra một ví dụ minh họa cách sử dụng ExecutorService
:
package vidu; public class RunPool implements Runnable { int id; @Override public void run() { System.out.println("Đang xử lý tiến trình " + id); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Đã xử lý tiến trình " + id); } public RunPool(int id) { this.id = id; } }
package vidu; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class MyThreadPool { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { pool.submit(new RunPool(i)); // chay ThreadPool, đối số là 1 Runnable } try { // thời gian sống của mỗi Thread là 1 ngày (nếu nó chưa thực thi xong) pool.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); // tắt ThreadPool } }
Kết quả sau khi biên dịch chương trình:
Lưu ý: Bạn nên shutdown một ThreadPool
bằng cách gọi phương thức shutdown()
bởi vì ta không thể chắc chắn được rằng máy ảo Java có thể tự động làm điều đó.
3. Lời kết
Trong bài này, tôi đã hướng dẫn các bạn tìm hiểu về cách tạo và sử dụng ThreadPool
.Sang bài sau, tôi sẽ trình bày các phần còn lại liên quan đến Thread
. Các bạn theo dõi nhé!