真正把消费链路拖死的,往往不是吞吐不够,而是失败模型不清楚

线上看消息积压,我现在第一反应通常不是“多加几个消费者”,而是先问:

1、是不是固定 offset 反复失败

2、ack 时机是不是和业务成功没对齐

3、失败消息有没有被正确旁路

4、补偿重放是不是又把同样的问题打回来

如果这四件事没理清,直接加线程数通常只是把问题放大。

手动 ACK 的意义,是把位点提交和业务成功对齐

批量消费里我更常见的写法是手动 ack,而不是自动提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
@KafkaListener(topics = "xxx", containerFactory = "batchFactory")
public void receive(List<ConsumerRecord<String, String>> msgArray, Acknowledgment ack) {
for (int i = 0; i < msgArray.size(); i++) {
ConsumerRecord<String, String> record = msgArray.get(i);
try {
process(record);
} catch (Throwable e) {
ack.nack(i, Duration.ofMillis(1000L));
return;
}
}
ack.acknowledge();
}

这段代码里真正关键的是语义:

1、当前批次全部处理成功后再提交位点

2、失败时只从失败位置开始回退

3、后续消息不会被误判成已成功消费

为什么固定 offset 卡住时,先别急着调并发

只要一条脏消息反复失败,消费者就可能一直卡在同一个 offset。

这时你会看到的表象往往是:

1、消费者线程明明活着

2、topic lag 一直不掉

3、日志反复打印同一条消息

4、后面的正常消息一直过不去

这种场景再加并发通常没有意义,因为根因不是慢,而是“这条消息过不去”。

nack 之前,最好先把失败分类做出来

我更喜欢把消费失败至少分成三类:

1、临时性失败:比如下游超时、数据库抖动,可以重试

2、数据错误:比如字段缺失、格式异常,应该旁路

3、业务失败:比如规则未通过、状态不允许,应该记录失败原因

如果所有异常都统一 nack,链路最后一定会被少量脏消息拖住。

我更愿意把消费日志打成可以定位 offset 的格式

如果只打一条 consume error,排查价值其实很有限。

更有用的日志字段通常是:

1
2
3
4
5
6
7
8
LoggerUtil.doTrace(log,
"MQ_CONSUMER",
success,
record.topic(),
groupId,
record.offset(),
record.key(),
record.value());

至少要把下面这些信息带出来:

1、topic

2、groupId

3、offset

4、消息主键

5、是否成功

6、单次处理耗时

没有这些字段时,很多积压排查最后都只能靠猜。

RabbitMQ 和 Kafka 在消费模型上,真正差异没那么大

虽然协议和客户端都不一样,但对业务代码来说,最核心的问题始终是这三个:

1、什么时候确认消费成功

2、失败后重试还是旁路

3、重复消费如何识别

比如 RabbitMQ 的手动 ack,其实也是同样的问题:

1
2
3
4
5
6
try {
processOrderEvent(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}

无论是 basicAck/basicNack 还是 ack/nack,本质都是在控制“这条消息现在算不算处理成功”。

补偿重放如果没有边界,最后会变成第二条主链路

很多系统在正常消费失败后,会把消息内容落补偿表,再由定时任务或重放 topic 去补。

这一步如果没有边界,问题会从“偶发失败”演变成“失败消息来回回流”。

我更倾向的补偿模型是:

1、正常 topic 负责主吞吐

2、补偿链路只处理失败数据

3、补偿记录有最大次数和终态

4、重放前仍然走幂等校验

5、补偿成功和失败都更新明确状态

这样补偿链路才不会和主链路互相打架。

如果积压已经发生,我一般怎么排

我自己的顺序通常是:

1、先看是不是卡在固定 offset

2、再看是脏消息还是下游慢

3、再看 ack 是否提交过早或过晚

4、再看补偿任务是不是在反复回流失败消息

5、最后才调整并发、批量大小、线程池参数

因为积压问题很多时候不是消费速度不够,而是消费模型本身就不稳。

小结

消息积压的根因,经常不在“消息太多”,而在“失败消息没有被正确分流”。

手动 ack 负责把位点和业务成功对齐,失败分类负责决定重试还是旁路,补偿重放负责兜住少量失败数据。把这三层收清楚,消费链路才不会越跑越乱。