Message

Spring Integration Message 是一個通用資料容器。任何物件都可以作為負載提供,並且每個 Message 例項都包含以鍵值對形式的使用者可擴充套件屬性的頭部。

Message 介面

以下列表顯示了 Message 介面的定義

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息系統可以在不知道資料型別的情況下傳遞它。隨著應用程式發展以支援新型別,或者當型別本身被修改或擴充套件時,訊息系統不會受到影響。另一方面,當訊息系統中的某些元件確實需要訪問有關 Message 的資訊時,此類元資料通常可以儲存到訊息頭部中的元資料並從中檢索。

訊息頭部

正如 Spring Integration 允許將任何 Object 用作 Message 的有效負載一樣,它也支援將任何 Object 型別用作頭部值。實際上,MessageHeaders 類實現了 java.util.Map_ 介面,如下面的類定義所示

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
儘管 MessageHeaders 類實現了 Map,但它實際上是一個只讀實現。任何嘗試在 Map 中 put 值的操作都會導致 UnsupportedOperationExceptionremoveclear 也同樣適用。由於訊息可能會傳遞給多個消費者,因此 Map 的結構無法修改。同樣,訊息的有效負載 Object 在初始建立後不能 set。但是,頭部值本身(或有效負載 Object)的可變性有意地留作框架使用者的決定。

作為 Map 的實現,可以透過使用頭部名稱呼叫 get(..) 來檢索頭部。或者,您可以提供預期的 Class 作為附加引數。更好的是,當檢索預定義值之一時,可以使用方便的 getter。以下示例顯示了這三個選項中的每一個

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了預定義的訊息頭部

表 1. 預定義的訊息頭部
頭部名稱 頭部型別 用法
 MessageHeaders.ID
 java.util.UUID

此訊息例項的識別符號。每次訊息被修改時都會改變。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

訊息建立的時間。每次訊息被修改時都會改變。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

當沒有配置顯式輸出通道且沒有 ROUTING_SLIPROUTING_SLIP 已耗盡時,用於傳送回覆(如果有)的通道。如果值為 String,則它必須表示一個 bean 名稱或由 ChannelRegistry 生成。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

用於傳送錯誤的通道。如果值為 String,則它必須表示一個 bean 名稱或由 ChannelRegistry 生成。

許多入站和出站介面卡實現也提供或期望某些頭部,您可以配置其他使用者定義的頭部。這些頭部的常量可以在存在此類頭部的模組中找到,例如 AmqpHeadersJmsHeaders 等等。

MessageHeaderAccessor API

從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息抽象已移至 spring-messaging 模組,並引入了 MessageHeaderAccessor API 以提供對訊息實現的額外抽象。所有(核心)Spring Integration 特定的訊息頭部常量現在都在 IntegrationMessageHeaderAccessor 類中宣告。下表描述了預定義的訊息頭部

表 2. 預定義的訊息頭部
頭部名稱 頭部型別 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用於關聯兩個或多個訊息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是具有 SEQUENCE_SIZE 的一組訊息中的序列號,但也可以在 <resequencer/> 中用於重新排序無界限的一組訊息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一組關聯訊息中的訊息數量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示訊息何時過期。框架不直接使用,但可以透過頭部豐富器設定,並在配置了 UnexpiredMessageSelector<filter/> 中使用。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

訊息優先順序——例如,在 PriorityChannel 中。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果訊息被冪等接收器攔截器檢測為重複。請參閱 冪等接收器企業整合模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果訊息與一個 Closeable 關聯,則存在此頭部,該 Closeable 應在訊息處理完成後關閉。一個示例是使用 FTP、SFTP 等進行流式檔案傳輸的 Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果訊息驅動通道介面卡支援配置 RetryTemplate,則此頭部包含當前的交付嘗試。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端點支援,則回撥以接受、拒絕或重新排隊訊息。請參閱 延遲確認可輪詢訊息源MQTT 手動確認

IntegrationMessageHeaderAccessor 類上提供了其中一些頭部的便捷型別化 getter,如下例所示

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出現在 IntegrationMessageHeaderAccessor 中但通常不被使用者程式碼使用(即,它們通常被 Spring Integration 內部部分使用——此處包含它們是為了完整性)的頭部

表 3. 預定義的訊息頭部
頭部名稱 頭部型別 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

當需要巢狀關聯時(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator),使用的一組關聯資料。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

請參閱 路由單

訊息 ID 生成

當訊息透過應用程式轉換時,每次修改它時(例如,透過轉換器),都會分配一個新的訊息 ID。訊息 ID 是一個 UUID。從 Spring Integration 3.0 開始,用於 IS 生成的預設策略比以前的 java.util.UUID.randomUUID() 實現更有效。它使用基於安全隨機種子的簡單隨機數,而不是每次都建立一個安全隨機數。

可以透過在應用程式上下文中宣告一個實現 org.springframework.util.IdGenerator 的 bean 來選擇不同的 UUID 生成策略。

在一個類載入器中只能使用一個 UUID 生成策略。這意味著如果兩個或多個應用程式上下文在同一個類載入器中執行,它們將共享相同的策略。如果其中一個上下文更改了策略,則所有上下文都將使用該策略。如果同一個類載入器中的兩個或多個上下文聲明瞭型別為 org.springframework.util.IdGenerator 的 bean,則它們都必須是同一類的例項。否則,嘗試替換自定義策略的上下文將無法初始化。如果策略相同,但引數化,則使用第一個初始化的上下文中的策略。

除了預設策略之外,還提供了另外兩個 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 機制。當不需要 UUID 並且簡單的遞增值就足夠時,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只讀頭部

MessageHeaders.IDMessageHeaders.TIMESTAMP 是隻讀頭部,不能被覆蓋。

從 4.3.2 版本開始,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用於自定義不應從上游 Message 複製的頭部列表。預設情況下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是隻讀的。提供了全域性 spring.integration.readOnly.headers 屬性(參見 全域性屬性)來為框架元件自定義 DefaultMessageBuilderFactory。當您不想由 ObjectToJsonTransformer(參見 JSON 轉換器)填充一些開箱即用的頭部(例如 contentType)時,這會很有用。

當您嘗試使用 MessageBuilder 構建新訊息時,此類頭部將被忽略,並且會在日誌中發出特定的 INFO 訊息。

從 5.0 版本開始,訊息閘道器頭部豐富器內容豐富器頭部過濾器 在使用 DefaultMessageBuilderFactory 時不允許您配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 頭部名稱,並且會丟擲 BeanInitializationException

頭部傳播

當訊息生產者端點(例如 服務啟用器)處理(和修改)訊息時,通常,入站頭部會傳播到出站訊息。一個例外是 轉換器,當一個完整的訊息返回到框架時。在這種情況下,使用者程式碼負責整個出站訊息。當轉換器只返回有效負載時,入站頭部會傳播。此外,只有當頭部在出站訊息中尚不存在時才會傳播,從而允許您根據需要更改頭部值。

從 4.3.10 版本開始,您可以配置訊息處理器(修改訊息併產生輸出)以抑制特定頭部的傳播。要配置您不想複製的頭部,請在 MessageProducingMessageHandler 抽象類上呼叫 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您還可以透過將 META-INF/spring.integration.properties 中的 readOnlyHeaders 屬性設定為逗號分隔的頭部列表來全域性抑制特定訊息頭部的傳播。

從 5.0 版本開始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 實現應用簡單模式(xxx**xxx*xxx*xxx*yyy)以允許使用通用字尾或字首過濾頭部。有關更多資訊,請參閱 PatternMatchUtils Javadoc。當其中一個模式是 *(星號)時,不傳播任何頭部。所有其他模式都將被忽略。在這種情況下,服務啟用器與轉換器的行為方式相同,並且任何所需的頭部都必須在服務方法返回的 Message 中提供。notPropagatedHeaders() 選項在 Java DSL 的 ConsumerEndpointSpec 中可用。它也可用於 <service-activator> 元件的 XML 配置,作為 not-propagated-headers 屬性。

頭部傳播抑制不適用於不修改訊息的端點,例如 橋接器路由器

訊息實現

Message 介面的基本實現是 GenericMessage<T>,它提供兩個建構函式,如下所示

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

當建立 Message 時,會生成一個隨機的唯一 ID。接受 Map 頭部的建構函式會將提供的頭部複製到新建立的 Message 中。

還有一個方便的 Message 實現,旨在傳達錯誤情況。此實現以 Throwable 物件作為其有效負載,如下例所示

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

請注意,此實現利用了 GenericMessage 基類引數化的事實。因此,如兩個示例所示,在檢索 Message 有效負載 Object 時不需要進行任何強制轉換。

上述 Message 類實現是不可變的。在某些情況下,當可變性不是問題並且應用程式邏輯設計良好以避免併發修改時,可以使用 MutableMessage

MessageBuilder 輔助類

您可能會注意到 Message 介面定義了其有效負載和頭部的檢索方法,但沒有提供 setter。其原因是 Message 在初始建立後不能被修改。因此,當 Message 例項傳送給多個消費者時(例如,透過釋出/訂閱通道),如果其中一個消費者需要傳送帶有不同有效負載型別的回覆,它必須建立一個新的 Message。這樣,其他消費者就不會受到這些更改的影響。請記住,多個消費者可能訪問相同的有效負載例項或頭部值,並且此類例項本身是否不可變由您決定。換句話說,Message 例項的契約類似於不可修改的 CollectionMessageHeaders 對映進一步證明了這一點。儘管 MessageHeaders 類實現了 java.util.Map,但任何嘗試對 MessageHeaders 例項呼叫 put 操作(或“remove”或“clear”)都會導致 UnsupportedOperationException

Spring Integration 提供了一種更方便的方法來構建訊息,而不是要求建立和填充 Map 以傳遞給 GenericMessage 建構函式:MessageBuilderMessageBuilder 提供兩種工廠方法,用於從現有 Message 或使用有效負載 Object 建立 Message 例項。當從現有 Message 構建時,該 Message 的頭部和有效負載將複製到新的 Message 中,如下例所示

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要建立具有新有效負載但仍希望從現有 Message 複製頭部的 Message,您可以使用其中一個“copy”方法,如下例所示

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

請注意,copyHeadersIfAbsent 方法不會覆蓋現有值。此外,在上面的示例中,您可以看到如何使用 setHeader 設定任何使用者定義的頭部。最後,預定義頭部也提供了 set 方法,以及一個用於設定任何頭部(MessageHeaders 也定義了預定義頭部名稱的常量)的非破壞性方法。

您還可以使用 MessageBuilder 設定訊息的優先順序,如下例所示

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority 頭部僅在使用 PriorityChannel 時才會被考慮(如下一章所述)。它被定義為 java.lang.Integer

提供了 MutableMessageBuilder 來處理 MutableMessage 例項。此類的邏輯是建立 MutableMessage 或保持原樣,並透過構建器方法修改其內容。這樣,當不可變性不是訊息交換的關注點時,執行中的應用程式的效能會略有提高。

從版本 6.4 開始,BaseMessageBuilder 類從 MessageBuilder 中提取出來,以簡化對預設訊息構建邏輯的擴充套件。例如,結合自定義 MessageBuilderFactory,自定義 BaseMessageBuilder 實現可以在應用程式上下文中全域性使用,以提供自定義 Message 例項。特別是,可以覆蓋 GenericMessage.toString() 方法,以便在記錄此類訊息時隱藏有效負載和頭部中的敏感資訊。

MessageBuilderFactory 抽象

名為 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean 被全域性註冊到應用程式上下文中,並在框架的任何地方用於建立 Message 例項。預設情況下,它是一個 DefaultMessageBuilderFactory 例項。開箱即用,框架還提供了一個 MutableMessageBuilderFactory,用於在框架元件中建立 MutableMessage 例項。為了自定義 Message 例項的建立,必須在目標應用程式上下文中提供一個名為 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean,以覆蓋預設的。例如,可以為 BaseMessageBuilder 的實現註冊一個自定義 MessageBuilderFactory,在該實現中,我們希望提供一個擴充套件了 GenericMessage 並覆蓋了 toString() 的例項,以便在記錄此類訊息時隱藏有效負載和頭部中的敏感資訊。

這些類的快速實現可以像這樣演示個人身份資訊緩解

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

然後可以將此 PiiMessageBuilderFactory 註冊為 bean,每當框架記錄訊息時(例如在 errorChannel 的情況下),password 頭部將被遮蔽。

© . This site is unofficial and not affiliated with VMware.