先把几个核心概念学明白

我自己真正把 Kafka 用顺,是先把下面四个概念和业务含义对上:

1、topic:消息类别,不等于业务模块

2、partition:并行度和顺序边界

3、groupId:消费位点隔离

4、offset:消费进度,不代表业务成功

这几个概念如果只停留在“知道名字”,后面一遇到重复消费、积压、重试就很容易乱。

生产端真正要配的不是 KafkaTemplate,而是发送语义

最小可用的生产端不复杂,但我更关心的是:

1、key 怎么定

2、失败能不能观察到

3、是否需要重试或落失败记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setProducerListener(new KafkaSendResultHandler());
return kafkaTemplate;
}

private ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(configs);
}

如果消息需要按订单号、用户号维持局部顺序,key 最好直接用业务主键,而不是随机值。

消费端我更常用的是批量消费 + 手动 ack

业务里只要吞吐一上来,我基本不会停留在单条消费模型。

更常见的是把 listener 容器切成批量模式,再把 ack 改成手动提交:

1
2
3
4
5
6
7
8
9
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}

这里最关键的不是 setBatchListener(true),而是你开始自己控制 offset 提交时机了。

为什么 offset 不能等同于业务成功

很多人刚接 Kafka 时最容易混淆这一点:

1、offset 提交成功

2、数据库落库成功

3、下游调用成功

这三件事不是一回事。

我更认可的写法是把消费逻辑分成“处理成功”和“提交位点”两步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@KafkaListener(topics = "${kafka.topic}", containerFactory = "batchFactory")
public void consume(List<ConsumerRecord<String, String>> msgArray, Acknowledgment ack) {
for (int i = 0; i < msgArray.size(); i++) {
ConsumerRecord<String, String> record = msgArray.get(i);
try {
service.handle(record.value());
} catch (Throwable e) {
log.error("consume error, topic={}, offset={}", record.topic(), record.offset(), e);
ack.nack(i, Duration.ofMillis(1000L));
return;
}
}
ack.acknowledge();
}

这一版虽然简单,但已经能避免“业务没处理完,offset 先提交”的典型问题。

批量消费里最实用的一步,其实是先分片再处理

如果一批消息直接在一个线程里从头跑到尾,批量模式的收益通常会打折。

更常见的做法是:

1、先按固定大小分片

2、每个分片交给线程池处理

3、最后统一收口结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<List<ConsumerRecord<String, String>>> partitionList = Lists.partition(records, 500);
CountDownLatch latch = new CountDownLatch(partitionList.size());

for (List<ConsumerRecord<String, String>> partition : partitionList) {
executor.execute(() -> {
try {
batchHandle(partition);
} finally {
latch.countDown();
}
});
}

latch.await();

这样做的前提是单条消息处理之间没有强顺序依赖,否则分片并发反而会埋坑。

发送结果也要进业务观察面

很多生产端代码只关心 send() 有没有抛异常,但真正线上稳定些的做法,是把发送成功和失败都打进日志或指标。

1
2
3
4
5
6
7
8
9
10
11
12
public class KafkaSendResultHandler implements ProducerListener<String, String> {

@Override
public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
log.info("send success, topic={}, partition={}, offset={}", metadata.topic(), metadata.partition(), metadata.offset());
}

@Override
public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) {
log.error("send error, topic={}, key={}, value={}", record.topic(), record.key(), record.value(), exception);
}
}

至少要能回答三个问题:

1、消息有没有发出去

2、发到哪个 topic / partition

3、失败后有没有记录可追

我自己踩过的几个坑

1、把 Kafka 当同步 RPC

如果业务必须同步拿结果,Kafka 只会把问题藏到后面。

2、consumer 并发直接照抄

并发数不是越大越好,要一起看 partition 数、数据库吞吐和下游处理能力。

3、批量消费但没有批量落库

消息一批 500 条进来,结果还是一条一条写库,收益会非常有限。

4、没有失败旁路

脏消息、格式错消息、下游超时消息如果全都走同一条重试路径,很容易把消费链路拖死。

如果让我总结 Kafka 真正在业务里要学什么

我现在更看重这几件事,而不是 API 本身:

1、partition 和 key 怎么影响顺序与并发

2、offset 和业务成功怎么对齐

3、批量消费后的分片与收口怎么设计

4、发送失败、消费失败、重试失败分别怎么兜底

真正把这些学明白,Kafka 才算是从“会用”进到“能落业务”。

小结

Kafka 本身不复杂,真正复杂的是怎么把消费位点、业务成功、失败补偿这三件事对齐。

生产端把发送结果纳入观察,消费端把 ack 时机收住,批量处理把分片和收口理顺,这条异步链路才算真正跑稳。