Spring Boot 3.5 + Java 25 + Cloud Native 系列(五):事件驱动架构
目录
在之前的文章中,我们看了同步请求链路上的 Gateway → BFF → 领域服务。但微服务架构中并不是所有交互都适合同步发生。事件驱动架构让服务之间通过异步消息协作,在不少场景下用最终一致性换取更低耦合和更好的链路弹性。
📦 本文基于的完整项目源码:https://github.com/meirongdev/shop
上一篇:(四)领域服务设计
2026-04 实践更新 当前主线仓库已经把
IdempotencyGuard + Redis Bloom Filter、补偿任务重试、以及关键 Kafka consumer 的幂等保护补齐到 Phase 1 基线;本文整体设计仍然准确,但“待补齐”的幂等链路请以当前 main 分支实现为准。
为什么用事件驱动#
在 Shop Platform 中,异步事件主要用来承接那些不必阻塞用户请求、但又需要可靠传播的业务事实。例如:
order-service"] Stock["扣库存
marketplace-service"] Points["发积分
loyalty-service"] Email["发邮件通知
notification-service"] Webhook["推送外部订阅
webhook-service"] Order -->|"同步 需要立即确认"| Stock Order -.->|"异步 可以延迟"| Points Order -.->|"异步"| Email Order -.->|"异步"| Webhook
如果全部走同步 HTTP 调用:
- 下单接口响应时间 = 最慢下游服务的响应时间
- 任意下游服务不可用 → 下单失败
- 服务间耦合紧,新增消费者需要修改生产者
事件驱动的方式:
写入订单 + outbox
同一事务"] Pub["Outbox Publisher"] Kafka["Kafka
order.events.v1"] L["loyalty-service
累计购物积分"] N["notification-service
发送订单确认邮件"] W["webhook-service
向外部订阅方投递事件"] OS --> Pub Pub --> Kafka Kafka --> L Kafka --> N Kafka --> W
下单接口只需要完成本地事务即可返回,后续处理异步执行。
Kafka 话题全景#
Shop Platform 使用 Apache Kafka 3.9(KRaft 模式,无需 ZooKeeper)作为事件总线。
当前主链路的话题矩阵#
| 话题 | 生产者 | 消费者 |
|---|---|---|
wallet.transactions.v1 |
wallet-service | promotion-service(充值奖励)、notification-service(钱包通知)、webhook-service(外部事件投递) |
order.events.v1 |
order-service | loyalty-service(购物积分)、notification-service(订单通知)、webhook-service(外部事件投递) |
buyer.registered.v1 |
auth-server | loyalty-service(新用户引导任务)、promotion-service(欢迎券)、notification-service(欢迎邮件) |
marketplace.product.events.v1 |
marketplace-service | search-service(商品索引) |
按当前项目里的约定,话题命名采用 {domain}.{entity}.{version},如 order.events.v1。版本号放在话题名中(而非事件体中),主要是为了让未来的话题级 schema 演进更直观。
注:仓库里还能看到
subscription.events.v1这类边界 topic,以及 webhook-service 对user.registered.v1的监听配置;但它和 auth-server 当前默认生产的buyer.registered.v1还没有完全对齐。为了避免把“规划项”写成“现状”,本文只展开已经能从生产者到消费者闭环验证的主链路。
EventEnvelope:统一事件信封#
所有事件使用统一的 EventEnvelope<T> 包装:
public record EventEnvelope<T>(
String eventId, // 事件唯一 ID(UUID)
String source, // 事件来源(如 "order-service")
String type, // 事件类型(如 "ORDER_COMPLETED")
Instant timestamp, // 发生时间
Integer schemaVersion, // Schema 版本号,默认 1
String contentType, // 内容类型,默认 "application/json"
T data) { // 实际业务数据
public static final int CURRENT_SCHEMA_VERSION = 1;
public void assertSupportedSchema(int... supportedSchemaVersions) {
// 验证 schemaVersion 是否在支持列表中
}
}
设计考量#
@JsonIgnoreProperties(ignoreUnknown = true):所有事件数据都加了这个注解,保证消费者即使收到包含新字段的旧版事件也不会反序列化失败。
Schema Version:EventEnvelope 自带 schemaVersion 字段,消费者通过 assertSupportedSchema(1) 声明自己支持的版本。未来如果事件格式变了(如新增必填字段),可以将 schemaVersion 提升到 2,不支持旧版本的消费者会拒绝处理。
不是强行照搬 CloudEvents:这套 Envelope 的思路和 CloudEvents 很接近——都有统一 metadata、事件类型、来源和数据体——但当前实现是一个更轻量的内部contract,并没有完整套用 CloudEvents 的全部字段和传输绑定。
示例:
// 生产者
EventEnvelope<OrderEventData> envelope = new EventEnvelope<>(
UUID.randomUUID().toString(),
"order-service",
"ORDER_COMPLETED",
Instant.now(),
new OrderEventData(
order.getId(),
order.getOrderNo(),
order.getBuyerId(),
order.getSellerId(),
order.getStatus(),
order.getTotalAmount(),
order.getItems().stream()
.map(item -> new OrderItemSummary(
item.getProductId(), item.getProductName(),
item.getQuantity(), item.getLineTotal()))
.toList()));
kafkaTemplate.send(topic, objectMapper.writeValueAsString(envelope));
// 消费者
@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
public void onOrderEvent(String payload) {
EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
payload, new TypeReference<>() {});
envelope.assertSupportedSchema(1);
OrderEventData data = envelope.data();
// 处理业务逻辑
}
事件数据contract#
事件数据(如 OrderEventData)定义在 shop-contracts-event-common 中,使用 Java record:
public record OrderEventData(
String orderId, String orderNo, String buyerId,
String sellerId, String status, BigDecimal totalAmount,
List<OrderItemSummary> items) {
public record OrderItemSummary(String productId, String productName,
int quantity, BigDecimal lineTotal) {}
}
Transactional Outbox Pattern#
Outbox Pattern 是本项目事件驱动的核心实现。它解决的是分布式事务里一个常见问题:如何在尽量保证业务数据和事件消息一起写入成功的前提下,不引入分布式事务协调器?
方案对比#
| 方案 | 原子性 | 复杂度 | 可靠性 |
|---|---|---|---|
| 先写 DB 再发 Kafka | ❌ DB 成功但 Kafka 失败丢消息 | 低 | 低 |
| 先发 Kafka 再写 DB | ❌ Kafka 成功但 DB 失败重复消息 | 低 | 低 |
| 分布式事务(2PC) | ✅ | 高 | 中(协调器单点) |
| Transactional Outbox | ✅ | 中 | 高 |
实现原理#
(INSERT INTO orders) Biz->>Outbox: 1b. 同事务写入 outbox 表
(INSERT INTO wallet_outbox_event) Note over Biz,Outbox: 同一本地事务,原子提交 Pub->>Outbox: 2a. 定时轮询 status='PENDING' Pub->>Kafka: 2b. 发送到 Kafka 话题 Pub->>Outbox: 2c. 标记 status='PUBLISHED'
因为业务数据和 outbox 记录在同一个本地事务中,要么都成功要么都失败,保证了原子性。
四种实现变体#
Shop Platform 中 4 个服务实现了 Outbox Pattern,各有细微差异:
Wallet Service(标准变体):
@Entity
@Table(name = "wallet_outbox_event")
public class WalletOutboxEventEntity {
@Id
private String id; // UUID
private String aggregateId; // wallet_transaction_id
private String topic;
private String eventType;
private String payload; // EventEnvelope JSON
private boolean published;
}
@Scheduled(fixedDelayString = "${shop.wallet.outbox-publish-delay-ms:5000}")
@Transactional
public void publishPendingEvents() {
List<WalletOutboxEventEntity> events = repository
.findTop20ByPublishedFalseOrderByCreatedAtAsc();
for (WalletOutboxEventEntity event : events) {
kafkaTemplate.send(event.getTopic(), event.getPayload());
event.markPublished();
}
repository.saveAll(events);
}
Marketplace Service(同步发送变体):
// 使用 .join() 同步等待 Kafka 确认
kafkaTemplate.send(topic, aggregateId, payload).join();
带 key 发送保证同一个 aggregate 的事件有序。
Order Service(异步变体):
// top20、无 key,发送后批量标记 published
kafkaTemplate.send(topic, payload);
Activity Service(按 game 分区变体):
// 查询 status='PENDING',按 gameId 作为 key,逐条 save
kafkaTemplate.send(event.getTopic(), event.getGameId(), event.getPayload());
为什么不用 Debezium CDC#
Debezium 通过读取 MySQL binlog 自动提取 outbox 数据,不需要应用层写 publisher。这是一种成熟方案,但也有代价:
- 需要额外部署 Debezium Connect 组件
- binlog 格式依赖 MySQL 配置,运维复杂度增加
- 应用层对 outbox 发送的控制力减弱(如暂停、重试、监控)
本项目当前选择应用层 Publisher,更多是出于 POC 阶段的权衡:
- 不需要额外基础设施
- 发送逻辑完全可控(batch size、key 策略、错误处理)
- 代码量不大(每个服务约 30 行)
如果事件量继续增长,或者团队已经具备 CDC 运维经验,那么 Debezium Outbox 依然是很值得重新评估的选项。
IdempotencyGuard:消费者幂等保障#
Kafka 的 at-least-once 语义意味着消息至少投递一次,在网络抖动或消费者重启时可能出现重复投递。消费者通常需要认真处理幂等问题。
接口定义#
public interface IdempotencyGuard {
<T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback);
}
两种实现#
方案一:DB-only(当前默认实现)
public <T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback) {
validateKey(key);
if (repository.existsByKey(key)) {
return fallback.get(); // 已处理过,走 fallback
}
try {
return action.get(); // 执行业务
} catch (DataIntegrityViolationException exception) {
return fallback.get(); // 并发重复导致唯一键冲突
}
}
每个服务有自己的幂等表(如 wallet_idempotency_key):
public interface WalletIdempotencyKeyRepository
extends JpaRepository<WalletIdempotencyKeyEntity, String>,
IdempotencyRepository {
boolean existsByIdempotencyKey(String idempotencyKey);
@Override default boolean existsByKey(String key) {
return existsByIdempotencyKey(key);
}
}
方案二:Redis + Bloom Filter(高性能)
public <T> T executeOnce(String key, Supplier<T> action, Supplier<T> fallback) {
// Bloom Filter 快速检查
if (!bloomFilter.contains(key)) {
missCounter.increment();
return executeAction(key, action, fallback);
}
// Bloom Filter 说"可能存在"→ 查 DB 确认
if (repository.existsByKey(key)) {
duplicateHitCounter.increment();
return fallback.get();
}
// 误判(false positive),实际是新 key
falsePositiveHitCounter.increment();
return executeAction(key, action, fallback);
}
两层检查的主要作用是:
- Bloom Filter 说"不存在" → 在 Bloom Filter 语义下可以认为不存在,直接执行(快速路径)
- Bloom Filter 说"可能存在" → 查 DB 确认(因为 Bloom Filter 有误判率)
- Redis 不可用 → 降级到 DB-only 路径
配置:
shop:
idempotency:
bloom-filter:
enabled: true
redis-key: "shop:idempotency:loyalty"
expected-insertions: 1000000 # 预期插入量
false-probability: 0.001 # 误判率 0.1%
消费者中的使用#
@KafkaListener(topics = "${shop.loyalty.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
EventEnvelope<OrderEventData> envelope = objectMapper.readValue(
payload, new TypeReference<>() {});
OrderEventData data = envelope.data();
String idempotencyKey = "LOYALTY_ORDER_COMPLETED:" + data.orderId();
idempotencyGuard.executeOnce(
idempotencyKey,
() -> {
handleOrderCompleted(data);
// 在同一事务中保存幂等键
idempotencyKeyRepository.save(
new LoyaltyIdempotencyKeyEntity(idempotencyKey));
return null;
},
() -> {
log.info("Loyalty order event already processed: orderId={}, skipping",
data.orderId());
return null;
});
}
在当前仓库里,ArchUnit 规则 ARCH-06 要求:调用 IdempotencyGuard.executeOnce() 的方法需要标注 @Transactional,保证幂等键写入和业务操作在同一事务中。
@IdempotencyExempt:天然幂等的声明#
有些消费者可以通过自身约束获得天然幂等,不一定需要 IdempotencyGuard。比如 notification-service 通过 notification_log 表的 (eventId, channel) 唯一约束去重:
@Component
@IdempotencyExempt(reason = "notification_log deduplicates by eventId + channel")
public class OrderEventListener {
@KafkaListener(topics = "${shop.notification.order-events-topic}")
public void onOrderEvent(String payload) {
// 直接处理,notification_log 唯一约束承担去重
}
}
ArchUnit 规则 ARCH-05 强制所有 @KafkaListener 所在类要么注入 IdempotencyGuard,要么标注 @IdempotencyExempt。
重试与 DLQ 策略#
Spring Kafka 的 @RetryableTopic 提供了开箱即用的重试机制:
@RetryableTopic(
attempts = "4", // 最多尝试 4 次
backoff = @Backoff(delay = 1000, multiplier = 2), // 指数退避
dltTopicSuffix = ".dlq", // 死信话题后缀
autoCreateTopics = "true", // 自动创建 DLQ topic
exclude = NonRetryableKafkaConsumerException.class // 不可重试的异常
)
@KafkaListener(topics = "${shop.notification.order-events-topic}")
@Transactional
public void onOrderEvent(String payload) {
// 处理逻辑
}
@DltHandler
public void handleDlt(String payload) {
log.error("Event sent to DLT: {}", payload);
}
重试流程#
第 1 次执行失败 → 等待 1s → 第 2 次 → 等待 2s → 第 3 次 → 等待 4s → 第 4 次
↓ (仍然失败)
发送到 order.events.v1.dlq → @DltHandler 记录日志
异常分类#
// 不可重试 → 直接送 DLQ
throw new NonRetryableKafkaConsumerException("Invalid JSON payload", exception);
// 可重试 → 走重试逻辑
throw new RetryableKafkaConsumerException("Database temporarily unavailable", exception);
典型场景:
- JSON 解析失败、schema 验证失败 →
NonRetryableKafkaConsumerException(重试也不会成功) - 数据库连接超时、临时网络抖动 →
RetryableKafkaConsumerException(可能自行恢复)
这里也有两个工程边界值得记一下:第一,autoCreateTopics = true 对本地开发和测试很方便,但生产环境通常还是更适合由 IaC 或平台侧预建 topic;第二,非阻塞重试和 DLT 会影响同一 consumer group 里的处理时序,因此幂等和事件顺序语义都要单独设计,不能默认“Kafka 会帮你兜底”。
无手动 Acknowledgment#
所有消费者都使用 Spring Kafka 默认的“handler 成功才提交 offset”模型。@Transactional 可以把本地数据库事务包进来:如果业务逻辑抛错,消息会重投;但这并不等于 Kafka offset 与数据库更新组成了跨资源原子事务,所以消费者端的幂等仍然需要认真对待,而不是顺手补一层就算了。
Buyer BFF 的同步 Saga#
虽然事件驱动处理了异步协作,但结账流程仍然需要同步 Saga,因为用户需要即时知道支付结果:
预扣库存"] --> S2["② order-service
创建订单"] S2 --> S3["③ wallet-service
扣款"] S3 --> S4["④ loyalty-service
扣积分(可选)"] S4 --> S5["⑤ buyer-bff
清空购物车"] end subgraph 逆向["失败补偿(逆向)"] direction TB F1["⑤' marketplace-service
恢复库存"] --> F2["④' wallet-service
退款"] F2 --> F3["③' loyalty-service
退回积分"] end Fail{"任何一步失败"} -.-> F1
这和 Outbox 异步事件是互补关系:
| 场景 | 模式 | 原因 |
|---|---|---|
| 结账 → 返回支付结果 | 同步 Saga | 用户等待即时反馈 |
| 订单完成 → 发积分 | Outbox + Kafka | 可以延迟,不需要阻塞用户 |
| 订单完成 → 发邮件 | Outbox + Kafka | 异步通知,不影响主流程 |
| 库存扣减失败 → 回滚 | 同步补偿 | 需要尽快回滚,用户才能重新下单 |
典型事件流示例#
与其把所有场景揉成一条"万能链路",不如直接看当前仓库里已经闭环的四条主路径:
初始化引导/奖励任务"] Auth -->|"buyer.registered.v1"| R2["promotion-service
发欢迎券"] Auth -->|"buyer.registered.v1"| R3["notification-service
发送欢迎邮件"] end subgraph 订单["2. 订单完成"] direction TB Order["order-service"] -->|"order.events.v1"| O1["loyalty-service
发放购物积分"] Order -->|"order.events.v1"| O2["notification-service
发送订单确认邮件"] Order -->|"order.events.v1"| O3["webhook-service
向外部订阅方投递"] end subgraph 商品["3. 商品变更"] direction TB MP["marketplace-service"] -->|"marketplace.product.events.v1"| S1["search-service
更新 Meilisearch 索引"] end subgraph 充值["4. 钱包充值完成"] direction TB Wallet["wallet-service"] -->|"wallet.transactions.v1"| W1["promotion-service
生成充值奖励"] Wallet -->|"wallet.transactions.v1"| W2["notification-service
发送充值完成通知"] Wallet -->|"wallet.transactions.v1"| W3["webhook-service
向外部订阅方投递"] end
这样写虽然没有“一条龙故事”那么炫,但更贴近源码里当前已经落地的事实:哪些事件真的发了、哪些消费者真的接了、哪些只是预留配置,一眼能分清。
参考与实现位置#
- Spring Kafka Retry Topics:https://docs.spring.io/spring-kafka/reference/retrytopic.html
- Spring Kafka DLT Strategies:https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
- CloudEvents 规范:https://cloudevents.io/
- Debezium Outbox Pattern:https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html
- 仓库实现入口:
shared/shop-contracts/shop-contracts-event-common/src/main/java/dev/meirong/shop/contracts/event/EventEnvelope.java、services/order-service/src/main/java/dev/meirong/shop/order/service/OrderOutboxPublisher.java、services/wallet-service/src/main/java/dev/meirong/shop/wallet/service/WalletOutboxPublisher.java、services/marketplace-service/src/main/java/dev/meirong/shop/marketplace/service/MarketplaceOutboxPublisher.java、services/activity-service/src/main/java/dev/meirong/shop/activity/service/ActivityOutboxPublisher.java、shared/shop-common/shop-common-core/src/main/java/dev/meirong/shop/common/idempotency/、services/loyalty-service/src/main/java/dev/meirong/shop/loyalty/listener/OrderEventListener.java、services/promotion-service/src/main/java/dev/meirong/shop/promotion/messaging/WalletRewardListener.java、services/search-service/src/main/java/dev/meirong/shop/search/consumer/ProductEventConsumer.java
小结#
Shop Platform 的事件驱动实现里,我更想记下这些点:
- EventEnvelope 统一模型:eventId、source、type、schemaVersion 标准化,
@JsonIgnoreProperties保证向后兼容 - Transactional Outbox:4 种实现变体,同一事务保证业务数据 + 事件原子性,定时轮询发布
- IdempotencyGuard 双层防护:DB-only 当前默认方案 + Redis Bloom Filter 高性能方案,失败时可降级
- @RetryableTopic 重试 + DLQ:指数退避、异常分类、死信处理
- 同步 Saga + 异步事件互补:结账等用户等待的用同步,积分/通知等用异步
下一篇将深入 插件化活动引擎:GamePlugin SPI 接口如何支持 4 种游戏插件(砸金蛋、抢红包、集卡、虚拟养成),Redis Lua 脚本如何实现原子抢红包,以及 Anti-Cheat 反作弊机制。
项目仓库:github.com/meirongdev/shop(私有,仅供参考)