ThreadPoolExecutor「推荐」

参考

https://www.cnblogs.com/zjfjava/p/11227456.html

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest {
public static void main(String[] args) throws InterruptedException {
//核心线程数
int corePoolSize = 10;
//最大线程数
int maximumPoolSize = 20;
//超过 corePoolSize 线程数量的线程最大空闲时间
long keepAliveTime = 2;
//以秒为时间单位
TimeUnit unit = TimeUnit.SECONDS;
//创建工作队列,用于存放提交的等待执行任务
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(20);
ThreadPoolExecutor threadPoolExecutor = null;

Map<Long, List<Integer>> threadMap = new HashMap<>();
System.out.println("主线程 id:" + Thread.currentThread().getId());
long start = System.currentTimeMillis();
try {
//创建线程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.CallerRunsPolicy());

//循环提交任务
for (int i = 0; i < 20; i++) {
//提交任务的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//线程打印输出
long id = Thread.currentThread().getId();

List<Integer> indexList = threadMap.get(id);
if (indexList == null) {
indexList = new ArrayList<>();
indexList.add(index);
threadMap.put(id, indexList);
} else {
indexList.add(index);
}

System.out.println("大家好,我的任务编号是:" + index + " thread-Id:" + id);
//模拟线程执行时间,10s
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} finally {
threadPoolExecutor.shutdown();
}

try {
//等待直到所有任务完成,才能执行后面代码
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);

long end = System.currentTimeMillis();
System.out.println("耗时:" + (end-start) + "ms");

threadMap.forEach((key, value) -> {
System.out.println("threadid:" + key + " indexs:" + value.toString());
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Executors「不推荐使用」

示例

1
2
3
4
5
6
7
8
9
10
//创建一个线程池,包含 6 个线程
ExecutorService executorService = Executors.newFixedThreadPool(6);

//循环处理中,取出线程去执行业务逻辑
IntStream.range(0, 20).forEach(i -> executorService.submit(() -> {
//业务代码
}));

//30 秒后判断是否停止。是-返回 true;否-返回 false
executorService.awaitTermination(30, TimeUnit.SECONDS);

ExecutorService 的关闭

shutdown 和 awaitTermination 为接口 ExecutorService 定义的两个方法,一般情况配合使用来关闭线程池。

shutdown 方法

平滑的关闭 ExecutorService,当此方法被调用时,ExecutorService 停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭

awaitTermination 方法

接收人 timeout 和 TimeUnit 两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测 ExecutorService 是否已经关闭,若关闭则返回 true,否则返回 false。一般情况下会和 shutdown 方法组合使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);

service.submit(new Task());
service.submit(new Task());
service.submit(new LongTask());
service.submit(new Task());

service.shutdown();

while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("线程池没有关闭");
}

System.out.println("线程池已经关闭");