先看问题

线程池这块,很多项目不是不会用,而是用得太随便。

最常见的问题就两类:

1、所有异步任务共用一个线程池

2、拒绝策略和队列长度直接照抄模板

平时流量不高时看不出问题,一到高峰期,慢任务、重任务、补偿任务、回调任务全挤在一起,排查时也很难判断到底是哪类任务把线程打满了。

这篇只讲业务里常见的线程池隔离实践,不展开讲线程池原理。

先拆池子

最常见的错误写法,是全项目只保留一个业务线程池。

1
2
3
4
5
6
7
8
9
10
11
@Bean("commonExecutor")
public ThreadPoolTaskExecutor commonExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(32);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("common-exec-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

这种写法最大的风险不是参数不合理,而是所有任务都进了同一个池子。

更稳的方式是按职责拆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class ExecutorConfig {

@Bean("queryExecutor")
public ThreadPoolTaskExecutor queryExecutor() {
return buildExecutor(16, 32, 200, "query-exec-", new ThreadPoolExecutor.AbortPolicy());
}

@Bean("actionExecutor")
public ThreadPoolTaskExecutor actionExecutor() {
return buildExecutor(24, 48, 100, "action-exec-", new ThreadPoolExecutor.CallerRunsPolicy());
}

@Bean("compensateExecutor")
public ThreadPoolTaskExecutor compensateExecutor() {
return buildExecutor(8, 16, 500, "compensate-exec-", new ThreadPoolExecutor.DiscardOldestPolicy());
}

private ThreadPoolTaskExecutor buildExecutor(int coreSize,
int maxSize,
int queueCapacity,
String prefix,
RejectedExecutionHandler handler) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(coreSize);
executor.setMaxPoolSize(maxSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix(prefix);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.setRejectedExecutionHandler(handler);
executor.initialize();
return executor;
}
}

这类拆法适合:

1、查询和执行链路负载差异很大

2、补偿任务允许堆积,但主链路不允许

3、不同任务的失败处理方式不同

线程池隔离的核心价值,不是“看起来专业”,而是高峰期互相不拖垮。

队列别共用一套模板

很多人配线程池参数时,只盯着核心线程数和最大线程数,反而忽略了队列长度。

队列过大,会把问题藏起来;队列过小,会把拒绝放大出来。

更接近业务的写法,是先按任务性质分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public enum TaskType {
QUERY,
ACTION,
COMPENSATE,
CALLBACK
}

public class ExecutorRouter {

private final ThreadPoolTaskExecutor queryExecutor;
private final ThreadPoolTaskExecutor actionExecutor;
private final ThreadPoolTaskExecutor compensateExecutor;

public ExecutorRouter(ThreadPoolTaskExecutor queryExecutor,
ThreadPoolTaskExecutor actionExecutor,
ThreadPoolTaskExecutor compensateExecutor) {
this.queryExecutor = queryExecutor;
this.actionExecutor = actionExecutor;
this.compensateExecutor = compensateExecutor;
}

public Executor choose(TaskType taskType) {
switch (taskType) {
case QUERY:
return queryExecutor;
case ACTION:
return actionExecutor;
case COMPENSATE:
case CALLBACK:
return compensateExecutor;
default:
throw new IllegalArgumentException("unknown taskType");
}
}
}

一般可以这么理解:

1、查询任务怕排队太久,队列不要过长

2、执行任务更怕失败,需要留一定缓冲

3、补偿任务允许堆积,但必须可观测

线程池参数不是背模板,而是要先看任务能不能等、能不能丢、能不能降级。

任务提交要带业务标识

线上排查线程池问题时,最怕日志只告诉你“线程池满了”,但不知道满的是谁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class BizRunnable implements Runnable {

private final String bizCode;
private final String traceId;
private final Runnable delegate;

public BizRunnable(String bizCode, String traceId, Runnable delegate) {
this.bizCode = bizCode;
this.traceId = traceId;
this.delegate = delegate;
}

@Override
public void run() {
MDC.put("traceId", traceId);
MDC.put("bizCode", bizCode);
try {
delegate.run();
} finally {
MDC.remove("traceId");
MDC.remove("bizCode");
}
}
}

配合提交侧封装:

1
2
3
4
5
6
7
8
public void submitOrderTask(OrderRequest request) {
String traceId = TraceIdHolder.get();
actionExecutor.execute(new BizRunnable(
"order:create",
traceId,
() -> orderService.createOrder(request)
));
}

这样做的价值很直接:

1、线程池打满时能知道是哪个业务打满了

2、日志里能把 traceId 和业务动作关联起来

3、拒绝告警时有足够上下文做处置

拒绝策略别默认一个模板

很多项目为了“尽量不丢任务”,喜欢统一用 CallerRunsPolicy。

这个策略不是不能用,但前提是你能接受主线程被拖慢。

1
2
3
4
5
6
7
8
executor.setRejectedExecutionHandler((task, pool) -> {
log.error("task rejected, executor={}, activeCount={}, queueSize={}, taskCount={}",
pool,
pool.getActiveCount(),
pool.getQueue().size(),
pool.getTaskCount());
throw new RejectedExecutionException("action executor overloaded");
});

几种常见策略的适用边界:

1、AbortPolicy:主链路任务,不允许静默丢失,直接失败更容易兜底

2、CallerRunsPolicy:低速流量下还能接受,但要确认不会拖慢入口线程

3、DiscardPolicy:只适合无关紧要的通知型任务

4、DiscardOldestPolicy:适合补偿类或延迟处理类任务,但要配可重放能力

如果一个接口本来就是核心下单链路,再用 CallerRunsPolicy 把 Web 线程拖住,最后很可能不是一个任务慢,而是整个入口一起超时。

线程池状态要能看见

线程池用了几年都没人知道当前活跃线程数、队列积压量,这类系统高峰期往往只能靠重启止血。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class ExecutorMetricsPrinter {

private final ThreadPoolTaskExecutor actionExecutor;
private final ThreadPoolTaskExecutor queryExecutor;

public ExecutorMetricsPrinter(ThreadPoolTaskExecutor actionExecutor,
ThreadPoolTaskExecutor queryExecutor) {
this.actionExecutor = actionExecutor;
this.queryExecutor = queryExecutor;
}

@Scheduled(fixedDelay = 10000)
public void print() {
print("action", actionExecutor);
print("query", queryExecutor);
}

private void print(String name, ThreadPoolTaskExecutor executor) {
ThreadPoolExecutor threadPool = executor.getThreadPoolExecutor();
log.info("executor={}, core={}, max={}, active={}, poolSize={}, queueSize={}, completed={}",
name,
threadPool.getCorePoolSize(),
threadPool.getMaximumPoolSize(),
threadPool.getActiveCount(),
threadPool.getPoolSize(),
threadPool.getQueue().size(),
threadPool.getCompletedTaskCount());
}
}

最少要能看到:

1、当前活跃线程数

2、队列积压数量

3、任务总提交量和完成量

4、拒绝次数

没有这些信息,线程池治理基本等于碰运气。

超时之后怎么办

线程池里还有一个经常被忽略的问题:超时了以后,任务到底停没停。

1
2
3
4
5
6
7
8
9
10
11
public OrderDetail queryOrderDetail(String orderId) {
Future<OrderDetail> future = queryExecutor.submit(() -> remoteOrderFacade.query(orderId));
try {
return future.get(500, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new BizException("query timeout");
} catch (Exception e) {
throw new BizException("query failed", e);
}
}

这类写法至少比只抛一个 timeout 异常更明确。

要注意的是:

1、不是所有任务都能安全中断

2、远程调用超时不等于业务真的没执行

3、取消策略要和幂等、补偿一起设计

常见坑

1、方法里临时 new 线程池

这种写法最容易导致线程数失控,也不方便统一监控和回收。

2、所有任务都共用一个队列

表面上方便,实际上一类慢任务就能把其他任务一起堵住。

3、队列配得太大

队列一旦太大,短时间内看起来很稳,实际上只是把超时和积压往后推。

4、拒绝后只打日志,不回传业务结果

主链路任务被拒绝以后,调用方还以为成功受理,这类问题最难排查。

5、线程池监控只看线程数,不看队列和拒绝

真正能反映风险的,往往不是线程跑满,而是队列持续堆积和拒绝频率上升。

收一下

线程池治理最容易被误解成“调参数”,但真正决定线上稳定性的,往往是这几件事:

1、按职责拆池,避免不同任务互相拖垮

2、按任务后果选拒绝策略,不要默认一个模板

3、任务提交时带上业务标识和 TraceId

4、把线程池活跃数、队列积压、拒绝次数暴露出来

5、超时、中断、补偿要一起设计,不能只管抛异常