ThreadPoolTaskExecutor

Spring

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

<!-- 开启异步,并引入线程池 -->
<task:annotation-driven executor="threadPool" />

<!-- 定义线程池 -->
<bean id="threadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数,默认为1 -->
<property name="corePoolSize" value="10" />

<!-- 最大线程数,默认为Integer.MAX_VALUE -->
<property name="maxPoolSize" value="50" />

<!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
<property name="queueCapacity" value="100" />

<!-- 线程池维护线程所允许的空闲时间,默认为60s -->
<property name="keepAliveSeconds" value="30" />

<!-- 完成任务自动关闭 , 默认为false-->
<property name="waitForTasksToCompleteOnShutdown" value="true" />

<!-- 核心线程超时退出,默认为false -->
<property name="allowCoreThreadTimeOut" value="true" />

<!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
<!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
<!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
</beans>

MultiThreadDemo-线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 多线程并发处理demo
* @author daniel.zhao
*
*/
public class MultiThreadDemo implements Runnable {

private MultiThreadProcessService multiThreadProcessService;

public MultiThreadDemo() {
}

public MultiThreadDemo(MultiThreadProcessService multiThreadProcessService) {
this.multiThreadProcessService = multiThreadProcessService;
}

@Override
public void run() {
multiThreadProcessService.processSomething();
}
}

MultiThreadProcessService-业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class MultiThreadProcessService {

public static final Logger logger = Logger.getLogger(MultiThreadProcessService.class);

/**
* 默认处理流程耗时1000ms
*/
public void processSomething() {
logger.debug("MultiThreadProcessService-processSomething" + Thread.currentThread() + "......start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.debug("MultiThreadProcessService-processSomething" + Thread.currentThread() + "......end");
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { MultiThreadConfig.class })
public class MultiThreadTest {

@Autowired
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
private MultiThreadProcessService multiThreadProcessService;

@Test
public void test() {
int n = 20;
for (int i = 0; i < n; i++) {
taskExecutor.execute(new MultiThreadDemo(multiThreadProcessService));
System.out.println("int i is " + i + ", now threadpool active threads totalnum is " + taskExecutor.getActiveCount());
}

try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

SpringBoot

参考

SpringBoot 线程池的创建、@Async 配置步骤及注意事项

springBoot 启动类的配置

1
2
3
4
5
6
7
8
@ServletComponentScan
@SpringBootApplication
@EnableAsync
public class ClubApiApplication {
public static void main(String[] args) {
SpringApplication.run(ClubApiApplication.class, args);
}
}

配置类

ThreadPoolTaskConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* 线程池配置
* @author zhh
*
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/

/** 核心线程数(默认线程数) */
private static final int corePoolSize = 20;
/** 最大线程数 */
private static final int maxPoolSize = 100;
/** 允许线程空闲时间(单位:默认为秒) */
private static final int keepAliveTime = 10;
/** 缓冲队列大小 */
private static final int queueCapacity = 200;
/** 线程池名前缀 */
private static final String threadNamePrefix = "Async-Service-";

@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);

// 线程池对拒绝任务的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}

异步方法类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class TranTest2Service {
Logger log = LoggerFactory.getLogger(TranTest2Service.class);

// 发送提醒短信 1
@PostConstruct // 加上该注解项目启动时就执行一次该方法
@Async("taskExecutor") //@Async ("taskExecutor") 对应我们自定义线程池中的 @Bean ("taskExecutor") ,表示使用我们自定义的线程池
public void sendMessage1() throws InterruptedException {
log.info("发送短信方法---- 1 执行开始");
Thread.sleep(5000); // 模拟耗时
log.info("发送短信方法---- 1 执行结束");
}

// 发送提醒短信 2
@PostConstruct // 加上该注解项目启动时就执行一次该方法
@Async("taskExecutor") //@Async ("taskExecutor") 对应我们自定义线程池中的 @Bean ("taskExecutor") ,表示使用我们自定义的线程池
public void sendMessage2() throws InterruptedException {
log.info("发送短信方法---- 2 执行开始");
Thread.sleep(2000); // 模拟耗时
log.info("发送短信方法---- 2 执行结束");
}
}

调用

OrderTaskServic.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class OrderTaskServic {
@Autowired
private TranTest2Service tranTest2Service;

// 订单处理任务
public void orderTask() throws InterruptedException {

this.cancelOrder(); // 取消订单
tranTest2Service.sendMessage1(); // 发短信的方法 1
tranTest2Service.sendMessage2(); // 发短信的方法 2
}

// 取消订单
public void cancelOrder() throws InterruptedException {
System.out.println("取消订单的方法执行------开始");
System.out.println("取消订单的方法执行------结束 ");
}
}

注意事项

如下方式会使 @Async 失效

  • 异步方法使用 static 修饰
  • 异步类没有使用 @Component 注解(或其他注解)导致 spring 无法扫描到异步类
  • 异步方法不能与被调用的异步方法在同一个类中
  • 类中需要使用 @Autowired 或 @Resource 等注解自动注入,不能自己手动 new 对象
  • 如果使用 SpringBoot 框架必须在启动类中增加 @EnableAsync 注解