Message
Spring Integration Message 是一個通用資料容器。任何物件都可以作為負載提供,並且每個 Message 例項都包含以鍵值對形式的使用者可擴充套件屬性的頭部。
Message 介面
以下列表顯示了 Message 介面的定義
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message 介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息系統可以在不知道資料型別的情況下傳遞它。隨著應用程式發展以支援新型別,或者當型別本身被修改或擴充套件時,訊息系統不會受到影響。另一方面,當訊息系統中的某些元件確實需要訪問有關 Message 的資訊時,此類元資料通常可以儲存到訊息頭部中的元資料並從中檢索。
訊息頭部
正如 Spring Integration 允許將任何 Object 用作 Message 的有效負載一樣,它也支援將任何 Object 型別用作頭部值。實際上,MessageHeaders 類實現了 java.util.Map_ 介面,如下面的類定義所示
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
儘管 MessageHeaders 類實現了 Map,但它實際上是一個只讀實現。任何嘗試在 Map 中 put 值的操作都會導致 UnsupportedOperationException。remove 和 clear 也同樣適用。由於訊息可能會傳遞給多個消費者,因此 Map 的結構無法修改。同樣,訊息的有效負載 Object 在初始建立後不能 set。但是,頭部值本身(或有效負載 Object)的可變性有意地留作框架使用者的決定。 |
作為 Map 的實現,可以透過使用頭部名稱呼叫 get(..) 來檢索頭部。或者,您可以提供預期的 Class 作為附加引數。更好的是,當檢索預定義值之一時,可以使用方便的 getter。以下示例顯示了這三個選項中的每一個
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了預定義的訊息頭部
| 頭部名稱 | 頭部型別 | 用法 |
|---|---|---|
MessageHeaders.ID |
java.util.UUID |
此訊息例項的識別符號。每次訊息被修改時都會改變。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
訊息建立的時間。每次訊息被修改時都會改變。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
當沒有配置顯式輸出通道且沒有 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
用於傳送錯誤的通道。如果值為 |
許多入站和出站介面卡實現也提供或期望某些頭部,您可以配置其他使用者定義的頭部。這些頭部的常量可以在存在此類頭部的模組中找到,例如 AmqpHeaders、JmsHeaders 等等。
MessageHeaderAccessor API
從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息抽象已移至 spring-messaging 模組,並引入了 MessageHeaderAccessor API 以提供對訊息實現的額外抽象。所有(核心)Spring Integration 特定的訊息頭部常量現在都在 IntegrationMessageHeaderAccessor 類中宣告。下表描述了預定義的訊息頭部
| 頭部名稱 | 頭部型別 | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用於關聯兩個或多個訊息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是具有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一組關聯訊息中的訊息數量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示訊息何時過期。框架不直接使用,但可以透過頭部豐富器設定,並在配置了 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
訊息優先順序——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果訊息被冪等接收器攔截器檢測為重複。請參閱 冪等接收器企業整合模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果訊息與一個 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果訊息驅動通道介面卡支援配置 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站端點支援,則回撥以接受、拒絕或重新排隊訊息。請參閱 延遲確認可輪詢訊息源 和 MQTT 手動確認。 |
IntegrationMessageHeaderAccessor 類上提供了其中一些頭部的便捷型別化 getter,如下例所示
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出現在 IntegrationMessageHeaderAccessor 中但通常不被使用者程式碼使用(即,它們通常被 Spring Integration 內部部分使用——此處包含它們是為了完整性)的頭部
| 頭部名稱 | 頭部型別 | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
當需要巢狀關聯時(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
請參閱 路由單。 |
訊息 ID 生成
當訊息透過應用程式轉換時,每次修改它時(例如,透過轉換器),都會分配一個新的訊息 ID。訊息 ID 是一個 UUID。從 Spring Integration 3.0 開始,用於 IS 生成的預設策略比以前的 java.util.UUID.randomUUID() 實現更有效。它使用基於安全隨機種子的簡單隨機數,而不是每次都建立一個安全隨機數。
可以透過在應用程式上下文中宣告一個實現 org.springframework.util.IdGenerator 的 bean 來選擇不同的 UUID 生成策略。
在一個類載入器中只能使用一個 UUID 生成策略。這意味著如果兩個或多個應用程式上下文在同一個類載入器中執行,它們將共享相同的策略。如果其中一個上下文更改了策略,則所有上下文都將使用該策略。如果同一個類載入器中的兩個或多個上下文聲明瞭型別為 org.springframework.util.IdGenerator 的 bean,則它們都必須是同一類的例項。否則,嘗試替換自定義策略的上下文將無法初始化。如果策略相同,但引數化,則使用第一個初始化的上下文中的策略。 |
除了預設策略之外,還提供了另外兩個 IdGenerators。org.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 機制。當不需要 UUID 並且簡單的遞增值就足夠時,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator。
只讀頭部
MessageHeaders.ID 和 MessageHeaders.TIMESTAMP 是隻讀頭部,不能被覆蓋。
從 4.3.2 版本開始,MessageBuilder 提供了 readOnlyHeaders(String… readOnlyHeaders) API,用於自定義不應從上游 Message 複製的頭部列表。預設情況下,只有 MessageHeaders.ID 和 MessageHeaders.TIMESTAMP 是隻讀的。提供了全域性 spring.integration.readOnly.headers 屬性(參見 全域性屬性)來為框架元件自定義 DefaultMessageBuilderFactory。當您不想由 ObjectToJsonTransformer(參見 JSON 轉換器)填充一些開箱即用的頭部(例如 contentType)時,這會很有用。
當您嘗試使用 MessageBuilder 構建新訊息時,此類頭部將被忽略,並且會在日誌中發出特定的 INFO 訊息。
頭部傳播
當訊息生產者端點(例如 服務啟用器)處理(和修改)訊息時,通常,入站頭部會傳播到出站訊息。一個例外是 轉換器,當一個完整的訊息返回到框架時。在這種情況下,使用者程式碼負責整個出站訊息。當轉換器只返回有效負載時,入站頭部會傳播。此外,只有當頭部在出站訊息中尚不存在時才會傳播,從而允許您根據需要更改頭部值。
從 4.3.10 版本開始,您可以配置訊息處理器(修改訊息併產生輸出)以抑制特定頭部的傳播。要配置您不想複製的頭部,請在 MessageProducingMessageHandler 抽象類上呼叫 setNotPropagatedHeaders() 或 addNotPropagatedHeaders() 方法。
您還可以透過將 META-INF/spring.integration.properties 中的 readOnlyHeaders 屬性設定為逗號分隔的頭部列表來全域性抑制特定訊息頭部的傳播。
從 5.0 版本開始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 實現應用簡單模式(xxx*、*xxx、*xxx* 或 xxx*yyy)以允許使用通用字尾或字首過濾頭部。有關更多資訊,請參閱 PatternMatchUtils Javadoc。當其中一個模式是 *(星號)時,不傳播任何頭部。所有其他模式都將被忽略。在這種情況下,服務啟用器與轉換器的行為方式相同,並且任何所需的頭部都必須在服務方法返回的 Message 中提供。notPropagatedHeaders() 選項在 Java DSL 的 ConsumerEndpointSpec 中可用。它也可用於 <service-activator> 元件的 XML 配置,作為 not-propagated-headers 屬性。
訊息實現
Message 介面的基本實現是 GenericMessage<T>,它提供兩個建構函式,如下所示
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
當建立 Message 時,會生成一個隨機的唯一 ID。接受 Map 頭部的建構函式會將提供的頭部複製到新建立的 Message 中。
還有一個方便的 Message 實現,旨在傳達錯誤情況。此實現以 Throwable 物件作為其有效負載,如下例所示
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
請注意,此實現利用了 GenericMessage 基類引數化的事實。因此,如兩個示例所示,在檢索 Message 有效負載 Object 時不需要進行任何強制轉換。
上述 Message 類實現是不可變的。在某些情況下,當可變性不是問題並且應用程式邏輯設計良好以避免併發修改時,可以使用 MutableMessage。
MessageBuilder 輔助類
您可能會注意到 Message 介面定義了其有效負載和頭部的檢索方法,但沒有提供 setter。其原因是 Message 在初始建立後不能被修改。因此,當 Message 例項傳送給多個消費者時(例如,透過釋出/訂閱通道),如果其中一個消費者需要傳送帶有不同有效負載型別的回覆,它必須建立一個新的 Message。這樣,其他消費者就不會受到這些更改的影響。請記住,多個消費者可能訪問相同的有效負載例項或頭部值,並且此類例項本身是否不可變由您決定。換句話說,Message 例項的契約類似於不可修改的 Collection,MessageHeaders 對映進一步證明了這一點。儘管 MessageHeaders 類實現了 java.util.Map,但任何嘗試對 MessageHeaders 例項呼叫 put 操作(或“remove”或“clear”)都會導致 UnsupportedOperationException。
Spring Integration 提供了一種更方便的方法來構建訊息,而不是要求建立和填充 Map 以傳遞給 GenericMessage 建構函式:MessageBuilder。MessageBuilder 提供兩種工廠方法,用於從現有 Message 或使用有效負載 Object 建立 Message 例項。當從現有 Message 構建時,該 Message 的頭部和有效負載將複製到新的 Message 中,如下例所示
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要建立具有新有效負載但仍希望從現有 Message 複製頭部的 Message,您可以使用其中一個“copy”方法,如下例所示
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
請注意,copyHeadersIfAbsent 方法不會覆蓋現有值。此外,在上面的示例中,您可以看到如何使用 setHeader 設定任何使用者定義的頭部。最後,預定義頭部也提供了 set 方法,以及一個用於設定任何頭部(MessageHeaders 也定義了預定義頭部名稱的常量)的非破壞性方法。
您還可以使用 MessageBuilder 設定訊息的優先順序,如下例所示
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
priority 頭部僅在使用 PriorityChannel 時才會被考慮(如下一章所述)。它被定義為 java.lang.Integer。
提供了 MutableMessageBuilder 來處理 MutableMessage 例項。此類的邏輯是建立 MutableMessage 或保持原樣,並透過構建器方法修改其內容。這樣,當不可變性不是訊息交換的關注點時,執行中的應用程式的效能會略有提高。
從版本 6.4 開始,BaseMessageBuilder 類從 MessageBuilder 中提取出來,以簡化對預設訊息構建邏輯的擴充套件。例如,結合自定義 MessageBuilderFactory,自定義 BaseMessageBuilder 實現可以在應用程式上下文中全域性使用,以提供自定義 Message 例項。特別是,可以覆蓋 GenericMessage.toString() 方法,以便在記錄此類訊息時隱藏有效負載和頭部中的敏感資訊。 |
MessageBuilderFactory 抽象
名為 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 的 MessageBuilderFactory bean 被全域性註冊到應用程式上下文中,並在框架的任何地方用於建立 Message 例項。預設情況下,它是一個 DefaultMessageBuilderFactory 例項。開箱即用,框架還提供了一個 MutableMessageBuilderFactory,用於在框架元件中建立 MutableMessage 例項。為了自定義 Message 例項的建立,必須在目標應用程式上下文中提供一個名為 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 的 MessageBuilderFactory bean,以覆蓋預設的。例如,可以為 BaseMessageBuilder 的實現註冊一個自定義 MessageBuilderFactory,在該實現中,我們希望提供一個擴充套件了 GenericMessage 並覆蓋了 toString() 的例項,以便在記錄此類訊息時隱藏有效負載和頭部中的敏感資訊。
這些類的快速實現可以像這樣演示個人身份資訊緩解
class PiiMessageBuilderFactory implements MessageBuilderFactory {
@Override
public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
return new PiiMessageBuilder<>(message.getPayload(), message);
}
@Override
public <T> PiiMessageBuilder<T> withPayload(T payload) {
return new PiiMessageBuilder<>(payload, null);
}
}
class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {
public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
super(payload, originalMessage);
}
@Override
public Message<P> build() {
return new PiiMessage<>(getPayload(), getHeaders());
}
}
class PiiMessage<P> extends GenericMessage<P> {
@Serial
private static final long serialVersionUID = -354503673433669578L;
public PiiMessage(P payload, Map<String, Object> headers) {
super(payload, headers);
}
@Override
public String toString() {
return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
}
private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
return headers.entrySet()
.stream()
.map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
然後可以將此 PiiMessageBuilderFactory 註冊為 bean,每當框架記錄訊息時(例如在 errorChannel 的情況下),password 頭部將被遮蔽。