先看问题
线程池这块,很多项目不是不会用,而是用得太随便。
最常见的问题就两类:
1、所有异步任务共用一个线程池
2、拒绝策略和队列长度直接照抄模板
平时流量不高时看不出问题,一到高峰期,慢任务、重任务、补偿任务、回调任务全挤在一起,排查时也很难判断到底是哪类任务把线程打满了。
这篇只讲业务里常见的线程池隔离实践,不展开讲线程池原理。
先拆池子
最常见的错误写法,是全项目只保留一个业务线程池。
1 2 3 4 5 6 7 8 9 10 11
| @Bean("commonExecutor") public ThreadPoolTaskExecutor commonExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(16); executor.setMaxPoolSize(32); executor.setQueueCapacity(1000); executor.setThreadNamePrefix("common-exec-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
|
这种写法最大的风险不是参数不合理,而是所有任务都进了同一个池子。
更稳的方式是按职责拆。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Configuration public class ExecutorConfig {
@Bean("queryExecutor") public ThreadPoolTaskExecutor queryExecutor() { return buildExecutor(16, 32, 200, "query-exec-", new ThreadPoolExecutor.AbortPolicy()); }
@Bean("actionExecutor") public ThreadPoolTaskExecutor actionExecutor() { return buildExecutor(24, 48, 100, "action-exec-", new ThreadPoolExecutor.CallerRunsPolicy()); }
@Bean("compensateExecutor") public ThreadPoolTaskExecutor compensateExecutor() { return buildExecutor(8, 16, 500, "compensate-exec-", new ThreadPoolExecutor.DiscardOldestPolicy()); }
private ThreadPoolTaskExecutor buildExecutor(int coreSize, int maxSize, int queueCapacity, String prefix, RejectedExecutionHandler handler) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(coreSize); executor.setMaxPoolSize(maxSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix(prefix); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(10); executor.setRejectedExecutionHandler(handler); executor.initialize(); return executor; } }
|
这类拆法适合:
1、查询和执行链路负载差异很大
2、补偿任务允许堆积,但主链路不允许
3、不同任务的失败处理方式不同
线程池隔离的核心价值,不是“看起来专业”,而是高峰期互相不拖垮。
队列别共用一套模板
很多人配线程池参数时,只盯着核心线程数和最大线程数,反而忽略了队列长度。
队列过大,会把问题藏起来;队列过小,会把拒绝放大出来。
更接近业务的写法,是先按任务性质分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public enum TaskType { QUERY, ACTION, COMPENSATE, CALLBACK }
public class ExecutorRouter {
private final ThreadPoolTaskExecutor queryExecutor; private final ThreadPoolTaskExecutor actionExecutor; private final ThreadPoolTaskExecutor compensateExecutor;
public ExecutorRouter(ThreadPoolTaskExecutor queryExecutor, ThreadPoolTaskExecutor actionExecutor, ThreadPoolTaskExecutor compensateExecutor) { this.queryExecutor = queryExecutor; this.actionExecutor = actionExecutor; this.compensateExecutor = compensateExecutor; }
public Executor choose(TaskType taskType) { switch (taskType) { case QUERY: return queryExecutor; case ACTION: return actionExecutor; case COMPENSATE: case CALLBACK: return compensateExecutor; default: throw new IllegalArgumentException("unknown taskType"); } } }
|
一般可以这么理解:
1、查询任务怕排队太久,队列不要过长
2、执行任务更怕失败,需要留一定缓冲
3、补偿任务允许堆积,但必须可观测
线程池参数不是背模板,而是要先看任务能不能等、能不能丢、能不能降级。
任务提交要带业务标识
线上排查线程池问题时,最怕日志只告诉你“线程池满了”,但不知道满的是谁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class BizRunnable implements Runnable {
private final String bizCode; private final String traceId; private final Runnable delegate;
public BizRunnable(String bizCode, String traceId, Runnable delegate) { this.bizCode = bizCode; this.traceId = traceId; this.delegate = delegate; }
@Override public void run() { MDC.put("traceId", traceId); MDC.put("bizCode", bizCode); try { delegate.run(); } finally { MDC.remove("traceId"); MDC.remove("bizCode"); } } }
|
配合提交侧封装:
1 2 3 4 5 6 7 8
| public void submitOrderTask(OrderRequest request) { String traceId = TraceIdHolder.get(); actionExecutor.execute(new BizRunnable( "order:create", traceId, () -> orderService.createOrder(request) )); }
|
这样做的价值很直接:
1、线程池打满时能知道是哪个业务打满了
2、日志里能把 traceId 和业务动作关联起来
3、拒绝告警时有足够上下文做处置
拒绝策略别默认一个模板
很多项目为了“尽量不丢任务”,喜欢统一用 CallerRunsPolicy。
这个策略不是不能用,但前提是你能接受主线程被拖慢。
1 2 3 4 5 6 7 8
| executor.setRejectedExecutionHandler((task, pool) -> { log.error("task rejected, executor={}, activeCount={}, queueSize={}, taskCount={}", pool, pool.getActiveCount(), pool.getQueue().size(), pool.getTaskCount()); throw new RejectedExecutionException("action executor overloaded"); });
|
几种常见策略的适用边界:
1、AbortPolicy:主链路任务,不允许静默丢失,直接失败更容易兜底
2、CallerRunsPolicy:低速流量下还能接受,但要确认不会拖慢入口线程
3、DiscardPolicy:只适合无关紧要的通知型任务
4、DiscardOldestPolicy:适合补偿类或延迟处理类任务,但要配可重放能力
如果一个接口本来就是核心下单链路,再用 CallerRunsPolicy 把 Web 线程拖住,最后很可能不是一个任务慢,而是整个入口一起超时。
线程池状态要能看见
线程池用了几年都没人知道当前活跃线程数、队列积压量,这类系统高峰期往往只能靠重启止血。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Component public class ExecutorMetricsPrinter {
private final ThreadPoolTaskExecutor actionExecutor; private final ThreadPoolTaskExecutor queryExecutor;
public ExecutorMetricsPrinter(ThreadPoolTaskExecutor actionExecutor, ThreadPoolTaskExecutor queryExecutor) { this.actionExecutor = actionExecutor; this.queryExecutor = queryExecutor; }
@Scheduled(fixedDelay = 10000) public void print() { print("action", actionExecutor); print("query", queryExecutor); }
private void print(String name, ThreadPoolTaskExecutor executor) { ThreadPoolExecutor threadPool = executor.getThreadPoolExecutor(); log.info("executor={}, core={}, max={}, active={}, poolSize={}, queueSize={}, completed={}", name, threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(), threadPool.getActiveCount(), threadPool.getPoolSize(), threadPool.getQueue().size(), threadPool.getCompletedTaskCount()); } }
|
最少要能看到:
1、当前活跃线程数
2、队列积压数量
3、任务总提交量和完成量
4、拒绝次数
没有这些信息,线程池治理基本等于碰运气。
超时之后怎么办
线程池里还有一个经常被忽略的问题:超时了以后,任务到底停没停。
1 2 3 4 5 6 7 8 9 10 11
| public OrderDetail queryOrderDetail(String orderId) { Future<OrderDetail> future = queryExecutor.submit(() -> remoteOrderFacade.query(orderId)); try { return future.get(500, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { future.cancel(true); throw new BizException("query timeout"); } catch (Exception e) { throw new BizException("query failed", e); } }
|
这类写法至少比只抛一个 timeout 异常更明确。
要注意的是:
1、不是所有任务都能安全中断
2、远程调用超时不等于业务真的没执行
3、取消策略要和幂等、补偿一起设计
常见坑
1、方法里临时 new 线程池
这种写法最容易导致线程数失控,也不方便统一监控和回收。
2、所有任务都共用一个队列
表面上方便,实际上一类慢任务就能把其他任务一起堵住。
3、队列配得太大
队列一旦太大,短时间内看起来很稳,实际上只是把超时和积压往后推。
4、拒绝后只打日志,不回传业务结果
主链路任务被拒绝以后,调用方还以为成功受理,这类问题最难排查。
5、线程池监控只看线程数,不看队列和拒绝
真正能反映风险的,往往不是线程跑满,而是队列持续堆积和拒绝频率上升。
收一下
线程池治理最容易被误解成“调参数”,但真正决定线上稳定性的,往往是这几件事:
1、按职责拆池,避免不同任务互相拖垮
2、按任务后果选拒绝策略,不要默认一个模板
3、任务提交时带上业务标识和 TraceId
4、把线程池活跃数、队列积压、拒绝次数暴露出来
5、超时、中断、补偿要一起设计,不能只管抛异常