STOMP 支援

Spring Integration 4.2 版本引入了 STOMP (Simple Text Orientated Messaging Protocol) 客戶端支援。它基於 Spring Framework 訊息模組 stomp 包的架構、基礎設施和 API。Spring Integration 使用了許多 Spring STOMP 元件(例如 StompSessionStompClientSupport)。更多資訊,請參閱 Spring Framework 參考手冊中的Spring Framework STOMP 支援章節。

您需要在專案中包含此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.4"

對於伺服器端元件,您需要新增 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依賴項。

概述

要配置 STOMP,您應該從 STOMP 客戶端物件開始。Spring Framework 提供了以下實現

  • WebSocketStompClient:基於 Spring WebSocket API 構建,支援標準的 JSR-356 WebSocket、Jetty 9 以及使用 SockJS Client 進行基於 HTTP 的 WebSocket 模擬的 SockJS。

  • ReactorNettyTcpStompClient:基於 reactor-netty 專案中的 ReactorNettyTcpClient 構建。

您可以提供任何其他 StompClientSupport 實現。請參閱這些類的Javadoc

StompClientSupport 類被設計為生成給定 StompSessionHandlerStompSession工廠,所有剩餘工作都透過對該 StompSessionHandlerStompSession 抽象的回撥來完成。透過 Spring Integration 的介面卡抽象,我們需要提供一些託管共享物件來代表我們的應用程式作為一個擁有唯一會話的 STOMP 客戶端。為此,Spring Integration 提供了 StompSessionManager 抽象來管理任意給定 StompSessionHandler 之間的單個 StompSession。這允許對特定的 STOMP Broker 使用入站出站通道介面卡(或兩者)。有關更多資訊,請參閱 StompSessionManager(及其實現)的 JavaDocs。

STOMP 入站通道介面卡

StompInboundChannelAdapter 是一個一站式的 MessageProducer 元件,它將您的 Spring Integration 應用程式訂閱到提供的 STOMP 目標,並從它們接收訊息(透過在連線的 StompSession 上使用提供的 MessageConverter 從 STOMP 幀轉換而來)。您可以透過在 StompInboundChannelAdapter 上使用適當的 @ManagedOperation 註解在執行時更改目標(以及 STOMP 訂閱)。

有關更多配置選項,請參閱STOMP Namespace 支援StompInboundChannelAdapterJavadoc

STOMP 出站通道介面卡

StompMessageHandler 是用於 <int-stomp:outbound-channel-adapter>MessageHandler,它用於透過 StompSession(由共享的 StompSessionManager 提供)將傳出的 Message<?> 例項傳送到 STOMP destination(預配置或在執行時使用 SpEL 表示式確定)。

有關更多配置選項,請參閱STOMP Namespace 支援StompMessageHandlerJavadoc

STOMP 頭對映

STOMP 協議在其幀中提供了頭。STOMP 幀的整個結構具有以下格式

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了 StompHeaders 來表示這些頭。有關更多詳細資訊,請參閱Javadoc。STOMP 幀被轉換為 Message<?> 例項以及從 Message<?> 例項轉換,這些頭也被對映到 MessageHeaders 例項以及從 MessageHeaders 例項對映。Spring Integration 為 STOMP 介面卡提供了預設的 HeaderMapper 實現。該實現是 StompHeaderMapper。它分別為入站和出站介面卡提供了 fromHeaders()toHeaders() 操作。

與許多其他 Spring Integration 模組一樣,引入了 IntegrationStompHeaders 類來將標準 STOMP 頭對映到 MessageHeaders,其中以 stomp_ 作為頭名稱字首。此外,所有帶有該字首的 MessageHeaders 例項在傳送到目標時都對映到 StompHeaders

有關更多資訊,請參閱這些類的Javadoc 以及STOMP Namespace Supportmapped-headers 屬性的描述。

STOMP 整合事件

許多 STOMP 操作是非同步的,包括錯誤處理。例如,STOMP 有一個 RECEIPT 伺服器幀,當客戶端幀透過新增 RECEIPT 頭請求時返回該幀。為了提供對這些非同步事件的訪問,Spring Integration 會發出 StompIntegrationEvent 例項,您可以透過實現 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 來獲取這些例項(請參閱 接收 Spring Application 事件)。

具體來說,當 stompSessionListenableFuture 由於連線 STOMP broker 失敗而接收到 onFailure() 時,AbstractStompSessionManager 會發出一個 StompExceptionEvent。另一個例子是 StompMessageHandler。它處理 ERROR STOMP 幀,這些幀是伺服器對該 StompMessageHandler 傳送的不當(未被接受的)訊息的響應。

StompMessageHandler 在傳送到 StompSession 的訊息的非同步回覆中,作為 StompSession.Receiptable 回撥的一部分,發出 StompReceiptEventStompReceiptEvent 可以是正面的或負面的,這取決於是否在 receiptTimeLimit 期間從伺服器接收到 RECEIPT 幀,您可以在 StompClientSupport 例項上配置該時間限制。它預設為 15 * 1000(毫秒,即 15 秒)。

StompSession.Receiptable 回撥僅在要傳送的訊息的 RECEIPT STOMP 頭不為 null 時才會新增。您可以透過 StompSessionautoReceipt 選項以及 StompSessionManager 上分別啟用自動 RECEIPT 頭生成。

有關如何配置 Spring Integration 以接受這些 ApplicationEvent 例項的更多資訊,請參閱STOMP Adapters Java Configuration

STOMP 介面卡 Java 配置

以下示例顯示了 STOMP 介面卡的全面 Java 配置

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP Namespace 支援

Spring Integration STOMP namespace 實現了入站和出站通道介面卡元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中提供以下 namespace 宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

理解 <int-stomp:outbound-channel-adapter> 元素

以下列表顯示了 STOMP 出站通道介面卡的可用屬性

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 元件 bean 名稱。MessageHandler 會註冊一個 bean 別名,其名稱為 id 加上 .handler。如果您未設定 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並註冊,其 bean 名稱為該 id 屬性的值。在這種情況下,端點會註冊一個 bean 名稱,其名稱為 id 加上 .adapter
2 如果存在 id,則標識連線到此介面卡的通道。請參閱 id。可選。
3 StompSessionManager bean 的引用,該 bean 封裝了底層連線和 StompSession 處理操作。必需。
4 對實現 HeaderMapper<StompHeaders> 的 bean 的引用,該 bean 將 Spring Integration MessageHeaders 對映到 STOMP 幀頭以及從 STOMP 幀頭對映。它與 mapped-headers 互斥。預設值為 StompHeaderMapper
5 逗號分隔的 STOMP 頭名稱列表,這些頭將被對映到 STOMP 幀頭。只有在未設定 header-mapper 引用時才能提供此項。此列表中的值也可以是與頭名稱匹配的簡單模式(例如 myheader**myheader)。一個特殊令牌 (STOMP_OUTBOUND_HEADERS) 代表所有標準 STOMP 頭(內容長度、收據、心跳等)。它們預設包含在內。如果您想新增自己的頭,並且也希望標準頭被對映,則必須包含此令牌或透過使用 header-mapper 提供您自己的 HeaderMapper 實現。
6 傳送 STOMP 訊息的目標名稱。它與 destination-expression 互斥。
7 一個 SpEL 表示式,將在執行時針對每個 Spring Integration Message 作為根物件進行評估。它與 destination 互斥。
8 布林值,指示此端點是否應自動啟動。預設為 true
9 此端點應在其生命週期階段內啟動和停止。值越低,此端點啟動越早,停止越晚。預設值為 Integer.MIN_VALUE。值可以為負。請參閱 SmartLifeCycle

理解 <int-stomp:inbound-channel-adapter> 元素

以下列表顯示了 STOMP 入站通道介面卡的可用屬性

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 元件 bean 名稱。如果您未設定 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並註冊,其 bean 名稱為該 id 屬性的值。在這種情況下,端點會註冊一個 bean 名稱,其名稱為 id 加上 .adapter
2 標識連線到此介面卡的通道。
3 MessageChannel bean 引用,ErrorMessage 例項應傳送到該引用。
4 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
5 逗號分隔的 STOMP 頭名稱列表,這些頭將從 STOMP 幀頭對映而來。只有在未設定 header-mapper 引用時才能提供此項。此列表中的值也可以是與頭名稱匹配的簡單模式(例如 myheader**myheader)。一個特殊令牌 (STOMP_INBOUND_HEADERS) 代表所有標準 STOMP 頭(內容長度、收據、心跳等)。它們預設包含在內。如果您想新增自己的頭,並且也希望標準頭被對映,則必須也包含此令牌或透過使用 header-mapper 提供您自己的 HeaderMapper 實現。
6 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
7 逗號分隔的 STOMP 目標名稱列表,用於訂閱。目標列表(以及訂閱)可以透過 addDestination()removeDestination() @ManagedOperation 註解在執行時進行修改。
8 當傳送訊息到可能阻塞的通道時,最大等待時間(毫秒)。例如,如果 QueueChannel 達到最大容量,它可能會阻塞直到有空間可用。
9 要從傳入 STOMP 幀轉換的目標 payload 的 Java 型別的完全限定名稱。預設為 String.class
10 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
11 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。