Spring Boot 3.5 下 Kafka 实战记录:从消费者并发模型到序列化选型
目录
📦 本文基于的完整项目源码:https://github.com/meirongdev/shop
在 Shop Platform 的事件驱动架构中,Apache Kafka 3.9(KRaft 模式)是服务间异步通信的核心基础设施。Spring Boot 3.5 结合 Spring Kafka 3.3 提供了开箱即用的 Kafka 集成,但要把 Kafka 用得稳、用得好,还需要理解底层的并发模型、序列化策略、offset 语义以及错误处理机制。
本文从六个维度整理 Spring Boot 3.5 下 Kafka 的一些实践记录。文中的“现状判断”都以 shop 项目的实际实现为准,额外的代码片段会明确视为建议写法,而不是仓库已经落地的事实。
生产者调优:acks、retries 和 linger.ms#
acks 配置#
acks 决定了生产者需要等待多少个 broker 副本确认后才认为发送成功:
| acks 值 | 含义 | 可靠性 | 延迟 |
|---|---|---|---|
acks=0 |
不等待任何确认 | 最低(可能丢消息) | 最低 |
acks=1 |
Leader 确认即可 | 中(Leader 宕机丢消息) | 低 |
acks=all |
所有 ISR 副本确认 | 最高 | 高 |
Kafka 生产者文档里的默认值仍是 acks=1。对于事件驱动架构中承载业务事实的事件(如订单事件、钱包事件),更稳妥的做法仍然是显式写成 acks=all:
spring:
kafka:
producer:
acks: all
参考:Apache Kafka Producer Configs - acks、Confluent: Understanding Kafka acks
retries 和 delivery.timeout.ms#
网络抖动可能导致发送失败,所以通常还是要考虑重试:
spring:
kafka:
producer:
retries: 3
delivery-timeout-ms: 120000
retries 指定最大重试次数,delivery.timeout.ms 是整体投递超时(包含重试时间)。Spring Kafka 默认 retries 为 Integer.MAX_VALUE(无限重试),这在生产环境中需要谨慎——我更倾向于设置有限次数 + 合理的 delivery.timeout.ms。
linger.ms 和 batch.size#
Kafka 生产者会把消息先攒成一批再发送,减少网络往返:
spring:
kafka:
producer:
linger-ms: 5
batch-size: 32768
linger.ms=5:等待 5ms 攒批(默认 0,即立即发送)batch.size=32768:每批最大 32KB(默认 16KB)
对于 Outbox Publisher 这种一次发多条消息的场景,linger.ms 能显著提升吞吐量。对于单条低延迟消息(如实时通知),保持 linger.ms=0 更好。
序列化选型对比#
Spring Kafka 支持多种序列化方式,选择哪种取决于你的场景:
方案对比#
| 序列化方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| StringSerializer | 最简单,手动 JSON | 灵活 | shop 项目当前方案 |
| JsonSerializer | Spring 自动配置 | 类耦合 | 简单项目 |
| Kafka Avro (Confluent) | Schema Registry、兼容检查 | 需要 Schema Registry 基础设施 | 大规模、多团队 |
| Protobuf | 高性能、强类型 | 需要 .proto 文件管理 | 高性能场景 |
shop 项目的选择:String + 手动 JSON#
shop 项目选择 StringSerializer + 手动 ObjectMapper 序列化:
// 生产者:手动序列化为 JSON String
kafkaTemplate.send(topic, objectMapper.writeValueAsString(envelope));
// 消费者:手动反序列化
EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
payload, new TypeReference<>() {});
原因:
- 灵活性:
EventEnvelope<T>是泛型包装,手动序列化能精确控制字段 - 调试友好:String 格式直接在 Kafka 工具中可读
- 零基础设施:不需要 Schema Registry
如果未来团队规模增长、话题数量增多,值得重新评估 Avro + Schema Registry。
参考:Spring Kafka Serialization Documentation、Confluent: Kafka Serialization Deep Dive
消费者并发模型#
concurrency 参数#
@KafkaListener 的 concurrency 参数决定启动多少个消费者线程:
@KafkaListener(
topics = "${shop.loyalty.order-events-topic}",
groupId = "${spring.application.name}",
concurrency = "3")
public void onOrderEvent(String payload) {
// 处理逻辑
}
concurrency 的上限是话题的分区数。如果 order.events.v1 有 6 个分区,concurrency=3 意味着 3 个消费者各消费 2 个分区。
ListenerContainerFactory 定制#
对于需要自定义配置的消费者,通过 ConcurrentKafkaListenerContainerFactory 定制:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
// 设置消息监听器用于自定义错误处理
factory.setCommonErrorHandler(errorHandler());
return factory;
}
Virtual Threads 与 Kafka 消费者#
Spring Boot 3.5 支持在消费者中使用 Virtual Threads。需要纠正的是:当前 shop 项目已经在多个 Kafka 消费服务中启用了 spring.threads.virtual.enabled: true,包括 notification-service、loyalty-service、wallet-service、search-service 等。
spring:
threads:
virtual:
enabled: true
Virtual Threads 更适合 I/O 等待明显的消费者逻辑(例如写数据库、调下游 HTTP 服务),但它不会自动消除分区数、连接池、下游限流这些约束。也就是说,启用虚拟线程并不等于“listener 吞吐自然翻倍”,幂等、重试和容量规划仍然要照做。
参考:Spring Boot 3.5 Virtual Threads、Spring Kafka Threading Notes
offset 提交策略#
Spring Kafka 的默认行为#
Spring Kafka 默认使用 AckMode.BATCH——handler 成功一批消息后提交 offset。这意味着:
- ✅ 简单,不需要手动管理
- ⚠️ 如果 handler 成功但 offset 提交失败,消息会重放
手动提交模式#
对于需要精确控制 offset 的场景:
spring:
kafka:
consumer:
enable-auto-commit: false
listener:
ack-mode: record # 每处理一条就提交
或使用编程式提交:
@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
public void onOrderEvent(ConsumerRecord<String, String> record,
Acknowledgment ack) {
try {
// 处理逻辑
processEvent(record.value());
// 成功后手动提交
ack.acknowledge();
} catch (Exception e) {
// 不提交,消息会重放
log.error("Failed to process event", e);
}
}
shop 项目的当前选择#
当前仓库里的 listener 大多采用 @KafkaListener + @RetryableTopic + @Transactional。这里需要把语义说得更谨慎一些:@Transactional 能把本地数据库写入约束在同一 Spring 事务里,但它不等于“Kafka offset 和数据库更新形成单一原子提交”。如果服务在数据库提交后、offset 提交前崩溃,消息仍然可能重放,所以幂等校验仍是必选项。
@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
payload, new TypeReference<>() {});
// DB 操作在本地事务内完成,但消息仍可能在崩溃后重放
handleOrderCompleted(envelope.data());
}
DeadLetterPublishingRecoverer 与 @RetryableTopic#
@RetryableTopic(shop 项目当前方案)#
shop 项目使用 @RetryableTopic 注解实现自动重试和死信:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2),
dltTopicSuffix = ".dlq",
autoCreateTopics = "true",
exclude = NonRetryableKafkaConsumerException.class)
@KafkaListener(topics = "${shop.notification.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
// 处理逻辑
}
@DltHandler
public void handleDlt(String payload) {
log.error("Event sent to DLT: {}", payload);
}
重试流程:
第 1 次失败 → 等待 1s → 第 2 次 → 等待 2s → 第 3 次 → 等待 4s → 第 4 次
↓ (仍然失败)
发送到 order.events.v1.dlq → @DltHandler 记录日志
DeadLetterPublishingRecoverer(替代方案)#
如果需要更精细的死信控制(如写入死信原因、原始话题信息),可以用 DeadLetterPublishingRecoverer:
@Bean
CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
return new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 3)); // 3 次重试,间隔 1s
}
DeadLetterPublishingRecoverer 会在死信消息的 header 中附加:
DltOriginalTopic:原始话题DltOriginalPartition:原始分区DltOriginalOffset:原始 offsetDltExceptionFqcn:异常类名DltExceptionMessage:异常消息
这对于后续的死信分析和重放非常有用。
参考:Spring Kafka Retry Topics、Spring Kafka DLT Strategies、DeadLetterPublishingRecoverer Documentation
异常分类:什么该重试、什么该送 DLQ#
// 不可重试 → 直接送 DLQ
throw new NonRetryableKafkaConsumerException("Invalid JSON payload", exception);
// 可重试 → 走重试逻辑
throw new RetryableKafkaConsumerException("Database temporarily unavailable", exception);
典型场景:
| 异常类型 | 分类 | 原因 |
|---|---|---|
| JSON 解析失败 | NonRetryable | 数据格式问题,重试也不会成功 |
| Schema 验证失败 | NonRetryable | 事件结构不合法 |
| 数据库连接超时 | Retryable | 可能自行恢复 |
| 临时网络抖动 | Retryable | 重试可能成功 |
| 业务校验失败 | NonRetryable | 数据本身有问题 |
参考与实现位置#
- Apache Kafka Producer Configs:https://kafka.apache.org/documentation/#producerconfigs
- Spring Kafka Serialization:https://docs.spring.io/spring-kafka/reference/kafka/serdes.html
- Spring Kafka Listener Concurrency:https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html
- Spring Kafka Retry Topics:https://docs.spring.io/spring-kafka/reference/retrytopic.html
- Spring Kafka DLT Strategies:https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
- Spring Kafka Dead Letter:https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#dead-letters
- Spring Boot 3.5 Virtual Threads:https://docs.spring.io/spring-boot/reference/features/spring-application.html#features.spring-application.virtual-threads
- Confluent Producer Batching:https://docs.confluent.io/platform/current/clients/producer.html#batching
- 仓库实现入口:
services/loyalty-service/src/main/java/dev/meirong/shop/loyalty/listener/OrderEventListener.java、services/notification-service/src/main/java/dev/meirong/shop/notification/listener/、services/promotion-service/src/main/java/dev/meirong/shop/promotion/messaging/
小结#
Spring Boot 3.5 下 Kafka 实战的核心要点:
- 生产者调优:
acks=all保证可靠性,linger.ms攒批提升吞吐,retries有限重试 - 序列化选型:String + 手动 JSON(灵活)、Avro + Schema Registry(大规模)、Protobuf(高性能)
- 消费者并发:
concurrency≤ 分区数,Virtual Threads 适合 I/O 密集型消费者 - offset 提交:
@Transactional只覆盖本地事务;offset 仍可能在崩溃场景下重放,因此幂等不能省 - 死信处理:
@RetryableTopic(简单场景)vsDeadLetterPublishingRecoverer(精细控制) - 异常分类:NonRetryable 直送 DLQ,Retryable 走重试链路