📦 本文基于的完整项目源码:https://github.com/meirongdev/shop

在 Shop Platform 的事件驱动架构中,Apache Kafka 3.9(KRaft 模式)是服务间异步通信的核心基础设施。Spring Boot 3.5 结合 Spring Kafka 3.3 提供了开箱即用的 Kafka 集成,但要把 Kafka 用得稳、用得好,还需要理解底层的并发模型、序列化策略、offset 语义以及错误处理机制。

本文从六个维度整理 Spring Boot 3.5 下 Kafka 的一些实践记录。文中的“现状判断”都以 shop 项目的实际实现为准,额外的代码片段会明确视为建议写法,而不是仓库已经落地的事实。


生产者调优:acks、retries 和 linger.ms#

acks 配置#

acks 决定了生产者需要等待多少个 broker 副本确认后才认为发送成功:

acks 值 含义 可靠性 延迟
acks=0 不等待任何确认 最低(可能丢消息) 最低
acks=1 Leader 确认即可 中(Leader 宕机丢消息)
acks=all 所有 ISR 副本确认 最高

Kafka 生产者文档里的默认值仍是 acks=1。对于事件驱动架构中承载业务事实的事件(如订单事件、钱包事件),更稳妥的做法仍然是显式写成 acks=all

spring:
  kafka:
    producer:
      acks: all

参考:Apache Kafka Producer Configs - acksConfluent: Understanding Kafka acks

retries 和 delivery.timeout.ms#

网络抖动可能导致发送失败,所以通常还是要考虑重试:

spring:
  kafka:
    producer:
      retries: 3
      delivery-timeout-ms: 120000

retries 指定最大重试次数,delivery.timeout.ms 是整体投递超时(包含重试时间)。Spring Kafka 默认 retriesInteger.MAX_VALUE(无限重试),这在生产环境中需要谨慎——我更倾向于设置有限次数 + 合理的 delivery.timeout.ms

参考:Apache Kafka Producer Configs - retries

linger.ms 和 batch.size#

Kafka 生产者会把消息先攒成一批再发送,减少网络往返:

spring:
  kafka:
    producer:
      linger-ms: 5
      batch-size: 32768
  • linger.ms=5:等待 5ms 攒批(默认 0,即立即发送)
  • batch.size=32768:每批最大 32KB(默认 16KB)

对于 Outbox Publisher 这种一次发多条消息的场景,linger.ms 能显著提升吞吐量。对于单条低延迟消息(如实时通知),保持 linger.ms=0 更好。

参考:Confluent: Kafka Producer Batching


序列化选型对比#

Spring Kafka 支持多种序列化方式,选择哪种取决于你的场景:

方案对比#

序列化方式 优点 缺点 适用场景
StringSerializer 最简单,手动 JSON 灵活 shop 项目当前方案
JsonSerializer Spring 自动配置 类耦合 简单项目
Kafka Avro (Confluent) Schema Registry、兼容检查 需要 Schema Registry 基础设施 大规模、多团队
Protobuf 高性能、强类型 需要 .proto 文件管理 高性能场景

shop 项目的选择:String + 手动 JSON#

shop 项目选择 StringSerializer + 手动 ObjectMapper 序列化:

// 生产者:手动序列化为 JSON String
kafkaTemplate.send(topic, objectMapper.writeValueAsString(envelope));

// 消费者:手动反序列化
EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
        payload, new TypeReference<>() {});

原因:

  • 灵活性EventEnvelope<T> 是泛型包装,手动序列化能精确控制字段
  • 调试友好:String 格式直接在 Kafka 工具中可读
  • 零基础设施:不需要 Schema Registry

如果未来团队规模增长、话题数量增多,值得重新评估 Avro + Schema Registry。

参考:Spring Kafka Serialization DocumentationConfluent: Kafka Serialization Deep Dive


消费者并发模型#

concurrency 参数#

@KafkaListenerconcurrency 参数决定启动多少个消费者线程:

@KafkaListener(
        topics = "${shop.loyalty.order-events-topic}",
        groupId = "${spring.application.name}",
        concurrency = "3")
public void onOrderEvent(String payload) {
    // 处理逻辑
}

concurrency 的上限是话题的分区数。如果 order.events.v1 有 6 个分区,concurrency=3 意味着 3 个消费者各消费 2 个分区。

参考:Spring Kafka Listener Concurrency

ListenerContainerFactory 定制#

对于需要自定义配置的消费者,通过 ConcurrentKafkaListenerContainerFactory 定制:

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3);
    // 设置消息监听器用于自定义错误处理
    factory.setCommonErrorHandler(errorHandler());
    return factory;
}

Virtual Threads 与 Kafka 消费者#

Spring Boot 3.5 支持在消费者中使用 Virtual Threads。需要纠正的是:当前 shop 项目已经在多个 Kafka 消费服务中启用了 spring.threads.virtual.enabled: true,包括 notification-serviceloyalty-servicewallet-servicesearch-service 等。

spring:
  threads:
    virtual:
      enabled: true

Virtual Threads 更适合 I/O 等待明显的消费者逻辑(例如写数据库、调下游 HTTP 服务),但它不会自动消除分区数、连接池、下游限流这些约束。也就是说,启用虚拟线程并不等于“listener 吞吐自然翻倍”,幂等、重试和容量规划仍然要照做。

参考:Spring Boot 3.5 Virtual ThreadsSpring Kafka Threading Notes


offset 提交策略#

Spring Kafka 的默认行为#

Spring Kafka 默认使用 AckMode.BATCH——handler 成功一批消息后提交 offset。这意味着:

  • ✅ 简单,不需要手动管理
  • ⚠️ 如果 handler 成功但 offset 提交失败,消息会重放

手动提交模式#

对于需要精确控制 offset 的场景:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: record  # 每处理一条就提交

或使用编程式提交:

@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
public void onOrderEvent(ConsumerRecord<String, String> record,
                          Acknowledgment ack) {
    try {
        // 处理逻辑
        processEvent(record.value());
        // 成功后手动提交
        ack.acknowledge();
    } catch (Exception e) {
        // 不提交,消息会重放
        log.error("Failed to process event", e);
    }
}

shop 项目的当前选择#

当前仓库里的 listener 大多采用 @KafkaListener + @RetryableTopic + @Transactional。这里需要把语义说得更谨慎一些:@Transactional 能把本地数据库写入约束在同一 Spring 事务里,但它不等于“Kafka offset 和数据库更新形成单一原子提交”。如果服务在数据库提交后、offset 提交前崩溃,消息仍然可能重放,所以幂等校验仍是必选项。

@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
    EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
            payload, new TypeReference<>() {});
    // DB 操作在本地事务内完成,但消息仍可能在崩溃后重放
    handleOrderCompleted(envelope.data());
}

参考:Spring Kafka AcknowledgmentSpring Kafka Transactions


DeadLetterPublishingRecoverer 与 @RetryableTopic#

@RetryableTopic(shop 项目当前方案)#

shop 项目使用 @RetryableTopic 注解实现自动重试和死信:

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000, multiplier = 2),
        dltTopicSuffix = ".dlq",
        autoCreateTopics = "true",
        exclude = NonRetryableKafkaConsumerException.class)
@KafkaListener(topics = "${shop.notification.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
    // 处理逻辑
}

@DltHandler
public void handleDlt(String payload) {
    log.error("Event sent to DLT: {}", payload);
}

重试流程:

第 1 次失败 → 等待 1s → 第 2 次 → 等待 2s → 第 3 次 → 等待 4s → 第 4 次
    ↓ (仍然失败)
发送到 order.events.v1.dlq → @DltHandler 记录日志

DeadLetterPublishingRecoverer(替代方案)#

如果需要更精细的死信控制(如写入死信原因、原始话题信息),可以用 DeadLetterPublishingRecoverer

@Bean
CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 3));  // 3 次重试,间隔 1s
}

DeadLetterPublishingRecoverer 会在死信消息的 header 中附加:

  • DltOriginalTopic:原始话题
  • DltOriginalPartition:原始分区
  • DltOriginalOffset:原始 offset
  • DltExceptionFqcn:异常类名
  • DltExceptionMessage:异常消息

这对于后续的死信分析和重放非常有用。

参考:Spring Kafka Retry TopicsSpring Kafka DLT StrategiesDeadLetterPublishingRecoverer Documentation


异常分类:什么该重试、什么该送 DLQ#

// 不可重试 → 直接送 DLQ
throw new NonRetryableKafkaConsumerException("Invalid JSON payload", exception);

// 可重试 → 走重试逻辑
throw new RetryableKafkaConsumerException("Database temporarily unavailable", exception);

典型场景:

异常类型 分类 原因
JSON 解析失败 NonRetryable 数据格式问题,重试也不会成功
Schema 验证失败 NonRetryable 事件结构不合法
数据库连接超时 Retryable 可能自行恢复
临时网络抖动 Retryable 重试可能成功
业务校验失败 NonRetryable 数据本身有问题

参考:Spring Kafka Exception Handling


参考与实现位置#


小结#

Spring Boot 3.5 下 Kafka 实战的核心要点:

  • 生产者调优acks=all 保证可靠性,linger.ms 攒批提升吞吐,retries 有限重试
  • 序列化选型:String + 手动 JSON(灵活)、Avro + Schema Registry(大规模)、Protobuf(高性能)
  • 消费者并发concurrency ≤ 分区数,Virtual Threads 适合 I/O 密集型消费者
  • offset 提交@Transactional 只覆盖本地事务;offset 仍可能在崩溃场景下重放,因此幂等不能省
  • 死信处理@RetryableTopic(简单场景)vs DeadLetterPublishingRecoverer(精细控制)
  • 异常分类:NonRetryable 直送 DLQ,Retryable 走重试链路

项目仓库:github.com/meirongdev/shop