在之前的文章中,我们看了同步请求链路上的 Gateway → BFF → 领域服务。但微服务架构中并不是所有交互都适合同步发生。事件驱动架构让服务之间通过异步消息协作,在不少场景下用最终一致性换取更低耦合和更好的链路弹性。

📦 本文基于的完整项目源码:https://github.com/meirongdev/shop

上一篇:(四)领域服务设计

2026-04 实践更新 当前主线仓库已经把 IdempotencyGuard + Redis Bloom Filter、补偿任务重试、以及关键 Kafka consumer 的幂等保护补齐到 Phase 1 基线;本文整体设计仍然准确,但“待补齐”的幂等链路请以当前 main 分支实现为准。


为什么用事件驱动#

在 Shop Platform 中,异步事件主要用来承接那些不必阻塞用户请求、但又需要可靠传播的业务事实。例如:

flowchart TD Order["用户下单
order-service"] Stock["扣库存
marketplace-service"] Points["发积分
loyalty-service"] Email["发邮件通知
notification-service"] Webhook["推送外部订阅
webhook-service"] Order -->|"同步 需要立即确认"| Stock Order -.->|"异步 可以延迟"| Points Order -.->|"异步"| Email Order -.->|"异步"| Webhook

如果全部走同步 HTTP 调用:

  • 下单接口响应时间 = 最慢下游服务的响应时间
  • 任意下游服务不可用 → 下单失败
  • 服务间耦合紧,新增消费者需要修改生产者

事件驱动的方式:

flowchart LR OS["order-service
写入订单 + outbox
同一事务"] Pub["Outbox Publisher"] Kafka["Kafka
order.events.v1"] L["loyalty-service
累计购物积分"] N["notification-service
发送订单确认邮件"] W["webhook-service
向外部订阅方投递事件"] OS --> Pub Pub --> Kafka Kafka --> L Kafka --> N Kafka --> W

下单接口只需要完成本地事务即可返回,后续处理异步执行。


Kafka 话题全景#

Shop Platform 使用 Apache Kafka 3.9(KRaft 模式,无需 ZooKeeper)作为事件总线。

当前主链路的话题矩阵#

话题 生产者 消费者
wallet.transactions.v1 wallet-service promotion-service(充值奖励)、notification-service(钱包通知)、webhook-service(外部事件投递)
order.events.v1 order-service loyalty-service(购物积分)、notification-service(订单通知)、webhook-service(外部事件投递)
buyer.registered.v1 auth-server loyalty-service(新用户引导任务)、promotion-service(欢迎券)、notification-service(欢迎邮件)
marketplace.product.events.v1 marketplace-service search-service(商品索引)

按当前项目里的约定,话题命名采用 {domain}.{entity}.{version},如 order.events.v1。版本号放在话题名中(而非事件体中),主要是为了让未来的话题级 schema 演进更直观。

注:仓库里还能看到 subscription.events.v1 这类边界 topic,以及 webhook-service 对 user.registered.v1 的监听配置;但它和 auth-server 当前默认生产的 buyer.registered.v1 还没有完全对齐。为了避免把“规划项”写成“现状”,本文只展开已经能从生产者到消费者闭环验证的主链路。


EventEnvelope:统一事件信封#

所有事件使用统一的 EventEnvelope<T> 包装:

public record EventEnvelope<T>(
        String eventId,           // 事件唯一 ID(UUID)
        String source,            // 事件来源(如 "order-service")
        String type,              // 事件类型(如 "ORDER_COMPLETED")
        Instant timestamp,        // 发生时间
        Integer schemaVersion,    // Schema 版本号,默认 1
        String contentType,       // 内容类型,默认 "application/json"
        T data) {                 // 实际业务数据

    public static final int CURRENT_SCHEMA_VERSION = 1;

    public void assertSupportedSchema(int... supportedSchemaVersions) {
        // 验证 schemaVersion 是否在支持列表中
    }
}

设计考量#

@JsonIgnoreProperties(ignoreUnknown = true):所有事件数据都加了这个注解,保证消费者即使收到包含新字段的旧版事件也不会反序列化失败。

Schema VersionEventEnvelope 自带 schemaVersion 字段,消费者通过 assertSupportedSchema(1) 声明自己支持的版本。未来如果事件格式变了(如新增必填字段),可以将 schemaVersion 提升到 2,不支持旧版本的消费者会拒绝处理。

不是强行照搬 CloudEvents:这套 Envelope 的思路和 CloudEvents 很接近——都有统一 metadata、事件类型、来源和数据体——但当前实现是一个更轻量的内部contract,并没有完整套用 CloudEvents 的全部字段和传输绑定。

示例

// 生产者
EventEnvelope<OrderEventData> envelope = new EventEnvelope<>(
        UUID.randomUUID().toString(),
        "order-service",
        "ORDER_COMPLETED",
        Instant.now(),
        new OrderEventData(
                order.getId(),
                order.getOrderNo(),
                order.getBuyerId(),
                order.getSellerId(),
                order.getStatus(),
                order.getTotalAmount(),
                order.getItems().stream()
                    .map(item -> new OrderItemSummary(
                            item.getProductId(), item.getProductName(),
                            item.getQuantity(), item.getLineTotal()))
                    .toList()));

kafkaTemplate.send(topic, objectMapper.writeValueAsString(envelope));
// 消费者
@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
public void onOrderEvent(String payload) {
    EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
            payload, new TypeReference<>() {});
    envelope.assertSupportedSchema(1);
    OrderEventData data = envelope.data();
    // 处理业务逻辑
}

事件数据contract#

事件数据(如 OrderEventData)定义在 shop-contracts-event-common 中,使用 Java record:

public record OrderEventData(
        String orderId, String orderNo, String buyerId,
        String sellerId, String status, BigDecimal totalAmount,
        List<OrderItemSummary> items) {
    public record OrderItemSummary(String productId, String productName,
                                   int quantity, BigDecimal lineTotal) {}
}

Transactional Outbox Pattern#

Outbox Pattern 是本项目事件驱动的核心实现。它解决的是分布式事务里一个常见问题:如何在尽量保证业务数据和事件消息一起写入成功的前提下,不引入分布式事务协调器?

方案对比#

方案 原子性 复杂度 可靠性
先写 DB 再发 Kafka ❌ DB 成功但 Kafka 失败丢消息
先发 Kafka 再写 DB ❌ Kafka 成功但 DB 失败重复消息
分布式事务(2PC) 中(协调器单点)
Transactional Outbox

实现原理#

sequenceDiagram participant Biz as 业务代码 participant DB as 数据库事务 participant Outbox as outbox 表 participant Pub as Publisher participant Kafka as Kafka Biz->>DB: 1a. 写入业务数据
(INSERT INTO orders) Biz->>Outbox: 1b. 同事务写入 outbox 表
(INSERT INTO wallet_outbox_event) Note over Biz,Outbox: 同一本地事务,原子提交 Pub->>Outbox: 2a. 定时轮询 status='PENDING' Pub->>Kafka: 2b. 发送到 Kafka 话题 Pub->>Outbox: 2c. 标记 status='PUBLISHED'

因为业务数据和 outbox 记录在同一个本地事务中,要么都成功要么都失败,保证了原子性。

四种实现变体#

Shop Platform 中 4 个服务实现了 Outbox Pattern,各有细微差异:

Wallet Service(标准变体):

@Entity
@Table(name = "wallet_outbox_event")
public class WalletOutboxEventEntity {
    @Id
    private String id;  // UUID
    private String aggregateId;  // wallet_transaction_id
    private String topic;
    private String eventType;
    private String payload;  // EventEnvelope JSON
    private boolean published;
}

@Scheduled(fixedDelayString = "${shop.wallet.outbox-publish-delay-ms:5000}")
@Transactional
public void publishPendingEvents() {
    List<WalletOutboxEventEntity> events = repository
            .findTop20ByPublishedFalseOrderByCreatedAtAsc();
    for (WalletOutboxEventEntity event : events) {
        kafkaTemplate.send(event.getTopic(), event.getPayload());
        event.markPublished();
    }
    repository.saveAll(events);
}

Marketplace Service(同步发送变体):

// 使用 .join() 同步等待 Kafka 确认
kafkaTemplate.send(topic, aggregateId, payload).join();

带 key 发送保证同一个 aggregate 的事件有序。

Order Service(异步变体):

// top20、无 key,发送后批量标记 published
kafkaTemplate.send(topic, payload);

Activity Service(按 game 分区变体):

// 查询 status='PENDING',按 gameId 作为 key,逐条 save
kafkaTemplate.send(event.getTopic(), event.getGameId(), event.getPayload());

为什么不用 Debezium CDC#

Debezium 通过读取 MySQL binlog 自动提取 outbox 数据,不需要应用层写 publisher。这是一种成熟方案,但也有代价:

  • 需要额外部署 Debezium Connect 组件
  • binlog 格式依赖 MySQL 配置,运维复杂度增加
  • 应用层对 outbox 发送的控制力减弱(如暂停、重试、监控)

本项目当前选择应用层 Publisher,更多是出于 POC 阶段的权衡:

  • 不需要额外基础设施
  • 发送逻辑完全可控(batch size、key 策略、错误处理)
  • 代码量不大(每个服务约 30 行)

如果事件量继续增长,或者团队已经具备 CDC 运维经验,那么 Debezium Outbox 依然是很值得重新评估的选项。


IdempotencyGuard:消费者幂等保障#

Kafka 的 at-least-once 语义意味着消息至少投递一次,在网络抖动或消费者重启时可能出现重复投递。消费者通常需要认真处理幂等问题。

接口定义#

public interface IdempotencyGuard {
    <T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback);
}

两种实现#

方案一:DB-only(当前默认实现)

public <T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback) {
    validateKey(key);
    if (repository.existsByKey(key)) {
        return fallback.get();  // 已处理过,走 fallback
    }
    try {
        return action.get();    // 执行业务
    } catch (DataIntegrityViolationException exception) {
        return fallback.get();  // 并发重复导致唯一键冲突
    }
}

每个服务有自己的幂等表(如 wallet_idempotency_key):

public interface WalletIdempotencyKeyRepository
        extends JpaRepository<WalletIdempotencyKeyEntity, String>,
                IdempotencyRepository {
    boolean existsByIdempotencyKey(String idempotencyKey);
    @Override default boolean existsByKey(String key) {
        return existsByIdempotencyKey(key);
    }
}

方案二:Redis + Bloom Filter(高性能)

public <T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback) {
    // Bloom Filter 快速检查
    if (!bloomFilter.contains(key)) {
        missCounter.increment();
        return executeAction(key, action, fallback);
    }

    // Bloom Filter 说"可能存在"→ 查 DB 确认
    if (repository.existsByKey(key)) {
        duplicateHitCounter.increment();
        return fallback.get();
    }

    // 误判(false positive),实际是新 key
    falsePositiveHitCounter.increment();
    return executeAction(key, action, fallback);
}

两层检查的主要作用是:

  1. Bloom Filter 说"不存在" → 在 Bloom Filter 语义下可以认为不存在,直接执行(快速路径)
  2. Bloom Filter 说"可能存在" → 查 DB 确认(因为 Bloom Filter 有误判率)
  3. Redis 不可用 → 降级到 DB-only 路径

配置:

shop:
  idempotency:
    bloom-filter:
      enabled: true
      redis-key: "shop:idempotency:loyalty"
      expected-insertions: 1000000   # 预期插入量
      false-probability: 0.001       # 误判率 0.1%

消费者中的使用#

@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
    EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
            payload, new TypeReference<>() {});
    OrderEventData data = envelope.data();

    String idempotencyKey = "LOYALTY_ORDER_COMPLETED:" + data.orderId();
    idempotencyGuard.executeOnce(
            idempotencyKey,
            () -> {
                handleOrderCompleted(data);
                // 在同一事务中保存幂等键
                idempotencyKeyRepository.save(
                        new LoyaltyIdempotencyKeyEntity(idempotencyKey));
                return null;
            },
            () -> {
                log.info("Loyalty order event already processed: orderId={}, skipping",
                        data.orderId());
                return null;
            });
}

在当前仓库里,ArchUnit 规则 ARCH-06 要求:调用 IdempotencyGuard.executeOnce() 的方法需要标注 @Transactional,保证幂等键写入和业务操作在同一事务中。

@IdempotencyExempt:天然幂等的声明#

有些消费者可以通过自身约束获得天然幂等,不一定需要 IdempotencyGuard。比如 notification-service 通过 notification_log 表的 (eventId, channel) 唯一约束去重:

@Component
@IdempotencyExempt(reason = "notification_log deduplicates by eventId + channel")
public class OrderEventListener {
    @KafkaListener(topics = "${shop.notification.order-events-topic}")
    public void onOrderEvent(String payload) {
        // 直接处理,notification_log 唯一约束承担去重
    }
}

ArchUnit 规则 ARCH-05 强制所有 @KafkaListener 所在类要么注入 IdempotencyGuard,要么标注 @IdempotencyExempt


重试与 DLQ 策略#

Spring Kafka 的 @RetryableTopic 提供了开箱即用的重试机制:

@RetryableTopic(
        attempts = "4",                              // 最多尝试 4 次
        backoff = @Backoff(delay = 1000, multiplier = 2), // 指数退避
        dltTopicSuffix = ".dlq",                     // 死信话题后缀
        autoCreateTopics = "true",                   // 自动创建 DLQ topic
        exclude = NonRetryableKafkaConsumerException.class // 不可重试的异常
)
@KafkaListener(topics = "${shop.notification.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
    // 处理逻辑
}

@DltHandler
public void handleDlt(String payload) {
    log.error("Event sent to DLT: {}", payload);
}

重试流程#

第 1 次执行失败 → 等待 1s → 第 2 次 → 等待 2s → 第 3 次 → 等待 4s → 第 4 次
    ↓ (仍然失败)
发送到 order.events.v1.dlq → @DltHandler 记录日志

异常分类#

// 不可重试 → 直接送 DLQ
throw new NonRetryableKafkaConsumerException("Invalid JSON payload", exception);

// 可重试 → 走重试逻辑
throw new RetryableKafkaConsumerException("Database temporarily unavailable", exception);

典型场景:

  • JSON 解析失败、schema 验证失败NonRetryableKafkaConsumerException(重试也不会成功)
  • 数据库连接超时、临时网络抖动RetryableKafkaConsumerException(可能自行恢复)

这里也有两个工程边界值得记一下:第一,autoCreateTopics = true 对本地开发和测试很方便,但生产环境通常还是更适合由 IaC 或平台侧预建 topic;第二,非阻塞重试和 DLT 会影响同一 consumer group 里的处理时序,因此幂等和事件顺序语义都要单独设计,不能默认“Kafka 会帮你兜底”。

无手动 Acknowledgment#

所有消费者都使用 Spring Kafka 默认的“handler 成功才提交 offset”模型。@Transactional 可以把本地数据库事务包进来:如果业务逻辑抛错,消息会重投;但这并不等于 Kafka offset 与数据库更新组成了跨资源原子事务,所以消费者端的幂等仍然需要认真对待,而不是顺手补一层就算了。


Buyer BFF 的同步 Saga#

虽然事件驱动处理了异步协作,但结账流程仍然需要同步 Saga,因为用户需要即时知道支付结果:

flowchart TD subgraph 正向["结账 Saga(同步,一个 HTTP 请求内完成)"] direction TB S1["① marketplace-service
预扣库存"] --> S2["② order-service
创建订单"] S2 --> S3["③ wallet-service
扣款"] S3 --> S4["④ loyalty-service
扣积分(可选)"] S4 --> S5["⑤ buyer-bff
清空购物车"] end subgraph 逆向["失败补偿(逆向)"] direction TB F1["⑤' marketplace-service
恢复库存"] --> F2["④' wallet-service
退款"] F2 --> F3["③' loyalty-service
退回积分"] end Fail{"任何一步失败"} -.-> F1

这和 Outbox 异步事件是互补关系:

场景 模式 原因
结账 → 返回支付结果 同步 Saga 用户等待即时反馈
订单完成 → 发积分 Outbox + Kafka 可以延迟,不需要阻塞用户
订单完成 → 发邮件 Outbox + Kafka 异步通知,不影响主流程
库存扣减失败 → 回滚 同步补偿 需要尽快回滚,用户才能重新下单

典型事件流示例#

与其把所有场景揉成一条"万能链路",不如直接看当前仓库里已经闭环的四条主路径:

flowchart TD subgraph 注册["1. 用户注册"] direction TB Auth["auth-server"] -->|"buyer.registered.v1"| R1["loyalty-service
初始化引导/奖励任务"] Auth -->|"buyer.registered.v1"| R2["promotion-service
发欢迎券"] Auth -->|"buyer.registered.v1"| R3["notification-service
发送欢迎邮件"] end subgraph 订单["2. 订单完成"] direction TB Order["order-service"] -->|"order.events.v1"| O1["loyalty-service
发放购物积分"] Order -->|"order.events.v1"| O2["notification-service
发送订单确认邮件"] Order -->|"order.events.v1"| O3["webhook-service
向外部订阅方投递"] end subgraph 商品["3. 商品变更"] direction TB MP["marketplace-service"] -->|"marketplace.product.events.v1"| S1["search-service
更新 Meilisearch 索引"] end subgraph 充值["4. 钱包充值完成"] direction TB Wallet["wallet-service"] -->|"wallet.transactions.v1"| W1["promotion-service
生成充值奖励"] Wallet -->|"wallet.transactions.v1"| W2["notification-service
发送充值完成通知"] Wallet -->|"wallet.transactions.v1"| W3["webhook-service
向外部订阅方投递"] end

这样写虽然没有“一条龙故事”那么炫,但更贴近源码里当前已经落地的事实:哪些事件真的发了、哪些消费者真的接了、哪些只是预留配置,一眼能分清。


参考与实现位置#

  • Spring Kafka Retry Topics:https://docs.spring.io/spring-kafka/reference/retrytopic.html
  • Spring Kafka DLT Strategies:https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
  • CloudEvents 规范:https://cloudevents.io/
  • Debezium Outbox Pattern:https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html
  • 仓库实现入口:shared/shop-contracts/shop-contracts-event-common/src/main/java/dev/meirong/shop/contracts/event/EventEnvelope.javaservices/order-service/src/main/java/dev/meirong/shop/order/service/OrderOutboxPublisher.javaservices/wallet-service/src/main/java/dev/meirong/shop/wallet/service/WalletOutboxPublisher.javaservices/marketplace-service/src/main/java/dev/meirong/shop/marketplace/service/MarketplaceOutboxPublisher.javaservices/activity-service/src/main/java/dev/meirong/shop/activity/service/ActivityOutboxPublisher.javashared/shop-common/shop-common-core/src/main/java/dev/meirong/shop/common/idempotency/services/loyalty-service/src/main/java/dev/meirong/shop/loyalty/listener/OrderEventListener.javaservices/promotion-service/src/main/java/dev/meirong/shop/promotion/messaging/WalletRewardListener.javaservices/search-service/src/main/java/dev/meirong/shop/search/consumer/ProductEventConsumer.java

小结#

Shop Platform 的事件驱动实现里,我更想记下这些点:

  • EventEnvelope 统一模型:eventId、source、type、schemaVersion 标准化,@JsonIgnoreProperties 保证向后兼容
  • Transactional Outbox:4 种实现变体,同一事务保证业务数据 + 事件原子性,定时轮询发布
  • IdempotencyGuard 双层防护:DB-only 当前默认方案 + Redis Bloom Filter 高性能方案,失败时可降级
  • @RetryableTopic 重试 + DLQ:指数退避、异常分类、死信处理
  • 同步 Saga + 异步事件互补:结账等用户等待的用同步,积分/通知等用异步

下一篇将深入 插件化活动引擎:GamePlugin SPI 接口如何支持 4 种游戏插件(砸金蛋、抢红包、集卡、虚拟养成),Redis Lua 脚本如何实现原子抢红包,以及 Anti-Cheat 反作弊机制。

项目仓库:github.com/meirongdev/shop(私有,仅供参考)