使用應用程式事件
為了使應用程式模組之間儘可能解耦,它們主要透過事件釋出和消費進行互動。這避免了原始模組瞭解所有可能感興趣的方,這是實現應用程式模組整合測試的關鍵方面(參見 整合測試應用程式模組)。
通常我們會發現應用程式元件的定義如下:
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final InventoryManagement inventory;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
// Invoke related functionality
inventory.updateStockFor(order);
}
}
@Service
class OrderManagement(val inventory: InventoryManagement) {
@Transactional
fun complete(order: Order) {
inventory.updateStockFor(order)
}
}
complete(…) 方法在某種程度上產生了功能引力,它吸引了相關功能以及與其他應用程式模組中定義的 Spring Bean 的互動。這使得元件更難測試,因為我們只需要依賴這些 Bean 的例項即可建立 OrderManagement 例項(參見 處理傳出依賴)。這也意味著,每當我們想將更多功能與業務事件訂單完成整合時,都必須修改該類。
我們可以如下更改應用程式模組的互動方式:
ApplicationEventPublisher 釋出應用程式事件-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final ApplicationEventPublisher events;
private final OrderInternal dependency;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
events.publishEvent(new OrderCompleted(order.getId()));
}
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {
@Transactional
fun complete(order: Order) {
events.publishEvent(OrderCompleted(order.id))
}
}
請注意,我們沒有依賴其他應用程式模組的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher 在主聚合上的狀態轉換完成後釋出領域事件。有關更聚合驅動的事件釋出方法,請參閱 Spring Data 的應用程式事件釋出機制 以獲取詳細資訊。由於事件釋出預設是同步進行的,因此整個安排的事務語義與上述示例相同。這既有好處,因為我們得到一個非常簡單的一致性模型(訂單的狀態更改和庫存更新要麼都成功,要麼都失敗),也有壞處,因為更多觸發的相關功能會擴大事務邊界,並可能導致整個事務失敗,即使導致錯誤的功能並不關鍵。
另一種方法是將事件消費轉移到事務提交時的非同步處理,並將次要功能視為次要功能。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
現在,這有效地將原始事務與偵聽器的執行分離。雖然這避免了原始業務事務的擴充套件,但它也帶來了風險:如果偵聽器因任何原因失敗,事件釋出就會丟失,除非每個偵聽器都實現了自己的安全網。更糟糕的是,這甚至不能完全奏效,因為系統可能在方法被呼叫之前就失敗了。
應用程式模組監聽器
為了使事務性事件監聽器在其自身的事務中執行,它需要反過來用 @Transactional 進行註解。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
為了簡化透過事件整合模組的預設方式的宣告,Spring Modulith 提供了 @ApplicationModuleListener 作為快捷方式。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@ApplicationModuleListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@ApplicationModuleListener
fun on(event: OrderCompleted) { /* … */ }
}
事件釋出登錄檔
Spring Modulith 附帶了一個事件釋出登錄檔,它與 Spring Framework 的核心事件釋出機制掛鉤。在事件釋出時,它會發現將接收到事件的事務性事件監聽器,併為每個監聽器(深藍色)寫入條目,作為原始業務事務的一部分,將其記錄到事件釋出日誌中。
每個事務性事件監聽器都封裝在一個切面中,如果監聽器執行成功,則將該日誌條目標記為已完成。如果監聽器失敗,則日誌條目保持不變,以便可以根據應用程式的需求部署重試機制。可以透過 spring.modulith.events.republish-outstanding-events-on-restart 屬性啟用事件的自動重新發布。
Spring Boot 事件登錄檔啟動器
使用事務性事件釋出日誌需要將一系列工件新增到應用程式中。為了簡化此任務,Spring Modulith 提供了圍繞 要使用的持久化技術 為中心的 starter POM,並預設使用基於 Jackson 的 EventSerializer 實現。以下 starter 可用:
| 持久化技術 | 工件 | 描述 |
|---|---|---|
JPA |
|
使用 JPA 作為持久化技術。 |
JDBC |
|
使用 JDBC 作為持久化技術。也適用於基於 JPA 的應用程式,但會繞過 JPA 提供程式進行實際的事件持久化。 |
MongoDB |
|
使用 MongoDB 作為持久化技術。還支援 MongoDB 事務,並要求伺服器設定副本集以進行互動。可以透過將 |
Neo4j |
|
使用 Spring Data Neo4j 背後的 Neo4j。 |
管理事件釋出
在應用程式執行期間,事件釋出可能需要以各種方式進行管理。未完成的釋出可能需要在給定時間後重新提交給相應的偵聽器。另一方面,已完成的釋出可能需要從資料庫中清除或移動到歸檔儲存中。由於這種清理需求因應用程式而異,Spring Modulith 提供了一個 API 來處理這兩種型別的釋出。該 API 透過 spring-modulith-events-api 工件提供,您可以將其新增到您的應用程式中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>2.0.0</version>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-events-api:2.0.0'
}
此工件包含兩個主要抽象,它們可作為 Spring Beans 供應用程式程式碼使用:
-
CompletedEventPublications— 此介面允許訪問所有已完成的事件釋出,並提供一個 API 來立即從資料庫中清除所有這些釋出,或清除早於給定持續時間(例如,1 分鐘)的已完成釋出。 -
IncompleteEventPublications— 此介面允許訪問所有未完成的事件釋出,以重新提交與給定謂詞匹配的釋出,或重新提交早於相對於原始釋出日期的給定Duration的釋出。
事件釋出完成
當事務性或 @ApplicationModuleListener 執行成功完成時,事件釋出被標記為已完成。預設情況下,透過在 EventPublication 上設定完成日期來註冊完成。這意味著已完成的釋出將保留在事件釋出登錄檔中,以便可以透過如上所述的 CompletedEventPublications 介面進行檢查。這樣做的結果是,您需要編寫一些程式碼來定期清除舊的、已完成的 EventPublication。否則,它們的持久化抽象(例如關係資料庫表)將無限增長,並且與建立和完成新 EventPublication 的儲存互動可能會變慢。
Spring Modulith 1.3 引入了一個配置屬性 spring.modulith.events.completion-mode 以支援兩種額外的完成模式。它預設為 UPDATE,由上述策略支援。另外,完成模式可以設定為 DELETE,這將改變登錄檔的持久化機制,以便在完成時刪除 EventPublication。這意味著 CompletedEventPublications 將不再返回任何釋出,但同時,您也不必再手動從持久化儲存中清除已完成的事件。
第三種選擇是 ARCHIVE 模式,它將條目複製到存檔表、集合或節點中。對於該存檔條目,設定完成日期並刪除原始條目。與 DELETE 模式相反,已完成的事件釋出仍然可以透過 CompletedEventPublications 抽象進行訪問。
事件釋出倉庫
為了實際寫入事件釋出日誌,Spring Modulith 暴露了一個 EventPublicationRepository SPI,以及針對支援事務的流行持久化技術(如 JPA、JDBC 和 MongoDB)的實現。您可以透過將相應的 JAR 新增到 Spring Modulith 應用程式中來選擇要使用的持久化技術。我們已經準備了專門的 starter 來簡化此任務。
當相應的配置屬性 (spring.modulith.events.jdbc.schema-initialization.enabled) 設定為 true 時,基於 JDBC 的實現可以為事件釋出日誌建立專用表。有關詳細資訊,請查閱附錄中的 架構概述。
事件外部化
在應用程式模組之間交換的一些事件可能對外部系統感興趣。Spring Modulith 允許將選定的事件釋出到各種訊息代理。要使用該支援,您需要執行以下步驟:
-
將 特定於代理的 Spring Modulith 工件 新增到您的專案中。
-
透過使用 Spring Modulith 或 jMolecules 的
@Externalized註解對事件型別進行註釋,選擇要外部化的事件型別。 -
在註解的值中指定特定於代理的路由目標。
要了解如何使用其他方式選擇要外部化的事件,或自定義其在代理中的路由,請檢視 事件外部化基礎。
支援的基礎設施
| 代理 | 工件 | 描述 |
|---|---|---|
Kafka |
|
使用 Spring Kafka 與代理進行互動。邏輯路由鍵將用作 Kafka 的主題和訊息鍵。 |
AMQP |
|
使用 Spring AMQP 與任何相容的代理進行互動。例如,需要明確宣告 Spring Rabbit 的依賴項。邏輯路由鍵將用作 AMQP 路由鍵。 |
JMS |
|
使用 Spring 的核心 JMS 支援。不支援路由鍵。 |
Spring Messaging |
|
使用 Spring 的核心 |
事件外部化基礎
Spring Modulith 的事件外部化是作為 事務性事件監聽器 實現的,它委託給特定於代理的釋出實現。這意味著 Spring Modulith 的 事件釋出登錄檔 在與代理互動期間防止外部化失敗,以便可以透過提供的 API 重新提交發布。
事件外部化對每個釋出的應用程式事件執行三個步驟。
-
確定事件是否應該被外部化 — 我們稱之為“事件選擇”。預設情況下,只有位於 Spring Boot 自動配置包中並用一個受支援的
@Externalized註解標記的事件型別才會被選擇進行外部化。 -
準備訊息(可選) — 預設情況下,事件由相應的代理基礎設施按原樣序列化。可選的對映步驟允許開發人員自定義甚至完全替換原始事件,使其成為適合外部方的有效載荷。對於 Kafka 和 AMQP,開發人員還可以向要釋出的訊息新增頭部。
-
確定路由目標 — 訊息代理客戶端需要一個邏輯目標來發布訊息。目標通常標識物理基礎設施(根據代理的不同,可以是主題、交換機或佇列),並且通常根據事件型別靜態派生。除非在
@Externalized註解中明確定義,否則 Spring Modulith 將使用應用程式本地型別名稱作為目標。換句話說,在一個基礎包為com.acme.app的 Spring Boot 應用程式中,事件型別com.acme.app.sample.SampleEvent將釋出到sample.SampleEvent。一些代理還允許定義一個更動態的路由鍵,該路由鍵在實際目標中用於不同的目的。預設情況下,不使用路由鍵。
基於註解的事件外部化配置
要透過 @Externalized 註解定義自定義路由鍵,可以使用 $target::$key 模式作為每個特定註解中可用的目標/值屬性。目標和鍵都可以是 SpEL 表示式,它將事件例項配置為根物件。
-
Java
-
Kotlin
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
String getLastname() { (1)
// …
}
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
fun getLastname(): String { (1)
// …
}
}
CustomerCreated 事件透過訪問器方法暴露客戶的姓氏。該方法透過目標宣告中 :: 分隔符後的鍵表示式中的 #this.getLastname() 表示式使用。
如果鍵的計算變得更加複雜,建議將其委託給一個以事件作為引數的 Spring Bean。
-
Java
-
Kotlin
@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")
程式化事件外部化配置
spring-modulith-events-api 工件包含 EventExternalizationConfiguration,允許開發人員自定義上述所有步驟。
-
Java
-
Kotlin
@Configuration
class ExternalizationConfiguration {
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent.class, event -> …) (3)
.headers(event -> …) (4)
.routeKey(WithKeyProperty.class, WithKeyProperty::getKey) (5)
.build();
}
}
@Configuration
class ExternalizationConfiguration {
@Bean
fun eventExternalizationConfiguration(): EventExternalizationConfiguration {
EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent::class.java) { event -> … } (3)
.headers() { event -> … } (4)
.routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey) (5)
.build()
}
}
| 1 | 我們首先建立一個 EventExternalizationConfiguration 的預設例項。 |
| 2 | 我們透過呼叫上一次呼叫返回的 Selector 例項上的 select(…) 方法之一來定製事件選擇。此步驟從根本上停用了應用程式基本包過濾器,因為我們現在只查詢註解。存在用於透過型別、包、包和註解輕鬆選擇事件的便捷方法。此外,還有一個快捷方式可以一步定義選擇和路由。 |
| 3 | 我們為 SomeEvent 例項定義一個對映步驟。請注意,路由仍將由原始事件例項確定,除非您另外在路由器上呼叫 ….routeMapped()。 |
| 4 | 我們將自定義頭部新增到要傳送的訊息中,可以是如所示的通用方式,也可以是特定於某個有效載荷型別的方式。 |
| 5 | 我們最終透過定義一個方法控制代碼來提取事件例項的值來確定路由鍵。或者,可以透過使用上一次呼叫返回的 Router 例項上的通用 route(…) 方法為單個事件生成完整的 RoutingKey。 |
序列化事件外部化
Spring Modulith 的事件外部化是作為事務性事件監聽器實現的。這意味著多個執行緒可能同時觸發與代理的互動。當事件釋出被重新提交時,這尤其重要。由於代理可能會突然出現互動高峰,一些互動可能需要更長的時間,因此後期事件的外部化可能會超過先前的事件。
為了防止這種情況,可以透過將 spring.modulith.events.externalization.serialize-externalization 屬性設定為 true 來序列化與代理的互動,從而一次只發送一個事件。
測試已釋出的事件
以下部分描述了一種僅關注跟蹤 Spring 應用程式事件的測試方法。有關使用 @ApplicationModuleListener 測試模組的更全面的方法,請檢視 Scenario API。 |
Spring Modulith 的 @ApplicationModuleTest 允許將 PublishedEvents 例項注入到測試方法中,以驗證在受測試的業務操作過程中是否釋出了特定的一組事件。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(PublishedEvents events) {
// …
var matchingMapped = events.ofType(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
assertThat(matchingMapped).hasSize(1);
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: PublishedEvents events) {
// …
val matchingMapped = events.ofType(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
assertThat(matchingMapped).hasSize(1)
}
}
請注意 PublishedEvents 如何暴露一個 API 來選擇匹配特定條件的事件。驗證透過 AssertJ 斷言完成,該斷言驗證預期的元素數量。如果您無論如何都使用 AssertJ 進行這些斷言,您還可以使用 AssertablePublishedEvents 作為測試方法引數型別,並使用透過它提供的流暢斷言 API。
AssertablePublishedEvents 驗證事件釋出-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(AssertablePublishedEvents events) {
// …
assertThat(events)
.contains(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: AssertablePublishedEvents) {
// …
assertThat(events)
.contains(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
}
}
請注意,assertThat(…) 表示式返回的型別允許直接定義已釋出事件的約束。