訊息

Spring Integration Message 是一個通用的資料容器。任何物件都可以作為負載 (payload),並且每個 Message 例項都包含帶有使用者可擴充套件屬性的頭 (headers),這些屬性以鍵值對的形式儲存。

Message 介面

以下列表展示了 Message 介面的定義

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息系統可以在不瞭解資料型別的情況下傳遞資料。隨著應用程式演進以支援新型別,或者型別本身被修改或擴充套件時,訊息系統不會受到影響。另一方面,當訊息系統中的某些元件確實需要訪問有關 Message 的資訊時,此類元資料通常可以儲存到訊息頭中並從中檢索。

訊息頭 (Message Headers)

正如 Spring Integration 允許任何 Object 用作 Message 的負載一樣,它也支援任何 Object 型別作為頭值。實際上,MessageHeaders 類實現了 java.util.Map_ interface,如下面的類定義所示

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
儘管 MessageHeaders 類實現了 Map,但它實際上是一個只讀實現。任何嘗試在 Map 中 put 值都會導致 UnsupportedOperationException。對於 removeclear 也是如此。由於訊息可能傳遞給多個消費者,因此不能修改 Map 的結構。同樣,訊息的負載 Object 在初次建立後不能被 set。然而,頭值本身(或負載 Object)的可變性有意留給框架使用者自行決定。

作為 Map 的實現,可以透過呼叫帶有頭名稱的 get(..) 來檢索頭。或者,您可以提供預期的 Class 作為附加引數。更好的是,當檢索預定義值時,提供了方便的 getter 方法。以下示例展示了這三種選項

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了預定義的訊息頭

表 1. 預定義訊息頭
頭名稱 頭型別 用途
 MessageHeaders.ID
 java.util.UUID

此訊息例項的識別符號。每次訊息被修改時都會改變。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

訊息建立的時間。每次訊息被修改時都會改變。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

當未配置顯式輸出通道,且沒有 ROUTING_SLIPROUTING_SLIP 已用盡時,回覆(如果有)傳送到的通道。如果值是 String,它必須表示一個 Bean 名稱或由 ChannelRegistry 生成。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

錯誤傳送到的通道。如果值是 String,它必須表示一個 Bean 名稱或由 ChannelRegistry 生成。

許多入站和出站介面卡實現也提供或期望某些頭,並且您可以配置額外的使用者定義頭。這些頭的常量可以在存在這些頭的模組中找到——例如 AmqpHeadersJmsHeaders 等等。

MessageHeaderAccessor API

從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息抽象已移至 spring-messaging 模組,並引入了 MessageHeaderAccessor API,以提供對訊息實現的額外抽象。所有 (核心) Spring Integration 特定訊息頭常量現在都在 IntegrationMessageHeaderAccessor 類中宣告。下表描述了預定義的訊息頭

表 2. 預定義訊息頭
頭名稱 頭型別 用途
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用於關聯兩個或更多訊息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是帶有 SEQUENCE_SIZE 的訊息組的序列號,但也可以在 <resequencer/> 中使用,以對無界訊息組進行重排序。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一組關聯訊息中的訊息數量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示訊息何時過期。框架不直接使用,但可以透過 Header Enricher 設定,並在配置了 UnexpiredMessageSelector<filter/> 中使用。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

訊息優先順序——例如,在 PriorityChannel 中。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果訊息被冪等接收者攔截器檢測為重複。參見 冪等接收者企業整合模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果訊息與一個處理完成後應關閉的 Closeable 相關聯,則存在此頭。例如,使用 FTP、SFTP 等進行流式檔案傳輸時關聯的 Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果訊息驅動通道介面卡支援配置 RetryTemplate,則此頭包含當前的投遞嘗試次數。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端點支援,則會有一個回撥函式用於接受、拒絕或重新入隊訊息。參見 延遲確認可輪詢訊息源MQTT 手動確認

IntegrationMessageHeaderAccessor 類提供了其中一些頭的便捷型別化 getter,如下例所示

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出現在 IntegrationMessageHeaderAccessor 中但通常不由使用者程式碼使用的頭(即,它們通常由 Spring Integration 的內部部分使用——此處包含它們是為了完整性)

表 3. 預定義訊息頭
頭名稱 頭型別 用途
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

當需要巢狀關聯時使用的關聯資料堆疊(例如,分發器→…​→分發器→…​→聚合器→…​→聚合器)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

參見 路由單

訊息 ID 生成

當訊息透過應用程式轉換時,每次被修改(例如,透過轉換器)都會分配一個新的訊息 ID。訊息 ID 是一個 UUID。從 Spring Integration 3.0 開始,用於 ID 生成的預設策略比以前的 java.util.UUID.randomUUID() 實現更高效。它基於安全的隨機種子使用簡單的隨機數,而不是每次都建立安全的隨機數。

可以透過在應用程式上下文中宣告一個實現 org.springframework.util.IdGenerator 介面的 Bean 來選擇不同的 UUID 生成策略。

在一個 ClassLoader 中只能使用一種 UUID 生成策略。這意味著,如果在同一個 ClassLoader 中執行兩個或更多應用程式上下文,它們會共享相同的策略。如果其中一個上下文更改了策略,所有上下文都會使用它。如果在同一個 ClassLoader 中的兩個或更多上下文聲明瞭型別為 org.springframework.util.IdGenerator 的 Bean,它們必須是同一類的例項。否則,嘗試替換自定義策略的上下文將無法初始化。如果策略相同,但引數化不同,則使用第一個被初始化的上下文中的策略。

除了預設策略外,還提供了另外兩個 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 機制。當實際上不需要 UUID 並且簡單的遞增值就足夠時,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只讀頭

MessageHeaders.IDMessageHeaders.TIMESTAMP 是隻讀頭,不能被覆蓋。

從版本 4.3.2 開始,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用於自定義不應從上游 Message 複製的頭列表。預設情況下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是隻讀的。提供了全域性屬性 spring.integration.readOnly.headers(參見 全域性屬性)來為框架元件自定義 DefaultMessageBuilderFactory。當您不想讓 ObjectToJsonTransformer 等元件填充某些現成的頭(例如 contentType)時,這非常有用(參見 JSON 轉換器)。

當您嘗試使用 MessageBuilder 構建新訊息時,此類頭將被忽略,並且會在日誌中發出特定的 INFO 訊息。

從版本 5.0 開始,當使用 DefaultMessageBuilderFactory 時,訊息閘道器Header Enricher內容增強器Header Filter 不允許您配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 頭名稱,並且會丟擲 BeanInitializationException

頭傳播

當訊息被產生訊息的端點(例如 服務啟用器)處理(和修改)時,通常,入站頭會傳播到出站訊息。一個例外是 轉換器,當完整的訊息返回給框架時。在這種情況下,使用者程式碼負責整個出站訊息。當轉換器只返回負載時,入站頭會被傳播。此外,只有當出站訊息中不存在該頭時,頭才會被傳播,從而允許您根據需要更改頭值。

從版本 4.3.10 開始,您可以配置訊息處理器(它們修改訊息併產生輸出)以抑制特定頭的傳播。要配置您不想複製的頭,請在 MessageProducingMessageHandler 抽象類上呼叫 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您還可以透過在 META-INF/spring.integration.properties 中將 readOnlyHeaders 屬性設定為逗號分隔的頭列表來全域性抑制特定訊息頭的傳播。

從版本 5.0 開始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 實現應用簡單的模式 (xxx*, *xxx, *xxx*, 或 xxx*yyy) 來允許過濾具有共同字尾或字首的頭。有關更多資訊,請參閱 PatternMatchUtils Javadoc。當其中一個模式是 *(星號)時,不傳播任何頭。所有其他模式都被忽略。在這種情況下,服務啟用器的行為與轉換器相同,任何必需的頭必須在服務方法返回的 Message 中提供。notPropagatedHeaders() 選項在 Java DSL 的 ConsumerEndpointSpec 中可用。它也作為 <service-activator> 元件的 XML 配置中的 not-propagated-headers 屬性可用。

頭傳播抑制不適用於不修改訊息的端點,例如 橋接器路由器

訊息實現

Message 介面的基本實現是 GenericMessage<T>,它提供兩個建構函式,如下列表所示

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

建立 Message 時,會生成一個隨機的唯一 ID。接受 Map 作為頭的建構函式會將提供的頭複製到新建立的 Message 中。

還有一個方便的 Message 實現,用於表示錯誤條件。此實現接受一個 Throwable 物件作為其負載,如下例所示

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

請注意,此實現利用了 GenericMessage 基類是引數化的這一事實。因此,如兩個示例所示,檢索 Message 負載 Object 時無需進行強制轉換。

前面提到的 Message 類實現是不可變的。在某些情況下,當可變性不是問題且應用程式邏輯設計良好以避免併發修改時,可以使用 MutableMessage

MessageBuilder 幫助類

您可能注意到 Message 介面定義了用於檢索其負載和頭的方法,但沒有提供 setter。這是因為 Message 在初次建立後不能被修改。因此,當一個 Message 例項被髮送到多個消費者(例如,透過釋出-訂閱通道)時,如果其中一個消費者需要傳送帶有不同負載型別的回覆,它必須建立一個新的 Message。結果是,其他消費者不受這些更改的影響。請記住,多個消費者可能訪問相同的負載例項或頭值,並且此類例項本身是否可變是留給您決定的。換句話說,Message 例項的契約類似於不可變的 CollectionMessageHeaders map 進一步例證了這一點。儘管 MessageHeaders 類實現了 java.util.Map,但任何嘗試在 MessageHeaders 例項上呼叫 put 操作(或 'remove' 或 'clear')都會導致 UnsupportedOperationException

Spring Integration 提供了一種更方便的方式來構建 Message,而不是要求建立和填充 Map 然後傳遞給 GenericMessage 建構函式:MessageBuilderMessageBuilder 提供了兩個工廠方法,用於從現有 Message 或使用負載 Object 建立 Message 例項。從現有 Message 構建時,該 Message 的頭和負載會被複制到新的 Message 中,如下例所示

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要建立一個帶有新負載但仍想從現有 Message 中複製頭的 Message,您可以使用 'copy' 方法之一,如下例所示

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

請注意,copyHeadersIfAbsent 方法不會覆蓋現有值。此外,在上面的示例中,您可以看到如何使用 setHeader 設定任何使用者定義的頭。最後,對於預定義頭和設定任何頭的非破壞性方法也提供了 set 方法(MessageHeaders 也定義了預定義頭名稱的常量)。

您還可以使用 MessageBuilder 設定訊息的優先順序,如下例所示

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

只有在使用 PriorityChannel 時才會考慮 priority 頭(如下一章所述)。它被定義為 java.lang.Integer

提供了 MutableMessageBuilder 來處理 MutableMessage 例項。此類邏輯是建立 MutableMessage 或保留原樣,並透過構建器方法修改其內容。這樣可以在執行應用程式中獲得輕微的效能提升,前提是不可變性不是訊息交換的關注點。

從版本 6.4 開始,從 MessageBuilder 中提取了一個 BaseMessageBuilder 類,以簡化預設訊息構建邏輯的擴充套件。例如,結合自定義的 MessageBuilderFactory,可以在應用程式上下文中全域性使用自定義的 BaseMessageBuilder 實現來提供自定義的 Message 例項。特別是,可以覆蓋 GenericMessage.toString() 方法,以便在記錄此類訊息時隱藏負載和頭中的敏感資訊。

MessageBuilderFactory 抽象

帶有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 名稱的 MessageBuilderFactory bean 會全域性註冊到應用程式上下文中,並在框架中隨處用於建立 Message 例項。預設情況下,它是一個 DefaultMessageBuilderFactory 例項。開箱即用,框架還提供了一個 MutableMessageBuilderFactory,用於在框架元件中建立 MutableMessage 例項。要自定義 Message 例項的建立,必須在目標應用程式上下文中提供一個帶有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 名稱的 MessageBuilderFactory bean,以覆蓋預設的。例如,可以為 BaseMessageBuilder 的實現註冊一個自定義的 MessageBuilderFactory,我們希望在該實現中提供一個 GenericMessage 的擴充套件,並覆蓋 toString() 方法,以便在記錄此類訊息時隱藏負載和頭中的敏感資訊。

這些類的一些快速實現以演示個人可識別資訊的緩解措施可以像這樣

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

這個 PiiMessageBuilderFactory 可以註冊為一個 Bean,並且無論何時框架記錄訊息(例如,在 errorChannel 的情況下),password 頭都將被遮蓋。