【笔记】SpringBoot项目实现多线程

前言

SpringBoot项目实现多线程

直接使用Executor

执行异步方法

src/main/java/com/service/DemoService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.Executor;

/**
 * Demonstrates handing work directly to the injected {@link Executor}.
 */
@Service
public class DemoService {

    @Autowired
    private Executor executor;

    /**
     * Wraps the work in a {@link Runnable} and passes it to the executor.
     */
    public void demo() {
        Runnable task = () -> {
            ...
        };
        executor.execute(task);
    }

}

使用默认线程池

  • 每次调用异步方法,Spring都会将方法放入一个线程池中异步执行

定义配置文件

src/main/resources/application.properties
1
2
3
4
5
6
7
8
9
10
11
12
# Number of core threads in the pool
spring.task.execution.pool.core-size=8
# Maximum number of threads in the pool
spring.task.execution.pool.max-size=16
# How long idle threads are kept alive before being terminated
spring.task.execution.pool.keep-alive=60s
# Capacity of the task queue
spring.task.execution.pool.queue-capacity=100
# Prefix for the names of threads created by the executor
spring.task.execution.thread-name-prefix=Custom-Thread-
# Whether core threads are also allowed to time out
spring.task.execution.pool.allow-core-thread-timeout=true

定义配置类

src/main/java/com/config/AsyncConfig.java
1
2
3
4
5
6
7
8
9
package com.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Configuration class that carries {@code @EnableAsync}, which turns on Spring's
 * processing of {@code @Async} methods. It declares no beans of its own.
 */
@Configuration
@EnableAsync
public class AsyncConfig {
}

定义异步方法

  • 异步方法的返回值只能是void或Future类型(如CompletableFuture<T>)
src/main/java/com/AsyncClass.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

/**
 * Component whose methods are marked {@code @Async}. No executor name is given
 * on the annotations.
 */
@Component
public class AsyncClass {

/**
 * Asynchronous method with no return value.
 */
@Async
public void asyncMethod1() {

...

}

/**
 * Asynchronous method that hands its result back through a CompletableFuture.
 *
 * @return an already-completed future; this example completes it with an empty string
 */
@Async
public CompletableFuture<String> asyncMethod2() {

...

return CompletableFuture.completedFuture("");
}

}

执行异步方法

src/main/java/com/service/DemoService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.service;

import com.AsyncClass;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Demonstrates calling the {@code @Async} methods of {@link AsyncClass} and
 * collecting their results.
 */
@Service
public class DemoService {

    @Autowired
    private AsyncClass asyncClass;

    /**
     * Triggers the asynchronous method that has no return value; nothing is awaited.
     */
    public void demo1() {
        asyncClass.asyncMethod1();
    }

    /**
     * Starts two asynchronous calls, waits until both have finished, then prints
     * their results.
     *
     * @throws java.util.concurrent.CompletionException if either task completed exceptionally
     */
    public void demo2() {
        CompletableFuture<String> first = asyncClass.asyncMethod2();
        CompletableFuture<String> second = asyncClass.asyncMethod2();
        // allOf() only builds a combined future; join() is what actually blocks until both are done.
        CompletableFuture.allOf(first, second).join();
        // join(), unlike get(), throws no checked exceptions, so no try/catch or throws clause is needed.
        System.out.println(first.join());
        System.out.println(second.join());
    }

}

使用自定义线程池

定义配置类

src/main/java/com/config/AsyncConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * Registers several differently-built executors as named beans so that
 * {@code @Async("beanName")} can choose between them.
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * Custom thread pool built from Spring's {@link ThreadPoolTaskExecutor}
     * and configured through its setters.
     */
    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("Thread-");
        pool.setCorePoolSize(8);
        pool.setMaxPoolSize(16);
        pool.setKeepAliveSeconds(60);
        pool.setAllowCoreThreadTimeOut(true);
        pool.setQueueCapacity(100);
        // DiscardPolicy drops rejected tasks without raising an exception.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        pool.initialize();
        return pool;
    }

    /**
     * Executor that creates a new thread for every task it is given.
     */
    @Bean("simpleAsyncTaskExecutor")
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        String threadNamePrefix = "Thread-";
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Wraps a standard Java fixed-size thread pool as a Spring TaskExecutor.
     */
    @Bean("concurrentTaskExecutor")
    public ConcurrentTaskExecutor concurrentTaskExecutor() {
        ExecutorService fixedPool = Executors.newFixedThreadPool(10);
        return new ConcurrentTaskExecutor(fixedPool);
    }

    /**
     * Standard Java thread pool exposed as a plain {@link Executor}.
     */
    @Bean("threadPoolExecutor")
    public Executor threadPoolExecutor() {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
        RejectedExecutionHandler dropSilently = new ThreadPoolExecutor.DiscardPolicy();
        return new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, workQueue, dropSilently);
    }

}

定义异步方法

src/main/java/com/AsyncClass.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

/**
 * Component whose {@code @Async} methods each name the executor bean they run on.
 * Every name must match a bean registered in {@code AsyncConfig}.
 */
@Component
public class AsyncClass {

    /**
     * Runs on the {@code threadPoolTaskExecutor} bean.
     */
    // The qualifier has to be an existing bean name; "customThreadPool" was not registered anywhere.
    @Async("threadPoolTaskExecutor")
    public void asyncMethod1() {
        ...
    }

    /**
     * Runs on the {@code simpleAsyncTaskExecutor} bean.
     */
    @Async("simpleAsyncTaskExecutor")
    public void asyncMethod2() {
        ...
    }

    /**
     * Runs on the {@code concurrentTaskExecutor} bean.
     */
    @Async("concurrentTaskExecutor")
    public void asyncMethod3() {
        ...
    }

    /**
     * Runs on the {@code threadPoolExecutor} bean.
     */
    @Async("threadPoolExecutor")
    public void asyncMethod4() {
        ...
    }

}

执行异步方法

src/main/java/com/service/DemoService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.service;

import com.AsyncClass;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Service that triggers each of the asynchronous methods on the injected AsyncClass.
 */
@Service
public class DemoService {

@Autowired
private AsyncClass asyncClass;

/**
 * Delegates to AsyncClass.asyncMethod1().
 */
public void demo1() {
asyncClass.asyncMethod1();
}

/**
 * Delegates to AsyncClass.asyncMethod2().
 */
public void demo2() {
asyncClass.asyncMethod2();
}

/**
 * Delegates to AsyncClass.asyncMethod3().
 */
public void demo3() {
asyncClass.asyncMethod3();
}

/**
 * Delegates to AsyncClass.asyncMethod4().
 */
public void demo4() {
asyncClass.asyncMethod4();
}

}

完成