STOMP 支援
Spring Integration 4.2 版引入了 STOMP (Simple Text Orientated Messaging Protocol) 客戶端支援。它基於 Spring Framework 訊息模組 stomp 包的架構、基礎設施和 API。Spring Integration 使用了許多 Spring STOMP 元件(例如 StompSession 和 StompClientSupport)。有關更多資訊,請參閱 Spring Framework 參考手冊中的Spring Framework STOMP 支援一章。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:7.0.0"
對於伺服器端元件,您需要新增 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依賴項。
概述
要配置 STOMP,您應該從 STOMP 客戶端物件開始。Spring Framework 提供了以下實現:
-
WebSocketStompClient:基於 Spring WebSocket API 構建,支援標準 JSR-356 WebSocket、Jetty 9 和用於基於 HTTP 的 WebSocket 模擬的 SockJS 客戶端。 -
ReactorNettyTcpStompClient:基於reactor-netty專案中的ReactorNettyTcpClient構建。
您可以提供任何其他 StompClientSupport 實現。請參閱這些類的Javadoc。
StompClientSupport 類被設計為 工廠,用於為提供的 StompSessionHandler 生成一個 StompSession,所有剩餘的工作都透過對 StompSessionHandler 和 StompSession 抽象的 回撥 來完成。使用 Spring Integration 介面卡 抽象,我們需要提供一些託管共享物件來將我們的應用程式表示為具有其唯一會話的 STOMP 客戶端。為此,Spring Integration 提供了 StompSessionManager 抽象來管理任何提供的 StompSessionHandler 之間的 單個 StompSession。這允許將 入站 或 出站 通道介面卡(或兩者)用於特定的 STOMP Broker。有關更多資訊,請參閱 StompSessionManager(及其實現)的 Javadocs。
STOMP 入站通道介面卡
StompInboundChannelAdapter 是一個一站式 MessageProducer 元件,它將您的 Spring Integration 應用程式訂閱到提供的 STOMP 目的地,並從它們接收訊息(透過連線的 StompSession 上使用提供的 MessageConverter 從 STOMP 幀轉換而來)。您可以透過在 StompInboundChannelAdapter 上使用適當的 @ManagedOperation 註解在執行時更改目的地(以及 STOMP 訂閱)。
有關更多配置選項,請參閱STOMP 名稱空間支援和 StompInboundChannelAdapter 的Javadoc。
STOMP 出站通道介面卡
StompMessageHandler 是 <int-stomp:outbound-channel-adapter> 的 MessageHandler,用於透過 StompSession(由共享的 StompSessionManager 提供)將傳出的 Message<?> 例項傳送到 STOMP destination(預配置或在執行時透過 SpEL 表示式確定)。
有關更多配置選項,請參閱STOMP 名稱空間支援和 StompMessageHandler 的Javadoc。
STOMP 頭部對映
STOMP 協議將其幀的一部分作為頭部提供。STOMP 幀的整個結構具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了 StompHeaders 來表示這些頭部。有關更多詳細資訊,請參閱Javadoc。STOMP 幀在 Message<?> 例項之間進行轉換,這些頭部在 MessageHeaders 例項之間進行對映。Spring Integration 為 STOMP 介面卡提供了預設的 HeaderMapper 實現。該實現是 StompHeaderMapper。它分別提供了用於入站和出站介面卡的 fromHeaders() 和 toHeaders() 操作。
與許多其他 Spring Integration 模組一樣,IntegrationStompHeaders 類已被引入,用於將標準 STOMP 頭部對映到 MessageHeaders,並以 stomp_ 作為頭部名稱字首。此外,所有帶有該字首的 MessageHeaders 例項在傳送到目的地時都對映到 StompHeaders。
有關更多資訊,請參閱這些類的Javadoc以及STOMP 名稱空間支援中 mapped-headers 屬性的描述。
STOMP 整合事件
許多 STOMP 操作是非同步的,包括錯誤處理。例如,STOMP 有一個 RECEIPT 伺服器幀,當客戶端幀透過新增 RECEIPT 頭部請求它時,它會返回該幀。為了提供對這些非同步事件的訪問,Spring Integration 發出 StompIntegrationEvent 例項,您可以透過實現 ApplicationListener 或使用 <int-event:inbound-channel-adapter>(請參閱接收 Spring 應用程式事件)來獲取這些例項。
具體來說,當 stompSessionListenableFuture 因無法連線到 STOMP broker 而收到 onFailure() 時,AbstractStompSessionManager 會發出 StompExceptionEvent。另一個例子是 StompMessageHandler。它處理 ERROR STOMP 幀,這些幀是伺服器對 StompMessageHandler 傳送的不正確(未接受)訊息的響應。
StompMessageHandler 在傳送到 StompSession 的訊息的非同步響應中,作為 StompSession.Receiptable 回撥的一部分發出 StompReceiptEvent。StompReceiptEvent 可以是正面的或負面的,這取決於在 receiptTimeLimit 期間(您可以在 StompClientSupport 例項上配置)是否從伺服器收到了 RECEIPT 幀。它預設為 15 * 1000(以毫秒為單位,即 15 秒)。
僅當要傳送的訊息的 RECEIPT STOMP 頭部不為 null 時才新增 StompSession.Receiptable 回撥。您可以透過其 autoReceipt 選項在 StompSession 上以及在 StompSessionManager 上分別啟用自動 RECEIPT 頭部生成。 |
有關如何配置 Spring Integration 以接受這些 ApplicationEvent 例項的更多資訊,請參閱STOMP 介面卡 Java 配置。
STOMP 介面卡 Java 配置
以下示例顯示了 STOMP 介面卡的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 名稱空間支援
Spring Integration STOMP 名稱空間實現了入站和出站通道介面卡元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中提供以下名稱空間宣告:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
理解 <int-stomp:outbound-channel-adapter> 元素
以下列表顯示了 STOMP 出站通道介面卡的可用屬性:
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
| 1 | 元件 bean 名稱。MessageHandler 以 id 加 .handler 的 bean 別名註冊。如果您不設定 channel 屬性,則會建立一個 DirectChannel 並將其以 id 屬性的值作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點以 id 加 .adapter 的 bean 名稱註冊。 |
| 2 | 如果存在 id,則標識附加到此介面卡的通道。請參閱 id。可選。 |
| 3 | 對 StompSessionManager bean 的引用,該 bean 封裝了低階連線和 StompSession 處理操作。必需。 |
| 4 | 對實現 HeaderMapper<StompHeaders> 的 bean 的引用,該 bean 將 Spring Integration MessageHeaders 對映到 STOMP 幀頭部。它與 mapped-headers 互斥。它預設為 StompHeaderMapper。 |
| 5 | 以逗號分隔的 STOMP 頭部名稱列表,用於對映到 STOMP 幀頭部。僅當未設定 header-mapper 引用時才能提供。此列表中的值也可以是用於匹配頭部名稱的簡單模式(例如 myheader* 或 *myheader)。特殊標記 (STOMP_OUTBOUND_HEADERS) 表示所有標準 STOMP 頭部(內容長度、回執、心跳等)。它們預設包含在內。如果您想新增自己的頭部並希望標準頭部也進行對映,則必須包含此標記或使用 header-mapper 提供您自己的 HeaderMapper 實現。 |
| 6 | 傳送 STOMP 訊息的目的地名稱。它與 destination-expression 互斥。 |
| 7 | 一個 SpEL 表示式,將在執行時針對每個 Spring Integration Message 作為根物件進行評估。它與 destination 互斥。 |
| 8 | 布林值,指示此端點是否應自動啟動。它預設為 true。 |
| 9 | 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設值為 Integer.MIN_VALUE。值可以為負數。請參閱SmartLifeCycle。 |
理解 <int-stomp:inbound-channel-adapter> 元素
以下列表顯示了 STOMP 入站通道介面卡的可用屬性:
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
| 1 | 元件 bean 名稱。如果您不設定 channel 屬性,則會建立一個 DirectChannel 並將其以 id 屬性的值作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點以 id 加 .adapter 的 bean 名稱註冊。 |
| 2 | 標識附加到此介面卡的通道。 |
| 3 | MessageChannel bean 引用,ErrorMessage 例項應傳送到該引用。 |
| 4 | 請參閱<int-stomp:outbound-channel-adapter> 上的相同選項。 |
| 5 | 以逗號分隔的 STOMP 頭部名稱列表,用於從 STOMP 幀頭部對映。僅當未設定 header-mapper 引用時才能提供。此列表中的值也可以是用於匹配頭部名稱的簡單模式(例如 myheader* 或 *myheader)。特殊標記 (STOMP_INBOUND_HEADERS) 表示所有標準 STOMP 頭部(內容長度、回執、心跳等)。它們預設包含在內。如果您想新增自己的頭部並希望標準頭部也進行對映,則必須包含此標記或使用 header-mapper 提供您自己的 HeaderMapper 實現。 |
| 6 | 請參閱<int-stomp:outbound-channel-adapter> 上的相同選項。 |
| 7 | 以逗號分隔的 STOMP 目的地名稱列表,用於訂閱。目的地列表(以及訂閱)可以透過 addDestination() 和 removeDestination() @ManagedOperation 註解在執行時進行修改。 |
| 8 | 當向通道傳送訊息時(如果通道可以阻塞,例如,如果其最大容量已達到,QueueChannel 可以阻塞直到有可用空間),最大等待時間(以毫秒為單位)。 |
| 9 | 目標 payload 的 Java 型別的完全限定名稱,用於從傳入的 STOMP 幀進行轉換。它預設為 String.class。 |
| 10 | 請參閱<int-stomp:outbound-channel-adapter> 上的相同選項。 |
| 11 | 請參閱<int-stomp:outbound-channel-adapter> 上的相同選項。 |