在前两篇文章中,我们看了 API Gateway 的路由分发和 BFF 的聚合编排。这一篇继续往里走,看看业务承载最集中的一层——领域服务层

📦 本文基于的完整项目源码:https://github.com/meirongdev/shop

上一篇:(三)BFF 聚合层

2026-04 实践更新 领域服务侧当前已经和早期文章发布时相比前进了一步:Problem Details、@ServiceConnection、补偿任务持久化、Kafka 幂等守卫都已落地;本文中的领域边界划分仍然成立,但涉及“后续会做”的部分请以主线仓库现状为准。


领域服务清单#

Shop Platform 核心业务按边界拆成 11 个领域服务,每个服务独立开发、独立部署、拥有自己的数据库 schema;此外 auth-server 自己维护 shop_auth,但它更偏认证边界,本文重点放在业务域服务。

服务 数据库 核心业务
profile-service shop_profile 用户档案、地址簿、卖家档案
marketplace-service shop_marketplace 商品目录、SKU、库存、评价
order-service shop_order 订单状态机、购物车、退款
wallet-service shop_wallet 钱包余额、充值、Stripe 支付
promotion-service shop_promotion 促销引擎、优惠券
loyalty-service shop_loyalty 积分账户、签到、兑换
activity-service shop_activity 插件化游戏引擎
search-service Meilisearch 集成、Feature Toggle
notification-service shop_notification 邮件通知、Channel SPI
webhook-service shop_webhook 开放平台 Webhook、HMAC 签名
subscription-service shop_subscription 订阅计划、自动续费

其中 search-service 没有关系型数据库——它的数据存储在 Meilisearch 搜索引擎中。


每服务独立数据库#

为什么每服务独立 schema#

在微服务架构中,每个领域服务拥有自己的数据库 schema 是一种很常见的组织方式。它的好处:

  • 隔离性:一个服务的数据库变更不影响其他服务
  • 自治性:每个团队可以独立选择 ORM、迁移策略、备份策略
  • 性能边界清晰:慢查询只影响自己的服务,不会拖累整个系统
  • 数据所有权明确order-service 拥有订单数据,其他服务只能通过 API 获取,不能直连数据库

Schema 一览#

Schema 核心表
shop_profile buyer_profile, seller_profile, buyer_address
shop_marketplace marketplace_product, product_sku, product_category, product_review
shop_order shop_order, order_item, order_outbox_event
shop_wallet wallet_account, wallet_transaction, wallet_outbox_event, wallet_idempotency_key
shop_promotion promotion_offer, coupon_template, coupon_instance
shop_loyalty loyalty_account, loyalty_checkin, loyalty_redemption, loyalty_idempotency_key
shop_activity activity_game, activity_participation, activity_reward_prize, activity_collect_card_def, activity_player_card, activity_virtual_farm, activity_outbox_event
shop_notification notification_log
shop_webhook webhook_subscription, webhook_delivery
shop_subscription subscription_plan, subscription, subscription_order_log
shop_auth user_account, social_account

所有服务通过环境变量注入数据库连接信息:

spring:
  datasource:
    url: ${ORDER_DB_URL:jdbc:mysql://mysql:3306/shop_order?createDatabaseIfNotExist=true&useSSL=false&allowPublicKeyRetrieval=true}
    username: ${ORDER_DB_USERNAME:shop}
    password: ${ORDER_DB_PASSWORD:shop}
  jpa:
    hibernate:
      ddl-auto: validate  # 禁止 Hibernate 自动变更 schema
  flyway:
    enabled: true
    repair-on-migrate: true

ddl-auto: validate 是关键配置——Hibernate 启动时校验实体类和数据库 schema 是否匹配,不匹配则启动失败。这有助于在启动阶段就发现 schema 不一致的问题,而不是等到运行时才暴露。


Flyway 渐进式迁移#

每个服务使用 Flyway 管理数据库迁移,迁移脚本放在 src/main/resources/db/migration/ 下。

迁移策略#

  • 版本号递增V1__init.sql, V2__add_index.sql, V3__new_table.sql
  • 不可变原则:已发布的迁移脚本不再直接修改,新增变更通常写成新脚本
  • repair-on-migrate: true:当手动修改过 schema(如本地开发调试)时,Flyway 先修复 metadata 再执行迁移

这里也要说明边界:repair-on-migrate 在本仓库里主要是为了降低本地演进和 POC 调试成本。正式环境里如果有严格的变更审计要求,通常还是会更谨慎地使用它,并优先避免直接手工改 schema。

迁移示例:activity-service#

V1__init.sql                → activity_game, activity_reward_prize, activity_participation, activity_outbox_event
V2__collect_card_tables.sql → activity_collect_card_def, activity_player_card
V3__virtual_farm_tables.sql → activity_virtual_farm
V4__rename_player_id_to_buyer_id.sql → 字段重命名,统一项目里的命名约定

V4 是一个典型的演进式迁移——将 player_id 重命名为 buyer_id,统一项目中的术语:

-- V4__rename_player_id_to_buyer_id.sql
ALTER TABLE activity_participation CHANGE COLUMN player_id buyer_id VARCHAR(36);
ALTER TABLE activity_player_card CHANGE COLUMN player_id buyer_id VARCHAR(36);
ALTER TABLE activity_virtual_farm CHANGE COLUMN player_id buyer_id VARCHAR(36);

测试环境#

单元测试使用 Testcontainers 启动临时 MySQL 容器,Flyway 自动执行迁移:

@Testcontainers
@SpringBootTest
class OrderServiceApplicationTests {
    @Container
    @ServiceConnection
    static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.4");
}

@ServiceConnection 是 Spring Boot 3.1+ 的注解,自动将 Testcontainers 的连接信息注入到 Spring 上下文中。


JPA 实体与 Repository 模式#

领域包结构#

每个服务将 JPA 实体和 Repository 放在 domain 包下:

order-service/
├── domain/
│   ├── ShopOrderEntity.java
│   ├── OrderItemEntity.java
│   ├── ShopOrderRepository.java
│   ├── OrderItemRepository.java
│   ├── OrderStatus.java              # 字符串常量
│   └── OrderOutboxEventEntity.java   # Outbox 实体
├── service/
│   ├── OrderService.java
│   └── OrderOutboxPublisher.java
└── controller/
    ├── OrderController.java
    └── InternalOrderController.java

在当前仓库里,ArchUnit 规则 LAYER-02 要求 Controller 不能直接访问 Repository,而是通过 Service 层。

实体设计模式#

使用 String 类型 ID(UUID)

@Entity
@Table(name = "shop_order")
public class ShopOrderEntity {
    @Id
    @Column(nullable = false, length = 36)
    private String id;  // UUID.randomUUID().toString()

    @Column(name = "order_no", nullable = false, unique = true, length = 32)
    private String orderNo;  // 对外展示用订单号

    @Column(name = "buyer_id", nullable = false, length = 64)
    private String buyerId;

    @Column(nullable = false, length = 32)
    private String status;  // OrderStatus.PENDING_PAYMENT 等字符串常量

    @Column(name = "total_amount", nullable = false, precision = 19, scale = 2)
    private BigDecimal totalAmount;

    // ... 其他字段
}
  • UUID 字符串而非自增数字:分布式友好,不暴露业务量,跨库迁移无冲突
  • 业务编号(orderNo)与技术 ID 分离:id 用于内部关联,orderNo 用于对外展示
  • 当前仓库里 status 使用 OrderStatus 常量类而不是 enum。这牺牲了一点编译期类型约束,但和当前 API/事件contract里的字符串状态保持了一致

Repository 接口#

public interface ShopOrderRepository extends JpaRepository<ShopOrderEntity, String> {
    List<ShopOrderEntity> findByBuyerIdOrderByCreatedAtDesc(String buyerId);
    List<ShopOrderEntity> findBySellerIdOrderByCreatedAtDesc(String sellerId);
    Optional<ShopOrderEntity> findByPaymentTransactionId(String paymentTransactionId);
    Optional<ShopOrderEntity> findByOrderToken(String orderToken);
}

简单查询用方法名派生,复杂查询用 @QuerySpecification 或 QueryDSL 当然也能用,但 Shop Platform 当前这批查询还没复杂到需要引入这些工具。


统一响应模型:ApiResponse#

所有领域服务的公开 API 返回统一的 ApiResponse<T>

public record ApiResponse<T>(
        String traceId,
        String status,
        String message,
        T data) {

    public static <T> ApiResponse<T> success(T data) {
        return new ApiResponse<>(TraceIdExtractor.currentTraceId(), "SC_OK", "Success", data);
    }

    public static ApiResponse<Void> failure(BusinessErrorCode errorCode, String message) {
        return new ApiResponse<>(TraceIdExtractor.currentTraceId(), errorCode.getCode(), message, null);
    }
}

BFF 层通过 .data() 解包提取实际数据;traceId 则让一次请求在北向 API、日志和调用链之间更容易串起来。这个响应模型是项目约定,不一定适合作为所有公开 API 的唯一答案;如果面向外部生态,Problem Details 之类标准错误模型也完全值得结合使用。

错误语义通过 BusinessException + 统一错误码处理:

public enum CommonErrorCode implements BusinessErrorCode {
    VALIDATION_ERROR("SC_VALIDATION_ERROR", HttpStatus.BAD_REQUEST),
    TOO_MANY_REQUESTS("SC_TOO_MANY_REQUESTS", HttpStatus.TOO_MANY_REQUESTS),
    UNAUTHORIZED("SC_UNAUTHORIZED", HttpStatus.UNAUTHORIZED),
    FORBIDDEN("SC_FORBIDDEN", HttpStatus.FORBIDDEN),
    NOT_FOUND("SC_NOT_FOUND", HttpStatus.NOT_FOUND),
    DOWNSTREAM_ERROR("SC_DOWNSTREAM_ERROR", HttpStatus.BAD_GATEWAY),
    INTERNAL_ERROR("SC_INTERNAL_ERROR", HttpStatus.INTERNAL_SERVER_ERROR),
    INSUFFICIENT_BALANCE("SC_INSUFFICIENT_BALANCE", HttpStatus.BAD_REQUEST),
    COUPON_INVALID("SC_COUPON_INVALID", HttpStatus.BAD_REQUEST),
    COUPON_EXPIRED("SC_COUPON_EXPIRED", HttpStatus.BAD_REQUEST)
}

可信 Header 边界:当前可直接验证的现状#

当前仓库里能够直接从源码验证的主链路是:

flowchart LR Client["客户端"] -->|"JWT"| GW["Gateway"] GW -->|"X-Buyer-Id / X-Username
X-Roles / X-Portal / X-Request-Id"| DS["领域服务"] DS -->|"RestClient"| Header["HeaderPropagationInterceptor 继续透传"]

也就是说,当前稳定能力是 Gateway 统一验 JWT + 注入 Trusted Headers + 下游继续透传。至于早期文档里出现过的 InternalAccessFilter / X-Internal-Token 方案,Kubernetes NetworkPolicy 清单已经明确标注它属于被替代的旧方案,因此本文不再把它写成现状实现。


订单状态机:10 个状态的订单流转#

order-service 的订单状态机是领域服务中最复杂的业务逻辑之一:

public final class OrderStatus {
    public static final String PENDING_PAYMENT = "PENDING_PAYMENT";
    public static final String PAID = "PAID";
    public static final String PROCESSING = "PROCESSING";
    public static final String SHIPPED = "SHIPPED";
    public static final String DELIVERED = "DELIVERED";
    public static final String COMPLETED = "COMPLETED";
    public static final String CANCELLED = "CANCELLED";
    public static final String REFUND_REQUESTED = "REFUND_REQUESTED";
    public static final String REFUND_APPROVED = "REFUND_APPROVED";
    public static final String REFUND_REJECTED = "REFUND_REJECTED";
}

状态转换规则:

stateDiagram-v2 [*] --> PENDING_PAYMENT PENDING_PAYMENT --> PAID: 支付成功 PENDING_PAYMENT --> CANCELLED: 取消 PAID --> PROCESSING: 确认 PAID --> CANCELLED: 退款取消 PROCESSING --> SHIPPED: 发货 PROCESSING --> REFUND_REQUESTED: 申请退款 SHIPPED --> DELIVERED: 送达 DELIVERED --> COMPLETED: 完成 REFUND_REQUESTED --> REFUND_APPROVED: 批准 REFUND_REQUESTED --> REFUND_REJECTED: 拒绝 CANCELLED --> [*] COMPLETED --> [*] REFUND_APPROVED --> [*] REFUND_REJECTED --> [*]

状态转换主要封装在 ShopOrderEntity 自己的方法里,而不是散落在 Controller 或 Service 的分支判断中:

public void markPaid(String paymentTransactionId) {
    assertStatus(OrderStatus.PENDING_PAYMENT, "pay");
    this.status = OrderStatus.PAID;
    this.paymentTransactionId = paymentTransactionId;
    this.paidAt = Instant.now();
}

和文章一开始的旧版状态命名相比,当前实现把“支付中 / 履约中 / 已履约 / 退款中 / 已退款”这类更细粒度概念压缩成了更贴近现状的 10 个字符串状态。这更符合当前 POC 的行为范围,也更容易与对外 DTO 和事件contract保持一致。


补偿任务:持久化重试机制#

对于关键的补偿操作(如库存恢复),marketplace-service 实现了持久化补偿任务机制:

@Entity
@Table(name = "compensation_task",
        indexes = @Index(name = "idx_comp_task_status_next",
                        columnList = "status, next_retry_at"))
public class CompensationTaskEntity {
    public enum Status { PENDING, SUCCEEDED, FAILED }

    @Id
    private String id;                      // UUID
    @Column(nullable = false)
    private String taskType;                // "ROLLBACK_INVENTORY", "CANCEL_RESERVATION"
    @Column(nullable = false)
    private String aggregateId;             // productId
    @Column(columnDefinition = "TEXT")
    private String payload;                 // quantity 等参数的 JSON
    @Enumerated(EnumType.STRING)
    private Status status;
    private int retryCount;
    private int maxRetries = 5;
    private Instant nextRetryAt;
    private String lastError;
}

调度器每 60 秒扫描到期任务:

@Scheduled(fixedDelayString = "${shop.compensation.retry-interval-ms:60000}")
@Transactional
public void retryPendingTasks() {
    List<CompensationTaskEntity> dueTasks = taskRepository.findDuePendingTasks(Instant.now());
    for (CompensationTaskEntity task : dueTasks) {
        try {
            executeTask(task);
            task.markSucceeded();
        } catch (Exception e) {
            task.recordFailure(e.getMessage());  // 指数退避: 4^retryCount * 30s
        }
        taskRepository.save(task);
    }
}

指数退避公式:delay = 4^retryCount * 30 秒。第 1 次重试 2 分钟后,第 2 次 8 分钟后,第 3 次 32 分钟后……超过最大重试次数后标记为 FAILED,需要人工介入。当前实现还没有引入 jitter 或 circuit breaker,这在生产化时通常值得继续补强。


OpenAPI 注解集成#

每个领域服务通过 SpringDoc OpenAPI 自动生成 API 文档:

@RestController
@RequestMapping("/internal/marketplace/v1")
@Tag(name = "Marketplace Internal", description = "Internal API for BFF consumption")
public class MarketplaceInternalController {

    @Operation(summary = "List products by IDs",
               description = "Batch fetch products by their IDs")
    @ApiResponses({
        @ApiResponse(responseCode = "200", description = "Products found"),
        @ApiResponse(responseCode = "400", description = "Invalid request")
    })
    @PostMapping("/products/batch")
    public ApiResponse<List<ProductResponse>> listProductsByIds(
            @RequestBody @Valid ProductBatchRequest request) {
        // ...
    }
}

Gateway 将所有服务的 /v3/api-docs 聚合到一个 Swagger UI 中,前端开发不需要单独启动每个服务就能浏览完整 API。


参考与实现位置#

  • Flyway Versioned Migrations:https://documentation.red-gate.com/flyway/flyway-concepts/migrations/versioned-migrations
  • Spring Boot Testcontainers:https://docs.spring.io/spring-boot/reference/testing/testcontainers.html
  • 仓库实现入口:services/order-service/src/main/resources/application.ymlservices/order-service/src/main/java/dev/meirong/shop/order/domain/ShopOrderEntity.javaservices/order-service/src/main/java/dev/meirong/shop/order/domain/ShopOrderRepository.javaservices/order-service/src/main/java/dev/meirong/shop/order/domain/OrderStatus.javashared/shop-common/shop-common-core/src/main/java/dev/meirong/shop/common/api/ApiResponse.javashared/shop-common/shop-common-core/src/main/java/dev/meirong/shop/common/http/HeaderPropagationInterceptor.javaplatform/k8s/apps/base/network-policies.yaml

小结#

领域服务这一层目前采用的主要实践,可以概括为:

  • 数据库隔离:每服务独立 schema,Flyway 渐进式迁移,ddl-auto: validate 避免 Hibernate 自动改 schema
  • JPA 组织方式:UUID 字符串 ID、业务编号与技术 ID 分离、domain 包集中放实体与 Repository
  • 统一响应模型traceId + status + message + data,错误码采用 SC_* 约定
  • 调用边界:Gateway 注入 Trusted Headers,下游通过 HeaderPropagationInterceptor 继续透传
  • 状态机驱动:订单 10 个字符串状态由实体方法集中约束转换规则
  • 持久化补偿CompensationTaskEntity + 调度器 + 指数退避,尽量降低关键补偿被漏掉的风险

下一篇将深入 事件驱动架构:Kafka 话题设计、EventEnvelope 规范、Outbox Pattern 实现、IdempotencyGuard + Bloom Filter 幂等保障、以及重试与 DLQ 策略。

项目仓库:github.com/meirongdev/shop(公开,可结合源码一起看)