使用應用事件

為了讓應用模組之間儘可能地解耦,它們主要的互動方式應該是事件的釋出和消費。這避免了發起模組瞭解所有可能感興趣的各方,這是實現應用模組整合測試(參見 應用模組整合測試)的關鍵方面。

我們通常會發現應用元件的定義如下

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法會在某種程度上產生功能引力,它會吸引相關的功能,從而與在其他應用模組中定義的 Spring bean 進行互動。這尤其使得該元件更難測試,因為僅僅建立 OrderManagement 的例項就需要那些依賴的 bean 的可用例項(參見 處理外向依賴)。這也意味著無論何時我們希望與業務事件的訂單完成進一步整合功能,都必須修改該類。

我們可以按如下方式改變應用模組的互動

透過 Spring 的 ApplicationEventPublisher 釋出應用事件
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

注意,我們沒有依賴其他應用模組的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher 在主聚合上完成狀態轉換後釋出一個領域事件。有關更面向聚合的事件釋出方法,請參閱 Spring Data 的應用事件釋出機制瞭解詳情。由於事件釋出預設是同步發生的,因此整體安排的事務語義與上述示例相同。這既有好處,因為我們獲得了一個非常簡單的最終一致性模型(訂單的狀態更改和庫存更新要麼都成功,要麼都不成功),但也有壞處,因為更多觸發的相關功能將擴大事務邊界,並可能導致整個事務失敗,即使導致錯誤的功能並不關鍵。

另一種方法是將事件消費轉移到事務提交時的非同步處理,並將次要功能正是如此對待

一個非同步的事務性事件監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

這有效地將原始事務與監聽器的執行解耦。雖然這避免了原始業務事務的擴充套件,但也帶來了一個風險:如果監聽器因任何原因失敗,事件釋出將丟失,除非每個監聽器都實現了自己的安全網。更糟糕的是,這甚至不能完全奏效,因為系統可能在方法被呼叫之前就失敗了。

應用模組監聽器

要在事務中執行事務性事件監聽器,它需要相應地用 @Transactional 進行註解。

一個執行在自身事務中的非同步事務性事件監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

為了簡化透過事件整合模組的預設方式的宣告,Spring Modulith 提供了 @ApplicationModuleListener 作為快捷方式。

一個應用模組監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

事件釋出登錄檔

Spring Modulith 附帶了一個事件釋出登錄檔,它與 Spring Framework 的核心事件釋出機制掛鉤。在事件釋出時,它會發現哪些事務性事件監聽器將接收到事件,並在原始業務事務中為每個監聽器(深藍色)寫入一個條目到事件釋出日誌中。

event publication registry start
圖 1. 執行前的事務性事件監聽器安排

每個事務性事件監聽器都被包裝在一個切面中,如果監聽器執行成功,該切面將該日誌條目標記為已完成。如果監聽器失敗,日誌條目保持不變,以便可以根據應用的需要部署重試機制。可以透過 spring.modulith.events.republish-outstanding-events-on-restart 屬性啟用事件的自動重新發布。

event publication registry end
圖 2. 執行後的事務性事件監聽器安排

Spring Boot 事件登錄檔 Starter

使用事務性事件釋出日誌需要向您的應用新增一組構件。為了簡化這項任務,Spring Modulith 提供了圍繞要使用的持久化技術而設計的 starter POM,並預設使用基於 Jackson 的 EventSerializer 實現。以下 starter 可用

持久化技術 構件 描述

JPA

spring-modulith-starter-jpa

使用 JPA 作為持久化技術。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作為持久化技術。也可用於基於 JPA 的應用,但繞過您的 JPA 提供程式進行實際的事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 MongoDB 作為持久化技術。也啟用 MongoDB 事務,並需要伺服器配置為副本集才能進行互動。透過將 spring.modulith.events.mongodb.transaction-management.enabled 屬性設定為 false 可以停用事務自動配置。

Neo4j

spring-modulith-starter-neo4j

在 Spring Data Neo4j 後面使用 Neo4j。

管理事件釋出

事件釋出可能需要在應用執行時以多種方式進行管理。未完成的釋出可能需要在一定時間後重新提交給相應的監聽器。另一方面,已完成的釋出很可能需要從資料庫中清除或移動到歸檔儲存中。由於這類清理工作的需求因應用而異,Spring Modulith 提供了一個 API 來處理這兩種型別的釋出。該 API 透過您可以新增到應用的 spring-modulith-events-api 構件提供

使用 Spring Modulith Events API 構件
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.3.5</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:1.3.5'
}

此構件包含兩個主要抽象,它們作為 Spring Bean 可供應用程式碼使用

  • CompletedEventPublications — 該介面允許訪問所有已完成的事件釋出,並提供一個 API,用於立即將它們全部從資料庫中清除,或清除早於指定時長(例如,1 分鐘)的已完成釋出。

  • IncompleteEventPublications — 該介面允許訪問所有未完成的事件釋出,以便重新提交符合指定謂詞或早於相對於原始釋出日期的指定 Duration 的釋出。

事件釋出完成

當事務性或 @ApplicationModuleListener 執行成功完成後,事件釋出會被標記為已完成。預設情況下,完成透過在 EventPublication 上設定完成日期來註冊。這意味著已完成的釋出將保留在事件釋出登錄檔中,以便可以透過上面描述的 CompletedEventPublications 介面進行檢查。這樣做的一個後果是,您需要編寫一些程式碼來定期清除舊的、已完成的 EventPublication。否則,它們的持久化抽象(例如關係資料庫表)將無限增長,並且與儲存進行互動建立和完成新的 EventPublication 可能會變慢。

Spring Modulith 1.3 引入了一個配置屬性 spring.modulith.events.completion-mode 來支援另外兩種完成模式。它預設為 UPDATE,由上面描述的策略支援。或者,可以將完成模式設定為 DELETE,這將更改登錄檔的持久化機制,改為在完成時刪除 EventPublication。這意味著 CompletedEventPublications 將不再返回任何釋出,但同時,您也不必再手動從持久化儲存中清除已完成的事件。

第三種選項是 ARCHIVE 模式,它將條目複製到歸檔表、集合或節點中。對於該歸檔條目,將設定完成日期並移除原始條目。與 DELETE 模式相反,已完成的事件釋出仍然可以透過 CompletedEventPublications 抽象進行訪問。

事件釋出倉庫

為了實際寫入事件釋出日誌,Spring Modulith 暴露了一個 EventPublicationRepository SPI 以及支援事務的流行持久化技術的實現,如 JPA、JDBC 和 MongoDB。透過將相應的 JAR 新增到您的 Spring Modulith 應用來選擇要使用的持久化技術。我們已準備好專用的 starter 來簡化該任務。

基於 JDBC 的實現可以在設定相應的配置屬性 (spring.modulith.events.jdbc.schema-initialization.enabled) 為 true 時,為事件釋出日誌建立一個專用表。詳情請查閱附錄中的 Schema 概覽

事件序列化器

每個日誌條目都包含序列化形式的原始事件。包含在 spring-modulith-events-core 中的 EventSerializer 抽象允許插入不同的策略,用於將事件例項轉換為適合資料儲存的格式。Spring Modulith 透過 spring-modulith-events-jackson 構件提供了一個基於 Jackson 的 JSON 實現,該構件預設透過標準 Spring Boot 自動配置註冊一個消費 ObjectMapperJacksonEventSerializer

自定義事件釋出日期

預設情況下,事件釋出登錄檔將使用 Clock.systemUTC() 返回的日期作為事件釋出日期。如果您想自定義,請在應用上下文註冊一個 Clock 型別的 bean

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

外部化事件

應用模組之間交換的一些事件可能對外部系統感興趣。Spring Modulith 允許將選定的事件釋出到各種訊息代理。要使用該支援,您需要執行以下步驟

  1. 將特定於代理的 Spring Modulith 構件新增到您的專案中。

  2. 透過使用 Spring Modulith 或 jMolecules 的 @Externalized 註解來標記要外部化的事件型別。

  3. 在註解的值中指定特定於代理的路由目標。

要了解如何使用其他方式選擇要外部化的事件,或自定義它們在代理內的路由,請查閱 事件外部化基礎知識

支援的基礎設施

代理 構件 描述

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 與代理互動。邏輯路由鍵將用作 Kafka 的 Topic 和 Message Key。

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 與任何相容代理互動。例如,需要顯式宣告對 Spring Rabbit 的依賴。邏輯路由鍵將用作 AMQP 路由鍵。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支援。不支援路由鍵。

SQS

spring-modulith-events-aws-sqs

已棄用(詳情參見 此處)。使用 Spring Cloud AWS SQS 支援。邏輯路由鍵將用作 SQS 訊息組 ID。當設定路由鍵時,要求 SQS 佇列配置為 FIFO 佇列。

SNS

spring-modulith-events-aws-sns

已棄用(詳情參見 此處)。使用 Spring Cloud AWS SNS 支援。邏輯路由鍵將用作 SNS 訊息組 ID。當設定路由鍵時,要求 SNS 配置為啟用基於內容的去重功能的 FIFO Topic。

Spring Messaging

spring-modulith-events-messaging

使用 Spring 的核心 MessageMessageChannel 支援。根據 Externalized 註解中的目標值,透過 bean 名稱解析目標 MessageChannel。將路由資訊作為名為 springModulith_routingTarget 的 Header 轉發,以便下游元件以任何方式處理,通常是在 Spring Integration 的 IntegrationFlow 中。

事件外部化基礎知識

事件外部化對釋出的每個應用事件執行三個步驟。

  1. 確定事件是否應該被外部化——我們稱之為“事件選擇”。預設情況下,只有位於 Spring Boot 自動配置包內並用支援的 @Externalized 註解之一標記的事件型別才會被選擇進行外部化。

  2. 準備訊息(可選)——預設情況下,事件由相應的代理基礎設施按原樣序列化。可選的對映步驟允許開發者自定義或甚至完全替換原始事件,使用適合外部方的 Payload。對於 Kafka 和 AMQP,開發者還可以向要釋出的訊息新增 Header。

  3. 確定路由目標——訊息代理客戶端需要一個邏輯目標來發布訊息。目標通常標識物理基礎設施(Topic、Exchange 或 Queue,取決於代理),並且通常靜態地從事件型別派生。除非在 @Externalized 註解中明確定義,Spring Modulith 使用應用本地型別名稱作為目標。換句話說,在一個基礎包為 com.acme.app 的 Spring Boot 應用中,事件型別 com.acme.app.sample.SampleEvent 將釋出到 sample.SampleEvent

    一些代理還允許定義一個更具動態性的路由鍵,它用於實際目標內的不同目的。預設情況下,不使用路由鍵。

基於註解的事件外部化配置

要透過 @Externalized 註解定義自定義路由鍵,可以在各個註解中的 target/value 屬性使用 $target::$key 模式。target 和 key 都可以是一個 SpEL 表示式,事件例項將被配置為根物件。

透過 SpEL 表示式定義動態路由鍵
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated 事件透過 accessor 方法暴露客戶的姓氏。然後該方法透過目標宣告的 :: 分隔符後的 key 表示式 #this.getLastname() 使用。

如果鍵的計算變得更復雜,建議將其委託給一個接受事件作為引數的 Spring bean。

呼叫 Spring bean 計算路由鍵
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

程式設計式事件外部化配置

spring-modulith-events-api 構件包含 EventExternalizationConfiguration,允許開發者自定義上述所有步驟。

程式設計式配置事件外部化
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, event -> …)                                  (3)
      .headers(event -> …)                                                   (4)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (5)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class.java) { event -> … }                          (3)
      .headers() { event -> … }                                               (4)
      .routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey)         (5)
      .build()
  }
}
1 我們首先建立一個預設的 EventExternalizationConfiguration 例項。
2 我們透過呼叫上一步返回的 Selector 例項上的一個或多個 select(…) 方法來定製事件選擇。這一步基本上停用了應用基礎包過濾器,因為我們現在只查詢註解。存在按型別、按包、按包和註解輕鬆選擇事件的便捷方法。此外,還有一步定義選擇和路由的快捷方式。
3 我們為 SomeEvent 例項定義一個對映步驟。請注意,路由仍將由原始事件例項確定,除非您額外呼叫 router 上的 ….routeMapped() 方法。
4 我們為要傳送的訊息新增自定義 Header,可以像所示的那樣通用新增,也可以針對特定 Payload 型別進行新增。
5 我們最終透過定義一個方法控制代碼來確定路由鍵,以提取事件例項的一個值。另外,可以透過使用上一步呼叫返回的 Router 例項上的通用 route(…) 方法為單個事件生成完整的 RoutingKey

測試已釋出的事件

以下部分描述了一種僅關注跟蹤 Spring 應用事件的測試方法。對於使用 @ApplicationModuleListener 的模組測試的更全面的方法,請檢視 Scenario API

Spring Modulith 的 @ApplicationModuleTest 使能夠將 PublishedEvents 例項注入到測試方法中,以驗證在被測試的業務操作過程中是否釋出了特定的一組事件。

基於事件的應用模組安排整合測試
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    val matchingMapped = events.ofType(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

注意 PublishedEvents 如何公開一個 API 來選擇匹配特定標準的事件。驗證是透過 AssertJ 斷言來完成的,該斷言驗證預期的元素數量。如果您本來就使用 AssertJ 進行這些斷言,您也可以使用 AssertablePublishedEvents 作為測試方法引數型別,並使用透過它提供的流式斷言 API。

使用 AssertablePublishedEvents 驗證事件釋出
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

注意 assertThat(…) 表示式返回的型別如何允許直接對已釋出的事件定義約束。