WebSockets 支援
從 4.1 版本開始,Spring Integration 支援 WebSocket。它基於 Spring Framework 的 web-socket 模組的架構、基礎設施和 API。因此,許多 Spring WebSocket 元件(如 SubProtocolHandler 或 WebSocketClient)和配置選項(如 @EnableWebSocketMessageBroker)可以在 Spring Integration 中重用。有關更多資訊,請參閱 Spring Framework 參考手冊中的 Spring Framework WebSocket 支援 章。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:7.0.0"
對於伺服器端,必須顯式包含 org.springframework:spring-webmvc 依賴項。
Spring Framework WebSocket 基礎設施基於 Spring 訊息傳遞基礎,並提供了一個基於 Spring Integration 使用的相同 MessageChannel 實現和 MessageHandler 實現(以及一些 POJO 方法註解對映)的基本訊息傳遞框架。因此,Spring Integration 可以直接參與 WebSocket 流,即使沒有 WebSocket 介面卡。為此,您可以配置一個帶有適當註解的 Spring Integration @MessagingGateway,如以下示例所示
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由於 WebSocket 協議本質上是流式傳輸的,我們可以同時向 WebSocket 傳送和接收訊息,因此我們可以處理適當的 WebSocketSession,無論是在客戶端還是伺服器端。為了封裝連線管理和 WebSocketSession 登錄檔,提供了 IntegrationWebSocketContainer,其中包含 ClientWebSocketContainer 和 ServerWebSocketContainer 實現。得益於 WebSocket API 及其在 Spring Framework 中的實現(具有許多擴充套件),伺服器端和客戶端都使用相同的類(當然,從 Java 的角度來看)。因此,大多數連線和 WebSocketSession 登錄檔選項在兩端都是相同的。這使我們能夠重用許多配置項和基礎設施鉤子來在伺服器端和客戶端構建 WebSocket 應用程式。以下示例顯示了元件如何同時服務於這兩個目的
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
IntegrationWebSocketContainer 旨在實現雙向訊息傳遞,並且可以在入站和出站通道介面卡之間共享(見下文),在使用單向(傳送或接收)WebSocket 訊息傳遞時,只能由其中一個引用。它可以在沒有任何通道介面卡的情況下使用,但在這種情況下,IntegrationWebSocketContainer 僅充當 WebSocketSession 登錄檔。
ServerWebSocketContainer 實現了 WebSocketConfigurer,用於將內部 IntegrationWebSocketContainer.IntegrationWebSocketHandler 註冊為 Endpoint。它在所提供的 paths 和其他伺服器 WebSocket 選項(如 HandshakeHandler 或 SockJS 回退)下,在目標供應商 WebSocket 容器的 ServletWebSocketHandlerRegistry 中進行此操作。此註冊是透過基礎設施 WebSocketIntegrationConfigurationInitializer 元件實現的,其功能與 @EnableWebSocket 註解相同。這意味著,透過使用 @EnableIntegration(或應用程式上下文中的任何 Spring Integration 名稱空間),您可以省略 @EnableWebSocket 宣告,因為 Spring Integration 基礎設施會檢測所有 WebSocket 端點。 |
從 6.1 版本開始,ClientWebSocketContainer 可以使用提供的 URI 進行配置,而不是 uriTemplate 和 uriVariables 組合。這在 URI 的某些部分需要自定義編碼的情況下非常有用。請參閱 UriComponentsBuilder API 以瞭解方便性。
WebSocket 入站通道介面卡
WebSocketInboundChannelAdapter 實現了 WebSocketSession 互動的接收部分。您必須為其提供一個 IntegrationWebSocketContainer,並且介面卡將自身註冊為 WebSocketListener 以處理傳入訊息和 WebSocketSession 事件。
在 IntegrationWebSocketContainer 中只能註冊一個 WebSocketListener。 |
對於 WebSocket 子協議,WebSocketInboundChannelAdapter 可以配置 SubProtocolHandlerRegistry 作為第二個建構函式引數。介面卡委託給 SubProtocolHandlerRegistry 來確定已接受的 WebSocketSession 的適當 SubProtocolHandler,並根據子協議實現將 WebSocketMessage 轉換為 Message。
預設情況下,WebSocketInboundChannelAdapter 僅依賴於原始的 PassThruSubProtocolHandler 實現,該實現將 WebSocketMessage 轉換為 Message。 |
WebSocketInboundChannelAdapter 僅接受併發送到底層整合流中具有 SimpMessageType.MESSAGE 或空 simpMessageType 頭的 Message 例項。所有其他 Message 型別都透過從 SubProtocolHandler 實現(例如 StompSubProtocolHandler)發出的 ApplicationEvent 例項處理。
在伺服器端,如果存在 @EnableWebSocketMessageBroker 配置,您可以使用 useBroker = true 選項配置 WebSocketInboundChannelAdapter。在這種情況下,所有 non-MESSAGE Message 型別都委託給提供的 AbstractBrokerMessageHandler。此外,如果代理中繼配置了目標字首,則與代理目標匹配的訊息將路由到 AbstractBrokerMessageHandler,而不是 WebSocketInboundChannelAdapter 的 outputChannel。
如果 useBroker = false 並且接收到的訊息是 SimpMessageType.CONNECT 型別,則 WebSocketInboundChannelAdapter 會立即向 WebSocketSession 傳送 SimpMessageType.CONNECT_ACK 訊息,而不會將其傳送到通道。
Spring 的 WebSocket 支援只允許配置一個代理中繼。因此,我們不需要 AbstractBrokerMessageHandler 引用。它會在 Application Context 中檢測到。 |
有關更多配置選項,請參閱 WebSocket 名稱空間支援。
WebSocket 出站通道介面卡
WebSocketOutboundChannelAdapter
-
從其
MessageChannel接受 Spring Integration 訊息 -
從
MessageHeaders中確定WebSocketSession的id -
從提供的
IntegrationWebSocketContainer中檢索WebSocketSession -
將
WebSocketMessage的轉換和傳送工作委託給提供的SubProtocolHandlerRegistry中適當的SubProtocolHandler。
在客戶端,不需要 WebSocketSession 的 id 訊息頭,因為 ClientWebSocketContainer 只處理單個連線及其 WebSocketSession。
要使用 STOMP 子協議,您應該使用 StompSubProtocolHandler 配置此介面卡。然後,您可以使用 StompHeaderAccessor.create(StompCommand…) 和 MessageBuilder 向此介面卡傳送任何 STOMP 訊息型別,或者只使用 HeaderEnricher(請參閱 Header Enricher)。
本章的其餘部分主要介紹附加配置選項。
WebSocket 名稱空間支援
Spring Integration WebSocket 名稱空間包含本章其餘部分描述的幾個元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中使用以下名稱空間宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container> 屬性
以下列表顯示了 <int-websocket:client-container> 元素可用的屬性
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
| 1 | 元件 bean 名稱。 |
| 2 | WebSocketClient bean 引用。 |
| 3 | 目標 WebSocket 服務的 uri 或 uriTemplate。如果將其用作帶有 URI 變數佔位符的 uriTemplate,則需要 uri-variables 屬性。 |
| 4 | uri 屬性值中 URI 變數佔位符的逗號分隔值。這些值根據其在 uri 中的順序替換到佔位符中。請參閱 UriComponents.expand(Object…uriVariableValues)。 |
| 5 | Origin 握手 HTTP 標頭值。 |
| 6 | WebSocket 會話 send 超時限制。預設為 10000。 |
| 7 | WebSocket 會話 send 訊息大小限制。預設為 524288。 |
| 8 | 當會話的出站訊息緩衝區達到 send-buffer-size-limit 時,WebSocket 會話傳送緩衝區溢位策略決定了其行為。有關可能的值和更多詳細資訊,請參見 ConcurrentWebSocketSessionDecorator.OverflowStrategy。 |
| 9 | 布林值,指示此端點是否應自動啟動。預設為 false,假設此容器是從 WebSocket 入站介面卡 啟動的。 |
| 10 | 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設值為 Integer.MAX_VALUE。值可以為負數。請參閱 SmartLifeCycle。 |
| 11 | 用於握手請求的 HttpHeaders 的 Map。 |
<int-websocket:server-container> 屬性
以下列表顯示了 <int-websocket:server-container> 元素可用的屬性
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
| 1 | 元件 bean 名稱。 |
| 2 | 一個路徑(或逗號分隔的路徑),它將特定請求對映到 WebSocketHandler。支援精確路徑對映 URI(例如 /myPath)和 ant 風格路徑模式(例如 /myPath/**)。 |
| 3 | HandshakeHandler bean 引用。預設為 DefaultHandshakeHandler。 |
| 4 | HandshakeInterceptor bean 引用的列表。 |
| 5 | 一個或多個工廠(WebSocketHandlerDecoratorFactory)的列表,用於裝飾處理 WebSocket 訊息的處理程式。這對於某些高階用例可能很有用(例如,允許 Spring Security 在相應的 HTTP 會話過期時強制關閉 WebSocket 會話)。有關更多資訊,請參閱 Spring Session Project。 |
| 6 | 請參閱 <int-websocket:client-container> 上的相同選項。 |
| 7 | 請參閱 <int-websocket:client-container> 上的相同選項。 |
| 8 | 當會話的出站訊息緩衝區達到 send-buffer-size-limit 時,WebSocket 會話傳送緩衝區溢位策略決定了其行為。有關可能的值和更多詳細資訊,請參見 ConcurrentWebSocketSessionDecorator.OverflowStrategy。 |
| 9 | 允許的源頭部值。您可以將多個源指定為逗號分隔列表。此檢查主要用於瀏覽器客戶端。沒有任何東西可以阻止其他型別的客戶端修改源頭部值。當啟用 SockJS 並且允許的源受到限制時,不使用源頭部進行跨域請求的傳輸型別(jsonp-polling、iframe-xhr-polling、iframe-eventsource 和 iframe-htmlfile)將被停用。因此,不支援 IE6 和 IE7,IE8 和 IE9 僅在沒有 cookie 的情況下受支援。預設情況下,允許所有源。 |
| 10 | 沒有原生跨域通訊的傳輸(例如 eventsource 和 htmlfile)必須從“外部”域獲取一個簡單頁面到不可見的 iframe 中,以便 iframe 中的程式碼可以從 SockJS 伺服器的本地域執行。由於 iframe 需要載入 SockJS JavaScript 客戶端庫,此屬性允許您指定從何處載入它。預設情況下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,您也可以將其設定為指向應用程式提供的 URL。請注意,可以指定相對 URL,在這種情況下,URL 必須相對於 iframe URL。例如,假設 SockJS 端點對映到 /sockjs 且生成的 iframe URL 是 /sockjs/iframe.html,則相對 URL 必須以 ../../ 開頭才能遍歷到 SockJS 對映上方的位置。對於基於字首的 servlet 對映,您可能需要多一個遍歷。 |
| 11 | 在單個 HTTP 流請求關閉之前可以傳送的最小位元組數。預設為 128K(即 128*1024 或 131072 位元組)。 |
| 12 | SockJs /info 端點響應中的 cookie_needed 值。此屬性指示應用程式是否需要 JSESSIONID cookie 才能正常執行(例如,用於負載均衡或 Java Servlet 容器中用於 HTTP 會話)。 |
| 13 | 伺服器沒有傳送任何訊息後,伺服器應該向客戶端傳送心跳幀以保持連線不中斷的時間(以毫秒為單位)。預設值為 25,000(25 秒)。 |
| 14 | 在客戶端被認為斷開連線之前(即沒有接收連線的情況下,伺服器可以透過該連線向客戶端傳送資料),客戶端沒有接收連線的時間(以毫秒為單位)。預設值為 5000。 |
| 15 | 會話在等待客戶端的下一個 HTTP 輪詢請求時可以快取的伺服器到客戶端訊息的數量。預設大小為 100。 |
| 16 | 一些負載均衡器不支援 WebSocket。將此選項設定為 false 可在伺服器端停用 WebSocket 傳輸。預設值為 true。 |
| 17 | TaskScheduler bean 引用。如果沒有提供值,則會建立一個新的 ThreadPoolTaskScheduler 例項。此排程程式例項用於排程心跳訊息。 |
| 18 | 用於編碼和解碼 SockJS 訊息的 SockJsMessageCodec bean 引用。預設情況下,使用 Jackson2SockJsMessageCodec,這要求 classpath 中存在 Jackson 庫。 |
| 19 | TransportHandler bean 引用的列表。 |
| 20 | 是否停用 SockJS 請求自動新增 CORS 標頭。預設值為 false。 |
<int-websocket:outbound-channel-adapter> 屬性
以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素可用的屬性
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
| 1 | 元件 bean 名稱。如果您不提供 channel 屬性,則會建立一個 DirectChannel,並使用此 id 屬性作為 bean 名稱在應用程式上下文中註冊。在這種情況下,端點將以 bean 名稱 id 加 .adapter 註冊。並且 MessageHandler 將以 bean 別名 id 加 .handler 註冊。 |
| 2 | 標識附加到此介面卡的通道。 |
| 3 | 對 IntegrationWebSocketContainer bean 的引用,它封裝了低階連線和 WebSocketSession 處理操作。必需。 |
| 4 | 對 SubProtocolHandler 例項的可選引用。當客戶端未請求子協議或它是單個協議處理程式時使用。如果未提供此引用或 protocol-handlers 列表,則預設使用 PassThruSubProtocolHandler。 |
| 5 | 此通道介面卡的 SubProtocolHandler bean 引用列表。如果您只提供一個 bean 引用並且不提供 default-protocol-handler,則該單個 SubProtocolHandler 將用作 default-protocol-handler。如果您未設定此屬性或 default-protocol-handler,則預設使用 PassThruSubProtocolHandler。 |
| 6 | 此通道介面卡的 MessageConverter bean 引用列表。 |
| 7 | 布林值,指示是否應在任何自定義轉換器之後註冊預設轉換器。此標誌僅在提供 message-converters 時使用。否則,所有預設轉換器都將註冊。預設為 false。預設轉換器(按順序)是:StringMessageConverter、ByteArrayMessageConverter 和 MappingJackson2MessageConverter(如果 classpath 中存在 Jackson 庫)。 |
| 8 | 布林值,指示此端點是否應自動啟動。預設為 true。 |
| 9 | 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設值為 Integer.MIN_VALUE。值可以為負數。請參閱 SmartLifeCycle。 |
<int-websocket:inbound-channel-adapter> 屬性
以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素可用的屬性
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
| 1 | 元件 bean 名稱。如果您未設定 channel 屬性,則會建立一個 DirectChannel,並以此 id 屬性作為 bean 名稱在應用程式上下文中註冊。在這種情況下,端點將以 bean 名稱 id 加 .adapter 註冊。 |
| 2 | 標識附加到此介面卡的通道。 |
| 3 | 應向其傳送 ErrorMessage 例項的 MessageChannel bean 引用。 |
| 4 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 5 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 6 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 7 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 8 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 9 | 如果通道可能阻塞,則在向通道傳送訊息時等待的最長時間(以毫秒為單位)。例如,如果 QueueChannel 達到其最大容量,它可能會阻塞直到有可用空間。 |
| 10 | 目標 payload 的 Java 型別的完全限定名稱,用於從傳入的 WebSocketMessage 進行轉換。預設為 java.lang.String。 |
| 11 | 指示此介面卡是否將 non-MESSAGE WebSocketMessage 例項和具有代理目標的訊息傳送到應用程式上下文中的 AbstractBrokerMessageHandler。當此屬性為 true 時,需要 Broker Relay 配置。此屬性僅在伺服器端使用。在客戶端,它被忽略。預設為 false。 |
| 12 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
| 13 | 請參閱 <int-websocket:outbound-channel-adapter> 上的相同選項。 |
使用 ClientStompEncoder
從 4.3.13 版本開始,Spring Integration 提供了 ClientStompEncoder(作為標準 StompEncoder 的擴充套件),用於 WebSocket 通道介面卡的客戶端。為了正確準備客戶端訊息,您必須將 ClientStompEncoder 例項注入到 StompSubProtocolHandler 中。預設 StompSubProtocolHandler 的一個問題是它最初是為伺服器端設計的,因此它將 SEND stompCommand 頭部更新為 MESSAGE(STOMP 協議對伺服器端的要求)。如果客戶端沒有在正確的 SEND WebSocket 幀中傳送其訊息,某些 STOMP 代理將不接受它們。在這種情況下,ClientStompEncoder 的目的是覆蓋 stompCommand 頭部並將其設定為 SEND 值,然後再將訊息編碼為 byte[]。
動態 WebSocket 端點註冊
從 5.5 版本開始,WebSocket 伺服器端點(基於 ServerWebSocketContainer 的通道介面卡)現在可以在執行時註冊(和刪除)—— ServerWebSocketContainer 對映的 paths 透過 HandlerMapping 暴露到 DispatcherServlet 中,並可供 WebSocket 客戶端訪問。動態和執行時整合流 支援有助於以透明方式註冊這些端點
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
在動態流注冊上呼叫 .addBean(serverWebSocketContainer) 以將 ServerWebSocketContainer 例項新增到 ApplicationContext 以進行端點註冊非常重要。當動態流注冊被銷燬時,相關的 ServerWebSocketContainer 例項也會被銷燬,以及相應的端點註冊,包括 URL 路徑對映。 |
動態 WebSocket 端點只能透過 Spring Integration 機制註冊:當使用常規 Spring @EnableWebsocket 時,Spring Integration 配置會回退,並且不會註冊用於動態端點的基礎設施。 |