使用應用事件
為了讓應用模組之間儘可能地解耦,它們主要的互動方式應該是事件的釋出和消費。這避免了發起模組瞭解所有可能感興趣的各方,這是實現應用模組整合測試(參見 應用模組整合測試)的關鍵方面。
我們通常會發現應用元件的定義如下
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final InventoryManagement inventory;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
// Invoke related functionality
inventory.updateStockFor(order);
}
}
@Service
class OrderManagement(val inventory: InventoryManagement) {
@Transactional
fun complete(order: Order) {
inventory.updateStockFor(order)
}
}
complete(…)
方法會在某種程度上產生功能引力,它會吸引相關的功能,從而與在其他應用模組中定義的 Spring bean 進行互動。這尤其使得該元件更難測試,因為僅僅建立 OrderManagement
的例項就需要那些依賴的 bean 的可用例項(參見 處理外向依賴)。這也意味著無論何時我們希望與業務事件的訂單完成進一步整合功能,都必須修改該類。
我們可以按如下方式改變應用模組的互動
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final ApplicationEventPublisher events;
private final OrderInternal dependency;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
events.publishEvent(new OrderCompleted(order.getId()));
}
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {
@Transactional
fun complete(order: Order) {
events.publishEvent(OrderCompleted(order.id))
}
}
注意,我們沒有依賴其他應用模組的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher
在主聚合上完成狀態轉換後釋出一個領域事件。有關更面向聚合的事件釋出方法,請參閱 Spring Data 的應用事件釋出機制瞭解詳情。由於事件釋出預設是同步發生的,因此整體安排的事務語義與上述示例相同。這既有好處,因為我們獲得了一個非常簡單的最終一致性模型(訂單的狀態更改和庫存更新要麼都成功,要麼都不成功),但也有壞處,因為更多觸發的相關功能將擴大事務邊界,並可能導致整個事務失敗,即使導致錯誤的功能並不關鍵。
另一種方法是將事件消費轉移到事務提交時的非同步處理,並將次要功能正是如此對待
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
這有效地將原始事務與監聽器的執行解耦。雖然這避免了原始業務事務的擴充套件,但也帶來了一個風險:如果監聽器因任何原因失敗,事件釋出將丟失,除非每個監聽器都實現了自己的安全網。更糟糕的是,這甚至不能完全奏效,因為系統可能在方法被呼叫之前就失敗了。
應用模組監聽器
要在事務中執行事務性事件監聽器,它需要相應地用 @Transactional
進行註解。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
為了簡化透過事件整合模組的預設方式的宣告,Spring Modulith 提供了 @ApplicationModuleListener
作為快捷方式。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@ApplicationModuleListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@ApplicationModuleListener
fun on(event: OrderCompleted) { /* … */ }
}
事件釋出登錄檔
Spring Modulith 附帶了一個事件釋出登錄檔,它與 Spring Framework 的核心事件釋出機制掛鉤。在事件釋出時,它會發現哪些事務性事件監聽器將接收到事件,並在原始業務事務中為每個監聽器(深藍色)寫入一個條目到事件釋出日誌中。

每個事務性事件監聽器都被包裝在一個切面中,如果監聽器執行成功,該切面將該日誌條目標記為已完成。如果監聽器失敗,日誌條目保持不變,以便可以根據應用的需要部署重試機制。可以透過 spring.modulith.events.republish-outstanding-events-on-restart
屬性啟用事件的自動重新發布。

Spring Boot 事件登錄檔 Starter
使用事務性事件釋出日誌需要向您的應用新增一組構件。為了簡化這項任務,Spring Modulith 提供了圍繞要使用的持久化技術而設計的 starter POM,並預設使用基於 Jackson 的 EventSerializer
實現。以下 starter 可用
持久化技術 | 構件 | 描述 |
---|---|---|
JPA |
|
使用 JPA 作為持久化技術。 |
JDBC |
|
使用 JDBC 作為持久化技術。也可用於基於 JPA 的應用,但繞過您的 JPA 提供程式進行實際的事件持久化。 |
MongoDB |
|
使用 MongoDB 作為持久化技術。也啟用 MongoDB 事務,並需要伺服器配置為副本集才能進行互動。透過將 |
Neo4j |
|
在 Spring Data Neo4j 後面使用 Neo4j。 |
管理事件釋出
事件釋出可能需要在應用執行時以多種方式進行管理。未完成的釋出可能需要在一定時間後重新提交給相應的監聽器。另一方面,已完成的釋出很可能需要從資料庫中清除或移動到歸檔儲存中。由於這類清理工作的需求因應用而異,Spring Modulith 提供了一個 API 來處理這兩種型別的釋出。該 API 透過您可以新增到應用的 spring-modulith-events-api
構件提供
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>1.3.5</version>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-events-api:1.3.5'
}
此構件包含兩個主要抽象,它們作為 Spring Bean 可供應用程式碼使用
-
CompletedEventPublications
— 該介面允許訪問所有已完成的事件釋出,並提供一個 API,用於立即將它們全部從資料庫中清除,或清除早於指定時長(例如,1 分鐘)的已完成釋出。 -
IncompleteEventPublications
— 該介面允許訪問所有未完成的事件釋出,以便重新提交符合指定謂詞或早於相對於原始釋出日期的指定Duration
的釋出。
事件釋出完成
當事務性或 @ApplicationModuleListener
執行成功完成後,事件釋出會被標記為已完成。預設情況下,完成透過在 EventPublication
上設定完成日期來註冊。這意味著已完成的釋出將保留在事件釋出登錄檔中,以便可以透過上面描述的 CompletedEventPublications
介面進行檢查。這樣做的一個後果是,您需要編寫一些程式碼來定期清除舊的、已完成的 EventPublication
。否則,它們的持久化抽象(例如關係資料庫表)將無限增長,並且與儲存進行互動建立和完成新的 EventPublication
可能會變慢。
Spring Modulith 1.3 引入了一個配置屬性 spring.modulith.events.completion-mode
來支援另外兩種完成模式。它預設為 UPDATE
,由上面描述的策略支援。或者,可以將完成模式設定為 DELETE
,這將更改登錄檔的持久化機制,改為在完成時刪除 EventPublication
。這意味著 CompletedEventPublications
將不再返回任何釋出,但同時,您也不必再手動從持久化儲存中清除已完成的事件。
第三種選項是 ARCHIVE
模式,它將條目複製到歸檔表、集合或節點中。對於該歸檔條目,將設定完成日期並移除原始條目。與 DELETE
模式相反,已完成的事件釋出仍然可以透過 CompletedEventPublications
抽象進行訪問。
事件釋出倉庫
為了實際寫入事件釋出日誌,Spring Modulith 暴露了一個 EventPublicationRepository
SPI 以及支援事務的流行持久化技術的實現,如 JPA、JDBC 和 MongoDB。透過將相應的 JAR 新增到您的 Spring Modulith 應用來選擇要使用的持久化技術。我們已準備好專用的 starter 來簡化該任務。
基於 JDBC 的實現可以在設定相應的配置屬性 (spring.modulith.events.jdbc.schema-initialization.enabled
) 為 true
時,為事件釋出日誌建立一個專用表。詳情請查閱附錄中的 Schema 概覽。
外部化事件
應用模組之間交換的一些事件可能對外部系統感興趣。Spring Modulith 允許將選定的事件釋出到各種訊息代理。要使用該支援,您需要執行以下步驟
-
將特定於代理的 Spring Modulith 構件新增到您的專案中。
-
透過使用 Spring Modulith 或 jMolecules 的
@Externalized
註解來標記要外部化的事件型別。 -
在註解的值中指定特定於代理的路由目標。
要了解如何使用其他方式選擇要外部化的事件,或自定義它們在代理內的路由,請查閱 事件外部化基礎知識。
支援的基礎設施
代理 | 構件 | 描述 |
---|---|---|
Kafka |
|
使用 Spring Kafka 與代理互動。邏輯路由鍵將用作 Kafka 的 Topic 和 Message Key。 |
AMQP |
|
使用 Spring AMQP 與任何相容代理互動。例如,需要顯式宣告對 Spring Rabbit 的依賴。邏輯路由鍵將用作 AMQP 路由鍵。 |
JMS |
|
使用 Spring 的核心 JMS 支援。不支援路由鍵。 |
SQS |
|
已棄用(詳情參見 此處)。使用 Spring Cloud AWS SQS 支援。邏輯路由鍵將用作 SQS 訊息組 ID。當設定路由鍵時,要求 SQS 佇列配置為 FIFO 佇列。 |
SNS |
|
已棄用(詳情參見 此處)。使用 Spring Cloud AWS SNS 支援。邏輯路由鍵將用作 SNS 訊息組 ID。當設定路由鍵時,要求 SNS 配置為啟用基於內容的去重功能的 FIFO Topic。 |
Spring Messaging |
|
使用 Spring 的核心 |
事件外部化基礎知識
事件外部化對釋出的每個應用事件執行三個步驟。
-
確定事件是否應該被外部化——我們稱之為“事件選擇”。預設情況下,只有位於 Spring Boot 自動配置包內並用支援的
@Externalized
註解之一標記的事件型別才會被選擇進行外部化。 -
準備訊息(可選)——預設情況下,事件由相應的代理基礎設施按原樣序列化。可選的對映步驟允許開發者自定義或甚至完全替換原始事件,使用適合外部方的 Payload。對於 Kafka 和 AMQP,開發者還可以向要釋出的訊息新增 Header。
-
確定路由目標——訊息代理客戶端需要一個邏輯目標來發布訊息。目標通常標識物理基礎設施(Topic、Exchange 或 Queue,取決於代理),並且通常靜態地從事件型別派生。除非在
@Externalized
註解中明確定義,Spring Modulith 使用應用本地型別名稱作為目標。換句話說,在一個基礎包為com.acme.app
的 Spring Boot 應用中,事件型別com.acme.app.sample.SampleEvent
將釋出到sample.SampleEvent
。一些代理還允許定義一個更具動態性的路由鍵,它用於實際目標內的不同目的。預設情況下,不使用路由鍵。
基於註解的事件外部化配置
要透過 @Externalized
註解定義自定義路由鍵,可以在各個註解中的 target/value 屬性使用 $target::$key
模式。target 和 key 都可以是一個 SpEL 表示式,事件例項將被配置為根物件。
-
Java
-
Kotlin
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
String getLastname() { (1)
// …
}
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
fun getLastname(): String { (1)
// …
}
}
CustomerCreated
事件透過 accessor 方法暴露客戶的姓氏。然後該方法透過目標宣告的 ::
分隔符後的 key 表示式 #this.getLastname()
使用。
如果鍵的計算變得更復雜,建議將其委託給一個接受事件作為引數的 Spring bean。
-
Java
-
Kotlin
@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")
程式設計式事件外部化配置
spring-modulith-events-api
構件包含 EventExternalizationConfiguration
,允許開發者自定義上述所有步驟。
-
Java
-
Kotlin
@Configuration
class ExternalizationConfiguration {
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent.class, event -> …) (3)
.headers(event -> …) (4)
.routeKey(WithKeyProperty.class, WithKeyProperty::getKey) (5)
.build();
}
}
@Configuration
class ExternalizationConfiguration {
@Bean
fun eventExternalizationConfiguration(): EventExternalizationConfiguration {
EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent::class.java) { event -> … } (3)
.headers() { event -> … } (4)
.routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey) (5)
.build()
}
}
1 | 我們首先建立一個預設的 EventExternalizationConfiguration 例項。 |
2 | 我們透過呼叫上一步返回的 Selector 例項上的一個或多個 select(…) 方法來定製事件選擇。這一步基本上停用了應用基礎包過濾器,因為我們現在只查詢註解。存在按型別、按包、按包和註解輕鬆選擇事件的便捷方法。此外,還有一步定義選擇和路由的快捷方式。 |
3 | 我們為 SomeEvent 例項定義一個對映步驟。請注意,路由仍將由原始事件例項確定,除非您額外呼叫 router 上的 ….routeMapped() 方法。 |
4 | 我們為要傳送的訊息新增自定義 Header,可以像所示的那樣通用新增,也可以針對特定 Payload 型別進行新增。 |
5 | 我們最終透過定義一個方法控制代碼來確定路由鍵,以提取事件例項的一個值。另外,可以透過使用上一步呼叫返回的 Router 例項上的通用 route(…) 方法為單個事件生成完整的 RoutingKey 。 |
測試已釋出的事件
以下部分描述了一種僅關注跟蹤 Spring 應用事件的測試方法。對於使用 @ApplicationModuleListener 的模組測試的更全面的方法,請檢視 Scenario API 。 |
Spring Modulith 的 @ApplicationModuleTest
使能夠將 PublishedEvents
例項注入到測試方法中,以驗證在被測試的業務操作過程中是否釋出了特定的一組事件。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(PublishedEvents events) {
// …
var matchingMapped = events.ofType(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
assertThat(matchingMapped).hasSize(1);
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: PublishedEvents events) {
// …
val matchingMapped = events.ofType(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
assertThat(matchingMapped).hasSize(1)
}
}
注意 PublishedEvents
如何公開一個 API 來選擇匹配特定標準的事件。驗證是透過 AssertJ 斷言來完成的,該斷言驗證預期的元素數量。如果您本來就使用 AssertJ 進行這些斷言,您也可以使用 AssertablePublishedEvents
作為測試方法引數型別,並使用透過它提供的流式斷言 API。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(AssertablePublishedEvents events) {
// …
assertThat(events)
.contains(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: AssertablePublishedEvents) {
// …
assertThat(events)
.contains(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
}
}
注意 assertThat(…)
表示式返回的型別如何允許直接對已釋出的事件定義約束。