WebSocket 支援
從版本 4.1 開始,Spring Integration 支援 WebSocket。它基於 Spring Framework 的 web-socket
模組的架構、基礎設施和 API。因此,許多 Spring WebSocket 的元件(例如 SubProtocolHandler
或 WebSocketClient
)和配置選項(例如 @EnableWebSocketMessageBroker
)可以在 Spring Integration 中重用。更多資訊請參閱 Spring Framework WebSocket 支援 一章在 Spring Framework 參考手冊中。
您需要在專案中包含此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.4.4"
對於伺服器端,必須顯式包含 org.springframework:spring-webmvc
依賴項。
Spring Framework 的 WebSocket 基礎設施基於 Spring 訊息基礎構建,並提供了一個基本的訊息框架,該框架基於 Spring Integration 使用的相同的 MessageChannel
實現和 MessageHandler
實現(以及一些 POJO 方法的註解對映)。因此,Spring Integration 可以直接參與 WebSocket 流,即使沒有 WebSocket 介面卡。為此,您可以使用適當的註解配置 Spring Integration @MessagingGateway
,如下例所示
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概覽
由於 WebSocket 協議本質上是流式的,並且我們可以同時向 WebSocket 傳送和接收訊息,因此無論是在客戶端還是伺服器端,我們都可以處理適當的 WebSocketSession
。為了封裝連線管理和 WebSocketSession
註冊,提供了 IntegrationWebSocketContainer
,其包含 ClientWebSocketContainer
和 ServerWebSocketContainer
實現。得益於 WebSocket API 及其在 Spring Framework 中的實現(包含許多擴充套件),伺服器端和客戶端(當然,從 Java 的角度來看)都使用相同的類。因此,大多數連線和 WebSocketSession
註冊選項在兩端是相同的。這使我們可以重用許多配置項和基礎設施鉤子來構建伺服器端和客戶端的 WebSocket 應用程式。以下示例展示了元件如何同時服務於這兩個目的
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
IntegrationWebSocketContainer
旨在實現雙向訊息傳遞,可以在入站和出站通道介面卡之間共享(見下文);在使用單向(傳送或接收)WebSocket 訊息傳遞時,也可以僅被其中一個引用。它可以在不使用任何通道介面卡的情況下使用,但在此情況下,IntegrationWebSocketContainer
僅充當 WebSocketSession
登錄檔的作用。
ServerWebSocketContainer 實現了 WebSocketConfigurer 以便將內部的 IntegrationWebSocketContainer.IntegrationWebSocketHandler 註冊為 Endpoint 。它在目標供應商 WebSocket Container 的 ServletWebSocketHandlerRegistry 中,根據提供的 paths 和其他伺服器 WebSocket 選項(如 HandshakeHandler 或 SockJS fallback )進行註冊。此註冊是透過基礎設施元件 WebSocketIntegrationConfigurationInitializer 完成的,它執行與 @EnableWebSocket 註解相同的功能。這意味著,透過使用 @EnableIntegration (或應用程式上下文中的任何 Spring Integration 名稱空間),您可以省略 @EnableWebSocket 宣告,因為 Spring Integration 基礎設施會檢測所有 WebSocket 端點。 |
從版本 6.1 開始,可以使用提供的 URI
配置 ClientWebSocketContainer
,而不是使用 uriTemplate
和 uriVariables
的組合。當 URI 的某些部分需要自定義編碼時,這非常有用。請參閱 UriComponentsBuilder
API 以獲取便利。
WebSocket 入站通道介面卡
WebSocketInboundChannelAdapter
實現了 WebSocketSession
互動的接收部分。您必須為其提供一個 IntegrationWebSocketContainer
,介面卡會將自身註冊為 WebSocketListener
,以處理傳入訊息和 WebSocketSession
事件。
在 IntegrationWebSocketContainer 中只能註冊一個 WebSocketListener 。 |
對於 WebSocket 子協議,WebSocketInboundChannelAdapter
可以配置 SubProtocolHandlerRegistry
作為第二個建構函式引數。介面卡將委託給 SubProtocolHandlerRegistry
,以確定接受的 WebSocketSession
適用的 SubProtocolHandler
,並根據子協議實現將 WebSocketMessage
轉換為 Message
。
預設情況下,WebSocketInboundChannelAdapter 僅依賴於原始的 PassThruSubProtocolHandler 實現,該實現將 WebSocketMessage 轉換為 Message 。 |
WebSocketInboundChannelAdapter
只接受併發送給底層整合流(integration flow)擁有 SimpMessageType.MESSAGE
或空的 simpMessageType
頭的 Message
例項。所有其他 Message
型別透過由 SubProtocolHandler
實現(例如 StompSubProtocolHandler
)發出的 ApplicationEvent
例項處理。
在伺服器端,如果存在 @EnableWebSocketMessageBroker
配置,您可以將 WebSocketInboundChannelAdapter
配置為啟用 useBroker = true
選項。在這種情況下,所有 non-MESSAGE
型別的 Message
將被委託給提供的 AbstractBrokerMessageHandler
處理。此外,如果 Broker 中繼(broker relay)配置了目標字首,那些與 Broker 目標匹配的訊息將被路由到 AbstractBrokerMessageHandler
,而不是 WebSocketInboundChannelAdapter
的 outputChannel
。
如果 useBroker = false
且接收到的訊息是 SimpMessageType.CONNECT
型別,WebSocketInboundChannelAdapter
會立即向 WebSocketSession
傳送 SimpMessageType.CONNECT_ACK
訊息,而不會將其傳送到通道。
Spring 的 WebSocket 支援只允許配置一個 Broker 中繼(broker relay)。因此,我們不需要 AbstractBrokerMessageHandler 引用。它會在 Application Context 中被檢測到。 |
有關更多配置選項,請參閱 WebSockets 名稱空間支援。
WebSocket 出站通道介面卡
WebSocketOutboundChannelAdapter
-
接受來自其
MessageChannel
的 Spring Integration 訊息 -
從
MessageHeaders
中確定WebSocketSession
的 id -
從提供的
IntegrationWebSocketContainer
中檢索WebSocketSession
-
將
WebSocketMessage
的轉換和傳送工作委託給提供的SubProtocolHandlerRegistry
中相應的SubProtocolHandler
。
在客戶端,不需要 WebSocketSession
id 訊息頭,因為 ClientWebSocketContainer
只處理單個連線及其對應的 WebSocketSession
。
要使用 STOMP 子協議,您應該使用 StompSubProtocolHandler
配置此介面卡。然後,您可以使用 StompHeaderAccessor.create(StompCommand…)
和 MessageBuilder
,或者僅僅使用 HeaderEnricher
(請參閱 Header Enricher),將任何 STOMP 訊息型別傳送到此介面卡。
本章的其餘部分主要介紹其他配置選項。
WebSockets 名稱空間支援
Spring Integration WebSocket 名稱空間包含本章其餘部分描述的幾個元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中使用以下名稱空間宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
屬性
以下列表顯示了 <int-websocket:client-container>
元素的可用屬性
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
1 | 元件 bean 名稱。 |
2 | WebSocketClient bean 引用。 |
3 | 目標 WebSocket 服務的 uri 或 uriTemplate 。如果將其用作帶有 URI 變數佔位符的 uriTemplate ,則 uri-variables 屬性是必需的。 |
4 | uri 屬性值中 URI 變數佔位符的逗號分隔值。這些值根據其在 uri 中的順序替換到佔位符中。請參閱 UriComponents.expand(Object…uriVariableValues) 。 |
5 | Origin 握手 HTTP 頭值。 |
6 | WebSocket 會話的“傳送”超時限制。預設為 10000 。 |
7 | WebSocket 會話的“傳送”訊息大小限制。預設為 524288 。 |
8 | WebSocket 會話傳送緩衝區溢位策略,用於確定會話的出站訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy 。 |
9 | 布林值,指示此端點是否應自動啟動。預設為 false ,假定此容器是從 WebSocket 入站介面卡 啟動的。 |
10 | 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設為 Integer.MAX_VALUE 。值可以是負數。請參閱 SmartLifeCycle 。 |
11 | 用於握手請求的 HttpHeaders Map。 |
<int-websocket:server-container>
屬性
以下列表顯示了 <int-websocket:server-container>
元素的可用屬性
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
1 | 元件 bean 名稱。 |
2 | 將特定請求對映到 WebSocketHandler 的路徑(或逗號分隔的路徑)。支援精確路徑對映 URI(例如 /myPath )和 ant 風格的路徑模式(例如 /myPath/** )。 |
3 | HandshakeHandler bean 引用。預設為 DefaultHandshakeHandler 。 |
4 | HandshakeInterceptor bean 引用列表。 |
5 | 一個或多個工廠(WebSocketHandlerDecoratorFactory )的列表,用於裝飾處理 WebSocket 訊息的處理器。這對於某些高階用例可能很有用(例如,允許 Spring Security 在相應的 HTTP 會話過期時強制關閉 WebSocket 會話)。有關更多資訊,請參閱 Spring Session 專案。 |
6 | 請參閱 <int-websocket:client-container> 中的相同選項。 |
7 | 請參閱 <int-websocket:client-container> 中的相同選項。 |
8 | WebSocket 會話傳送緩衝區溢位策略,用於確定會話的出站訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy 。 |
9 | 允許的 origin 頭值。您可以指定多個 origin 作為逗號分隔列表。此檢查主要針對瀏覽器客戶端設計。沒有任何東西能阻止其他型別的客戶端修改 origin 頭值。當啟用 SockJS 並限制允許的 origin 時,不使用 origin 頭進行跨域請求的傳輸型別(jsonp-polling 、iframe-xhr-polling 、iframe-eventsource 和 iframe-htmlfile )將被停用。因此,不支援 IE6 和 IE7,IE8 和 IE9 僅在不使用 cookie 的情況下支援。預設情況下,允許所有 origin。 |
10 | 沒有原生跨域通訊的傳輸方式(例如 eventsource 和 htmlfile )必須在不可見的 iframe 中從“外部”域獲取一個簡單頁面,以便 iframe 中的程式碼可以在 SockJS 伺服器本地的域中執行。由於 iframe 需要載入 SockJS JavaScript 客戶端庫,此屬性允許您指定載入位置。預設情況下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,您也可以將其設定為指向由應用程式提供的 URL。請注意,可以指定相對 URL,在這種情況下,該 URL 必須相對於 iframe URL。例如,假設 SockJS 端點對映到 /sockjs ,生成的 iframe URL 為 /sockjs/iframe.html ,則相對 URL 必須以 ../../ 開頭,才能向上遍歷到 SockJS 對映上層的位置。對於基於字首的 servlet 對映,可能需要多一層遍歷。 |
11 | 在關閉之前,單個 HTTP 流請求可以傳送的最小位元組數。預設為 128K (即 128*1024 或 131072 位元組)。 |
12 | SockJs /info 端點響應中的 cookie_needed 值。此屬性指示應用程式是否需要 JSESSIONID cookie 才能正常執行(例如,用於負載均衡或 Java Servlet 容器中的 HTTP 會話使用)。 |
13 | 伺服器在未傳送任何訊息後經過一段時間(以毫秒為單位),應向客戶端傳送心跳幀以保持連線不中斷。預設值為 25,000 (25 秒)。 |
14 | 在沒有接收連線(即伺服器可以透過其向客戶端傳送資料的活動連線)後,客戶端被視為斷開連線的時間(以毫秒為單位)。預設值為 5000 。 |
15 | 會話在等待客戶端的下一個 HTTP 輪詢請求時可以快取的伺服器到客戶端訊息的數量。預設大小為 100 。 |
16 | 一些負載均衡器不支援 WebSockets。將此選項設定為 false 可在伺服器端停用 WebSocket 傳輸。預設值為 true 。 |
17 | TaskScheduler bean 引用。如果未提供值,則會建立一個新的 ThreadPoolTaskScheduler 例項。此排程器例項用於排程心跳訊息。 |
18 | SockJsMessageCodec bean 引用,用於編碼和解碼 SockJS 訊息。預設使用 Jackson2SockJsMessageCodec ,這要求 classpath 中存在 Jackson 庫。 |
19 | TransportHandler bean 引用列表。 |
20 | 是否停用為 SockJS 請求自動新增 CORS 頭。預設值為 false 。 |
<int-websocket:outbound-channel-adapter>
屬性
以下列表顯示了 <int-websocket:outbound-channel-adapter>
元素的可用屬性
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 元件 bean 名稱。如果您未提供 channel 屬性,則會建立一個 DirectChannel 並以 id 屬性作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點會以 id 加上 .adapter 作為 bean 名稱進行註冊。並且 MessageHandler 會以 id 加上 .handler 作為 bean 別名進行註冊。 |
2 | 標識附加到此介面卡的通道。 |
3 | IntegrationWebSocketContainer bean 的引用,它封裝了底層連線和 WebSocketSession 處理操作。必需。 |
4 | 可選的 SubProtocolHandler 例項引用。當客戶端未請求子協議或只有一個協議處理器時使用。如果未提供此引用或 protocol-handlers 列表,則預設使用 PassThruSubProtocolHandler 。 |
5 | 此通道介面卡的 SubProtocolHandler bean 引用列表。如果您只提供一個 bean 引用且未提供 default-protocol-handler ,則該單個 SubProtocolHandler 將用作 default-protocol-handler 。如果您未設定此屬性或 default-protocol-handler ,則預設使用 PassThruSubProtocolHandler 。 |
6 | 此通道介面卡的 MessageConverter bean 引用列表。 |
7 | 布林值,指示是否應在任何自定義轉換器之後註冊預設轉換器。僅在提供了 message-converters 時使用此標誌。否則,將註冊所有預設轉換器。預設為 false 。預設轉換器按順序為:StringMessageConverter 、ByteArrayMessageConverter 和 MappingJackson2MessageConverter (如果 classpath 中存在 Jackson 庫)。 |
8 | 布林值,指示此端點是否應自動啟動。預設為 true 。 |
9 | 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設為 Integer.MIN_VALUE 。值可以是負數。請參閱 SmartLifeCycle 。 |
<int-websocket:inbound-channel-adapter>
屬性
以下列表顯示了 <int-websocket:outbound-channel-adapter>
元素的可用屬性
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
1 | 元件 bean 名稱。如果您未設定 channel 屬性,則會建立一個 DirectChannel 並以 id 屬性作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點會以 id 加上 .adapter 作為 bean 名稱進行註冊。 |
2 | 標識附加到此介面卡的通道。 |
3 | ErrorMessage 例項應傳送到的 MessageChannel bean 引用。 |
4 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
5 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
6 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
7 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
8 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
9 | 如果通道可能阻塞,傳送訊息到通道時等待的最長時間(以毫秒為單位)。例如,如果 QueueChannel 的最大容量已滿,它可能會阻塞直到有空間可用。 |
10 | 從傳入的 WebSocketMessage 轉換的目標載荷(payload)的 Java 型別完全限定名。預設為 java.lang.String 。 |
11 | 指示此介面卡是否將 non-MESSAGE 型別的 WebSocketMessage 例項和具有 Broker 目標的訊息傳送到應用程式上下文中的 AbstractBrokerMessageHandler 。當此屬性為 true 時,需要 Broker 中繼(Broker Relay)配置。此屬性僅在伺服器端使用。在客戶端,此屬性被忽略。預設為 false 。 |
12 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
13 | 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。 |
使用 ClientStompEncoder
從版本 4.3.13 開始,Spring Integration 提供了 ClientStompEncoder
(作為標準 StompEncoder
的擴充套件),用於 WebSocket 通道介面卡的客戶端。為了正確進行客戶端訊息準備,您必須將 ClientStompEncoder
的例項注入到 StompSubProtocolHandler
中。預設的 StompSubProtocolHandler
有一個問題,它是為伺服器端設計的,因此它會將 SEND
stompCommand
頭更新為 MESSAGE
(這是 STOMP 協議對伺服器端的要求)。如果客戶端不以正確的 SEND
web socket 幀傳送其訊息,某些 STOMP Broker 不會接受它們。在這種情況下,ClientStompEncoder
的作用是在將訊息編碼為 byte[]
之前,覆蓋 stompCommand
頭並將其設定為 SEND
值。
動態 WebSocket 端點註冊
從版本 5.5 開始,WebSocket 伺服器端點(基於 ServerWebSocketContainer
的通道介面卡)現在可以在執行時註冊(和移除)— ServerWebSocketContainer
對映的 paths
透過 HandlerMapping
暴露到 DispatcherServlet
中,並對 WebSocket 客戶端可見。動態和執行時整合流 支援有助於以透明的方式註冊這些端點
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
在動態流注冊上呼叫 .addBean(serverWebSocketContainer) 非常重要,以便將 ServerWebSocketContainer 例項新增到 ApplicationContext 中用於端點註冊。當動態流注冊被銷燬時,相關的 ServerWebSocketContainer 例項也會被銷燬,相應的端點註冊(包括 URL 路徑對映)也是如此。 |
動態 Websocket 端點只能透過 Spring Integration 機制註冊:當使用常規的 Spring @EnableWebsocket 時,Spring Integration 配置會退出,並且不會註冊用於動態端點的基礎設施。 |