STOMP 支援
Spring Integration 4.2 版本引入了 STOMP (Simple Text Orientated Messaging Protocol) 客戶端支援。它基於 Spring Framework 訊息模組 stomp 包的架構、基礎設施和 API。Spring Integration 使用了許多 Spring STOMP 元件(例如 StompSession
和 StompClientSupport
)。更多資訊,請參閱 Spring Framework 參考手冊中的Spring Framework STOMP 支援章節。
您需要在專案中包含此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.4"
對於伺服器端元件,您需要新增 org.springframework:spring-websocket
和/或 io.projectreactor.netty:reactor-netty
依賴項。
概述
要配置 STOMP,您應該從 STOMP 客戶端物件開始。Spring Framework 提供了以下實現
-
WebSocketStompClient
:基於 Spring WebSocket API 構建,支援標準的 JSR-356 WebSocket、Jetty 9 以及使用 SockJS Client 進行基於 HTTP 的 WebSocket 模擬的 SockJS。 -
ReactorNettyTcpStompClient
:基於reactor-netty
專案中的ReactorNettyTcpClient
構建。
您可以提供任何其他 StompClientSupport
實現。請參閱這些類的Javadoc。
StompClientSupport
類被設計為生成給定 StompSessionHandler
的 StompSession
的工廠,所有剩餘工作都透過對該 StompSessionHandler
和 StompSession
抽象的回撥來完成。透過 Spring Integration 的介面卡抽象,我們需要提供一些託管共享物件來代表我們的應用程式作為一個擁有唯一會話的 STOMP 客戶端。為此,Spring Integration 提供了 StompSessionManager
抽象來管理任意給定 StompSessionHandler
之間的單個 StompSession
。這允許對特定的 STOMP Broker 使用入站或出站通道介面卡(或兩者)。有關更多資訊,請參閱 StompSessionManager
(及其實現)的 JavaDocs。
STOMP 入站通道介面卡
StompInboundChannelAdapter
是一個一站式的 MessageProducer
元件,它將您的 Spring Integration 應用程式訂閱到提供的 STOMP 目標,並從它們接收訊息(透過在連線的 StompSession
上使用提供的 MessageConverter
從 STOMP 幀轉換而來)。您可以透過在 StompInboundChannelAdapter
上使用適當的 @ManagedOperation
註解在執行時更改目標(以及 STOMP 訂閱)。
有關更多配置選項,請參閱STOMP Namespace 支援和 StompInboundChannelAdapter
的Javadoc。
STOMP 出站通道介面卡
StompMessageHandler
是用於 <int-stomp:outbound-channel-adapter>
的 MessageHandler
,它用於透過 StompSession
(由共享的 StompSessionManager
提供)將傳出的 Message<?>
例項傳送到 STOMP destination
(預配置或在執行時使用 SpEL 表示式確定)。
有關更多配置選項,請參閱STOMP Namespace 支援和 StompMessageHandler
的Javadoc。
STOMP 頭對映
STOMP 協議在其幀中提供了頭。STOMP 幀的整個結構具有以下格式
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了 StompHeaders
來表示這些頭。有關更多詳細資訊,請參閱Javadoc。STOMP 幀被轉換為 Message<?>
例項以及從 Message<?>
例項轉換,這些頭也被對映到 MessageHeaders
例項以及從 MessageHeaders
例項對映。Spring Integration 為 STOMP 介面卡提供了預設的 HeaderMapper
實現。該實現是 StompHeaderMapper
。它分別為入站和出站介面卡提供了 fromHeaders()
和 toHeaders()
操作。
與許多其他 Spring Integration 模組一樣,引入了 IntegrationStompHeaders
類來將標準 STOMP 頭對映到 MessageHeaders
,其中以 stomp_
作為頭名稱字首。此外,所有帶有該字首的 MessageHeaders
例項在傳送到目標時都對映到 StompHeaders
。
有關更多資訊,請參閱這些類的Javadoc 以及STOMP Namespace Support 中 mapped-headers
屬性的描述。
STOMP 整合事件
許多 STOMP 操作是非同步的,包括錯誤處理。例如,STOMP 有一個 RECEIPT
伺服器幀,當客戶端幀透過新增 RECEIPT
頭請求時返回該幀。為了提供對這些非同步事件的訪問,Spring Integration 會發出 StompIntegrationEvent
例項,您可以透過實現 ApplicationListener
或使用 <int-event:inbound-channel-adapter>
來獲取這些例項(請參閱 接收 Spring Application 事件)。
具體來說,當 stompSessionListenableFuture
由於連線 STOMP broker 失敗而接收到 onFailure()
時,AbstractStompSessionManager
會發出一個 StompExceptionEvent
。另一個例子是 StompMessageHandler
。它處理 ERROR
STOMP 幀,這些幀是伺服器對該 StompMessageHandler
傳送的不當(未被接受的)訊息的響應。
StompMessageHandler
在傳送到 StompSession
的訊息的非同步回覆中,作為 StompSession.Receiptable
回撥的一部分,發出 StompReceiptEvent
。StompReceiptEvent
可以是正面的或負面的,這取決於是否在 receiptTimeLimit
期間從伺服器接收到 RECEIPT
幀,您可以在 StompClientSupport
例項上配置該時間限制。它預設為 15 * 1000
(毫秒,即 15 秒)。
StompSession.Receiptable 回撥僅在要傳送的訊息的 RECEIPT STOMP 頭不為 null 時才會新增。您可以透過 StompSession 的 autoReceipt 選項以及 StompSessionManager 上分別啟用自動 RECEIPT 頭生成。 |
有關如何配置 Spring Integration 以接受這些 ApplicationEvent
例項的更多資訊,請參閱STOMP Adapters Java Configuration。
STOMP 介面卡 Java 配置
以下示例顯示了 STOMP 介面卡的全面 Java 配置
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP Namespace 支援
Spring Integration STOMP namespace 實現了入站和出站通道介面卡元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中提供以下 namespace 宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
理解 <int-stomp:outbound-channel-adapter>
元素
以下列表顯示了 STOMP 出站通道介面卡的可用屬性
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 元件 bean 名稱。MessageHandler 會註冊一個 bean 別名,其名稱為 id 加上 .handler 。如果您未設定 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並註冊,其 bean 名稱為該 id 屬性的值。在這種情況下,端點會註冊一個 bean 名稱,其名稱為 id 加上 .adapter 。 |
2 | 如果存在 id ,則標識連線到此介面卡的通道。請參閱 id 。可選。 |
3 | 對 StompSessionManager bean 的引用,該 bean 封裝了底層連線和 StompSession 處理操作。必需。 |
4 | 對實現 HeaderMapper<StompHeaders> 的 bean 的引用,該 bean 將 Spring Integration MessageHeaders 對映到 STOMP 幀頭以及從 STOMP 幀頭對映。它與 mapped-headers 互斥。預設值為 StompHeaderMapper 。 |
5 | 逗號分隔的 STOMP 頭名稱列表,這些頭將被對映到 STOMP 幀頭。只有在未設定 header-mapper 引用時才能提供此項。此列表中的值也可以是與頭名稱匹配的簡單模式(例如 myheader* 或 *myheader )。一個特殊令牌 (STOMP_OUTBOUND_HEADERS ) 代表所有標準 STOMP 頭(內容長度、收據、心跳等)。它們預設包含在內。如果您想新增自己的頭,並且也希望標準頭被對映,則必須包含此令牌或透過使用 header-mapper 提供您自己的 HeaderMapper 實現。 |
6 | 傳送 STOMP 訊息的目標名稱。它與 destination-expression 互斥。 |
7 | 一個 SpEL 表示式,將在執行時針對每個 Spring Integration Message 作為根物件進行評估。它與 destination 互斥。 |
8 | 布林值,指示此端點是否應自動啟動。預設為 true 。 |
9 | 此端點應在其生命週期階段內啟動和停止。值越低,此端點啟動越早,停止越晚。預設值為 Integer.MIN_VALUE 。值可以為負。請參閱 SmartLifeCycle 。 |
理解 <int-stomp:inbound-channel-adapter>
元素
以下列表顯示了 STOMP 入站通道介面卡的可用屬性
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 元件 bean 名稱。如果您未設定 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並註冊,其 bean 名稱為該 id 屬性的值。在這種情況下,端點會註冊一個 bean 名稱,其名稱為 id 加上 .adapter 。 |
2 | 標識連線到此介面卡的通道。 |
3 | MessageChannel bean 引用,ErrorMessage 例項應傳送到該引用。 |
4 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
5 | 逗號分隔的 STOMP 頭名稱列表,這些頭將從 STOMP 幀頭對映而來。只有在未設定 header-mapper 引用時才能提供此項。此列表中的值也可以是與頭名稱匹配的簡單模式(例如 myheader* 或 *myheader )。一個特殊令牌 (STOMP_INBOUND_HEADERS ) 代表所有標準 STOMP 頭(內容長度、收據、心跳等)。它們預設包含在內。如果您想新增自己的頭,並且也希望標準頭被對映,則必須也包含此令牌或透過使用 header-mapper 提供您自己的 HeaderMapper 實現。 |
6 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
7 | 逗號分隔的 STOMP 目標名稱列表,用於訂閱。目標列表(以及訂閱)可以透過 addDestination() 和 removeDestination() @ManagedOperation 註解在執行時進行修改。 |
8 | 當傳送訊息到可能阻塞的通道時,最大等待時間(毫秒)。例如,如果 QueueChannel 達到最大容量,它可能會阻塞直到有空間可用。 |
9 | 要從傳入 STOMP 幀轉換的目標 payload 的 Java 型別的完全限定名稱。預設為 String.class 。 |
10 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
11 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |