先把几个核心概念学明白
我自己真正把 Kafka 用顺,是先把下面四个概念和业务含义对上:
1、topic:消息类别,不等于业务模块
2、partition:并行度和顺序边界
3、groupId:消费位点隔离
4、offset:消费进度,不代表业务成功
这几个概念如果只停留在“知道名字”,后面一遇到重复消费、积压、重试就很容易乱。
生产端真正要配的不是 KafkaTemplate,而是发送语义
最小可用的生产端不复杂,但我更关心的是:
1、key 怎么定
2、失败能不能观察到
3、是否需要重试或落失败记录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setProducerListener(new KafkaSendResultHandler()); return kafkaTemplate; }
private ProducerFactory<String, String> producerFactory() { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.ACKS_CONFIG, "all"); return new DefaultKafkaProducerFactory<>(configs); }
|
如果消息需要按订单号、用户号维持局部顺序,key 最好直接用业务主键,而不是随机值。
消费端我更常用的是批量消费 + 手动 ack
业务里只要吞吐一上来,我基本不会停留在单条消费模型。
更常见的是把 listener 容器切成批量模式,再把 ack 改成手动提交:
1 2 3 4 5 6 7 8 9
| @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(1); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; }
|
这里最关键的不是 setBatchListener(true),而是你开始自己控制 offset 提交时机了。
为什么 offset 不能等同于业务成功
很多人刚接 Kafka 时最容易混淆这一点:
1、offset 提交成功
2、数据库落库成功
3、下游调用成功
这三件事不是一回事。
我更认可的写法是把消费逻辑分成“处理成功”和“提交位点”两步:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @KafkaListener(topics = "${kafka.topic}", containerFactory = "batchFactory") public void consume(List<ConsumerRecord<String, String>> msgArray, Acknowledgment ack) { for (int i = 0; i < msgArray.size(); i++) { ConsumerRecord<String, String> record = msgArray.get(i); try { service.handle(record.value()); } catch (Throwable e) { log.error("consume error, topic={}, offset={}", record.topic(), record.offset(), e); ack.nack(i, Duration.ofMillis(1000L)); return; } } ack.acknowledge(); }
|
这一版虽然简单,但已经能避免“业务没处理完,offset 先提交”的典型问题。
批量消费里最实用的一步,其实是先分片再处理
如果一批消息直接在一个线程里从头跑到尾,批量模式的收益通常会打折。
更常见的做法是:
1、先按固定大小分片
2、每个分片交给线程池处理
3、最后统一收口结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| List<List<ConsumerRecord<String, String>>> partitionList = Lists.partition(records, 500); CountDownLatch latch = new CountDownLatch(partitionList.size());
for (List<ConsumerRecord<String, String>> partition : partitionList) { executor.execute(() -> { try { batchHandle(partition); } finally { latch.countDown(); } }); }
latch.await();
|
这样做的前提是单条消息处理之间没有强顺序依赖,否则分片并发反而会埋坑。
发送结果也要进业务观察面
很多生产端代码只关心 send() 有没有抛异常,但真正线上稳定些的做法,是把发送成功和失败都打进日志或指标。
1 2 3 4 5 6 7 8 9 10 11 12
| public class KafkaSendResultHandler implements ProducerListener<String, String> {
@Override public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) { log.info("send success, topic={}, partition={}, offset={}", metadata.topic(), metadata.partition(), metadata.offset()); }
@Override public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) { log.error("send error, topic={}, key={}, value={}", record.topic(), record.key(), record.value(), exception); } }
|
至少要能回答三个问题:
1、消息有没有发出去
2、发到哪个 topic / partition
3、失败后有没有记录可追
我自己踩过的几个坑
1、把 Kafka 当同步 RPC
如果业务必须同步拿结果,Kafka 只会把问题藏到后面。
2、consumer 并发直接照抄
并发数不是越大越好,要一起看 partition 数、数据库吞吐和下游处理能力。
3、批量消费但没有批量落库
消息一批 500 条进来,结果还是一条一条写库,收益会非常有限。
4、没有失败旁路
脏消息、格式错消息、下游超时消息如果全都走同一条重试路径,很容易把消费链路拖死。
如果让我总结 Kafka 真正在业务里要学什么
我现在更看重这几件事,而不是 API 本身:
1、partition 和 key 怎么影响顺序与并发
2、offset 和业务成功怎么对齐
3、批量消费后的分片与收口怎么设计
4、发送失败、消费失败、重试失败分别怎么兜底
真正把这些学明白,Kafka 才算是从“会用”进到“能落业务”。
小结
Kafka 本身不复杂,真正复杂的是怎么把消费位点、业务成功、失败补偿这三件事对齐。
生产端把发送结果纳入观察,消费端把 ack 时机收住,批量处理把分片和收口理顺,这条异步链路才算真正跑稳。