訊息
Spring Integration Message
是一個通用的資料容器。任何物件都可以作為負載 (payload),並且每個 Message
例項都包含帶有使用者可擴充套件屬性的頭 (headers),這些屬性以鍵值對的形式儲存。
Message
介面
以下列表展示了 Message
介面的定義
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message
介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息系統可以在不瞭解資料型別的情況下傳遞資料。隨著應用程式演進以支援新型別,或者型別本身被修改或擴充套件時,訊息系統不會受到影響。另一方面,當訊息系統中的某些元件確實需要訪問有關 Message
的資訊時,此類元資料通常可以儲存到訊息頭中並從中檢索。
訊息頭 (Message Headers)
正如 Spring Integration 允許任何 Object
用作 Message
的負載一樣,它也支援任何 Object
型別作為頭值。實際上,MessageHeaders
類實現了 java.util.Map_ interface
,如下面的類定義所示
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
儘管 MessageHeaders 類實現了 Map ,但它實際上是一個只讀實現。任何嘗試在 Map 中 put 值都會導致 UnsupportedOperationException 。對於 remove 和 clear 也是如此。由於訊息可能傳遞給多個消費者,因此不能修改 Map 的結構。同樣,訊息的負載 Object 在初次建立後不能被 set 。然而,頭值本身(或負載 Object)的可變性有意留給框架使用者自行決定。 |
作為 Map
的實現,可以透過呼叫帶有頭名稱的 get(..)
來檢索頭。或者,您可以提供預期的 Class
作為附加引數。更好的是,當檢索預定義值時,提供了方便的 getter 方法。以下示例展示了這三種選項
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了預定義的訊息頭
頭名稱 | 頭型別 | 用途 |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
此訊息例項的識別符號。每次訊息被修改時都會改變。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
訊息建立的時間。每次訊息被修改時都會改變。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
當未配置顯式輸出通道,且沒有 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
錯誤傳送到的通道。如果值是 |
許多入站和出站介面卡實現也提供或期望某些頭,並且您可以配置額外的使用者定義頭。這些頭的常量可以在存在這些頭的模組中找到——例如 AmqpHeaders
、JmsHeaders
等等。
MessageHeaderAccessor
API
從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息抽象已移至 spring-messaging
模組,並引入了 MessageHeaderAccessor
API,以提供對訊息實現的額外抽象。所有 (核心) Spring Integration 特定訊息頭常量現在都在 IntegrationMessageHeaderAccessor
類中宣告。下表描述了預定義的訊息頭
頭名稱 | 頭型別 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用於關聯兩個或更多訊息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是帶有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一組關聯訊息中的訊息數量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示訊息何時過期。框架不直接使用,但可以透過 Header Enricher 設定,並在配置了 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
訊息優先順序——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果訊息被冪等接收者攔截器檢測為重複。參見 冪等接收者企業整合模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果訊息與一個處理完成後應關閉的 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果訊息驅動通道介面卡支援配置 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站端點支援,則會有一個回撥函式用於接受、拒絕或重新入隊訊息。參見 延遲確認可輪詢訊息源 和 MQTT 手動確認。 |
IntegrationMessageHeaderAccessor
類提供了其中一些頭的便捷型別化 getter,如下例所示
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出現在 IntegrationMessageHeaderAccessor
中但通常不由使用者程式碼使用的頭(即,它們通常由 Spring Integration 的內部部分使用——此處包含它們是為了完整性)
頭名稱 | 頭型別 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
當需要巢狀關聯時使用的關聯資料堆疊(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
參見 路由單。 |
訊息 ID 生成
當訊息透過應用程式轉換時,每次被修改(例如,透過轉換器)都會分配一個新的訊息 ID。訊息 ID 是一個 UUID
。從 Spring Integration 3.0 開始,用於 ID 生成的預設策略比以前的 java.util.UUID.randomUUID()
實現更高效。它基於安全的隨機種子使用簡單的隨機數,而不是每次都建立安全的隨機數。
可以透過在應用程式上下文中宣告一個實現 org.springframework.util.IdGenerator
介面的 Bean 來選擇不同的 UUID 生成策略。
在一個 ClassLoader 中只能使用一種 UUID 生成策略。這意味著,如果在同一個 ClassLoader 中執行兩個或更多應用程式上下文,它們會共享相同的策略。如果其中一個上下文更改了策略,所有上下文都會使用它。如果在同一個 ClassLoader 中的兩個或更多上下文聲明瞭型別為 org.springframework.util.IdGenerator 的 Bean,它們必須是同一類的例項。否則,嘗試替換自定義策略的上下文將無法初始化。如果策略相同,但引數化不同,則使用第一個被初始化的上下文中的策略。 |
除了預設策略外,還提供了另外兩個 IdGenerators
。org.springframework.util.JdkIdGenerator
使用之前的 UUID.randomUUID()
機制。當實際上不需要 UUID 並且簡單的遞增值就足夠時,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
。
只讀頭
MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是隻讀頭,不能被覆蓋。
從版本 4.3.2 開始,MessageBuilder
提供了 readOnlyHeaders(String… readOnlyHeaders)
API,用於自定義不應從上游 Message
複製的頭列表。預設情況下,只有 MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是隻讀的。提供了全域性屬性 spring.integration.readOnly.headers
(參見 全域性屬性)來為框架元件自定義 DefaultMessageBuilderFactory
。當您不想讓 ObjectToJsonTransformer
等元件填充某些現成的頭(例如 contentType
)時,這非常有用(參見 JSON 轉換器)。
當您嘗試使用 MessageBuilder
構建新訊息時,此類頭將被忽略,並且會在日誌中發出特定的 INFO
訊息。
從版本 5.0 開始,當使用 DefaultMessageBuilderFactory
時,訊息閘道器、Header Enricher、內容增強器 和 Header Filter 不允許您配置 MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
頭名稱,並且會丟擲 BeanInitializationException
。
頭傳播
當訊息被產生訊息的端點(例如 服務啟用器)處理(和修改)時,通常,入站頭會傳播到出站訊息。一個例外是 轉換器,當完整的訊息返回給框架時。在這種情況下,使用者程式碼負責整個出站訊息。當轉換器只返回負載時,入站頭會被傳播。此外,只有當出站訊息中不存在該頭時,頭才會被傳播,從而允許您根據需要更改頭值。
從版本 4.3.10 開始,您可以配置訊息處理器(它們修改訊息併產生輸出)以抑制特定頭的傳播。要配置您不想複製的頭,請在 MessageProducingMessageHandler
抽象類上呼叫 setNotPropagatedHeaders()
或 addNotPropagatedHeaders()
方法。
您還可以透過在 META-INF/spring.integration.properties
中將 readOnlyHeaders
屬性設定為逗號分隔的頭列表來全域性抑制特定訊息頭的傳播。
從版本 5.0 開始,AbstractMessageProducingHandler
上的 setNotPropagatedHeaders()
實現應用簡單的模式 (xxx*
, *xxx
, *xxx*
, 或 xxx*yyy
) 來允許過濾具有共同字尾或字首的頭。有關更多資訊,請參閱 PatternMatchUtils
Javadoc。當其中一個模式是 *
(星號)時,不傳播任何頭。所有其他模式都被忽略。在這種情況下,服務啟用器的行為與轉換器相同,任何必需的頭必須在服務方法返回的 Message
中提供。notPropagatedHeaders()
選項在 Java DSL 的 ConsumerEndpointSpec
中可用。它也作為 <service-activator>
元件的 XML 配置中的 not-propagated-headers
屬性可用。
訊息實現
Message
介面的基本實現是 GenericMessage<T>
,它提供兩個建構函式,如下列表所示
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
建立 Message
時,會生成一個隨機的唯一 ID。接受 Map
作為頭的建構函式會將提供的頭複製到新建立的 Message
中。
還有一個方便的 Message
實現,用於表示錯誤條件。此實現接受一個 Throwable
物件作為其負載,如下例所示
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
請注意,此實現利用了 GenericMessage
基類是引數化的這一事實。因此,如兩個示例所示,檢索 Message
負載 Object
時無需進行強制轉換。
前面提到的 Message
類實現是不可變的。在某些情況下,當可變性不是問題且應用程式邏輯設計良好以避免併發修改時,可以使用 MutableMessage
。
MessageBuilder
幫助類
您可能注意到 Message
介面定義了用於檢索其負載和頭的方法,但沒有提供 setter。這是因為 Message
在初次建立後不能被修改。因此,當一個 Message
例項被髮送到多個消費者(例如,透過釋出-訂閱通道)時,如果其中一個消費者需要傳送帶有不同負載型別的回覆,它必須建立一個新的 Message
。結果是,其他消費者不受這些更改的影響。請記住,多個消費者可能訪問相同的負載例項或頭值,並且此類例項本身是否可變是留給您決定的。換句話說,Message
例項的契約類似於不可變的 Collection
,MessageHeaders
map 進一步例證了這一點。儘管 MessageHeaders
類實現了 java.util.Map
,但任何嘗試在 MessageHeaders
例項上呼叫 put
操作(或 'remove' 或 'clear')都會導致 UnsupportedOperationException
。
Spring Integration 提供了一種更方便的方式來構建 Message,而不是要求建立和填充 Map 然後傳遞給 GenericMessage 建構函式:MessageBuilder
。MessageBuilder
提供了兩個工廠方法,用於從現有 Message
或使用負載 Object
建立 Message
例項。從現有 Message
構建時,該 Message
的頭和負載會被複制到新的 Message
中,如下例所示
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要建立一個帶有新負載但仍想從現有 Message
中複製頭的 Message
,您可以使用 'copy' 方法之一,如下例所示
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
請注意,copyHeadersIfAbsent
方法不會覆蓋現有值。此外,在上面的示例中,您可以看到如何使用 setHeader
設定任何使用者定義的頭。最後,對於預定義頭和設定任何頭的非破壞性方法也提供了 set
方法(MessageHeaders
也定義了預定義頭名稱的常量)。
您還可以使用 MessageBuilder
設定訊息的優先順序,如下例所示
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
只有在使用 PriorityChannel
時才會考慮 priority
頭(如下一章所述)。它被定義為 java.lang.Integer
。
提供了 MutableMessageBuilder
來處理 MutableMessage
例項。此類邏輯是建立 MutableMessage
或保留原樣,並透過構建器方法修改其內容。這樣可以在執行應用程式中獲得輕微的效能提升,前提是不可變性不是訊息交換的關注點。
從版本 6.4 開始,從 MessageBuilder 中提取了一個 BaseMessageBuilder 類,以簡化預設訊息構建邏輯的擴充套件。例如,結合自定義的 MessageBuilderFactory ,可以在應用程式上下文中全域性使用自定義的 BaseMessageBuilder 實現來提供自定義的 Message 例項。特別是,可以覆蓋 GenericMessage.toString() 方法,以便在記錄此類訊息時隱藏負載和頭中的敏感資訊。 |
MessageBuilderFactory
抽象
帶有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME
名稱的 MessageBuilderFactory
bean 會全域性註冊到應用程式上下文中,並在框架中隨處用於建立 Message
例項。預設情況下,它是一個 DefaultMessageBuilderFactory
例項。開箱即用,框架還提供了一個 MutableMessageBuilderFactory
,用於在框架元件中建立 MutableMessage
例項。要自定義 Message
例項的建立,必須在目標應用程式上下文中提供一個帶有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME
名稱的 MessageBuilderFactory
bean,以覆蓋預設的。例如,可以為 BaseMessageBuilder
的實現註冊一個自定義的 MessageBuilderFactory
,我們希望在該實現中提供一個 GenericMessage
的擴充套件,並覆蓋 toString()
方法,以便在記錄此類訊息時隱藏負載和頭中的敏感資訊。
這些類的一些快速實現以演示個人可識別資訊的緩解措施可以像這樣
class PiiMessageBuilderFactory implements MessageBuilderFactory {
@Override
public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
return new PiiMessageBuilder<>(message.getPayload(), message);
}
@Override
public <T> PiiMessageBuilder<T> withPayload(T payload) {
return new PiiMessageBuilder<>(payload, null);
}
}
class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {
public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
super(payload, originalMessage);
}
@Override
public Message<P> build() {
return new PiiMessage<>(getPayload(), getHeaders());
}
}
class PiiMessage<P> extends GenericMessage<P> {
@Serial
private static final long serialVersionUID = -354503673433669578L;
public PiiMessage(P payload, Map<String, Object> headers) {
super(payload, headers);
}
@Override
public String toString() {
return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
}
private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
return headers.entrySet()
.stream()
.map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
這個 PiiMessageBuilderFactory
可以註冊為一個 Bean,並且無論何時框架記錄訊息(例如,在 errorChannel
的情況下),password
頭都將被遮蓋。