WebSocket 支援

從版本 4.1 開始,Spring Integration 支援 WebSocket。它基於 Spring Framework 的 web-socket 模組的架構、基礎設施和 API。因此,許多 Spring WebSocket 的元件(例如 SubProtocolHandlerWebSocketClient)和配置選項(例如 @EnableWebSocketMessageBroker)可以在 Spring Integration 中重用。更多資訊請參閱 Spring Framework WebSocket 支援 一章在 Spring Framework 參考手冊中。

您需要在專案中包含此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.4.4"

對於伺服器端,必須顯式包含 org.springframework:spring-webmvc 依賴項。

Spring Framework 的 WebSocket 基礎設施基於 Spring 訊息基礎構建,並提供了一個基本的訊息框架,該框架基於 Spring Integration 使用的相同的 MessageChannel 實現和 MessageHandler 實現(以及一些 POJO 方法的註解對映)。因此,Spring Integration 可以直接參與 WebSocket 流,即使沒有 WebSocket 介面卡。為此,您可以使用適當的註解配置 Spring Integration @MessagingGateway,如下例所示

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概覽

由於 WebSocket 協議本質上是流式的,並且我們可以同時向 WebSocket 傳送和接收訊息,因此無論是在客戶端還是伺服器端,我們都可以處理適當的 WebSocketSession。為了封裝連線管理和 WebSocketSession 註冊,提供了 IntegrationWebSocketContainer,其包含 ClientWebSocketContainerServerWebSocketContainer 實現。得益於 WebSocket API 及其在 Spring Framework 中的實現(包含許多擴充套件),伺服器端和客戶端(當然,從 Java 的角度來看)都使用相同的類。因此,大多數連線和 WebSocketSession 註冊選項在兩端是相同的。這使我們可以重用許多配置項和基礎設施鉤子來構建伺服器端和客戶端的 WebSocket 應用程式。以下示例展示了元件如何同時服務於這兩個目的

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer 旨在實現雙向訊息傳遞,可以在入站和出站通道介面卡之間共享(見下文);在使用單向(傳送或接收)WebSocket 訊息傳遞時,也可以僅被其中一個引用。它可以在不使用任何通道介面卡的情況下使用,但在此情況下,IntegrationWebSocketContainer 僅充當 WebSocketSession 登錄檔的作用。

ServerWebSocketContainer 實現了 WebSocketConfigurer 以便將內部的 IntegrationWebSocketContainer.IntegrationWebSocketHandler 註冊為 Endpoint。它在目標供應商 WebSocket Container 的 ServletWebSocketHandlerRegistry 中,根據提供的 paths 和其他伺服器 WebSocket 選項(如 HandshakeHandlerSockJS fallback)進行註冊。此註冊是透過基礎設施元件 WebSocketIntegrationConfigurationInitializer 完成的,它執行與 @EnableWebSocket 註解相同的功能。這意味著,透過使用 @EnableIntegration(或應用程式上下文中的任何 Spring Integration 名稱空間),您可以省略 @EnableWebSocket 宣告,因為 Spring Integration 基礎設施會檢測所有 WebSocket 端點。

從版本 6.1 開始,可以使用提供的 URI 配置 ClientWebSocketContainer,而不是使用 uriTemplateuriVariables 的組合。當 URI 的某些部分需要自定義編碼時,這非常有用。請參閱 UriComponentsBuilder API 以獲取便利。

WebSocket 入站通道介面卡

WebSocketInboundChannelAdapter 實現了 WebSocketSession 互動的接收部分。您必須為其提供一個 IntegrationWebSocketContainer,介面卡會將自身註冊為 WebSocketListener,以處理傳入訊息和 WebSocketSession 事件。

IntegrationWebSocketContainer 中只能註冊一個 WebSocketListener

對於 WebSocket 子協議,WebSocketInboundChannelAdapter 可以配置 SubProtocolHandlerRegistry 作為第二個建構函式引數。介面卡將委託給 SubProtocolHandlerRegistry,以確定接受的 WebSocketSession 適用的 SubProtocolHandler,並根據子協議實現將 WebSocketMessage 轉換為 Message

預設情況下,WebSocketInboundChannelAdapter 僅依賴於原始的 PassThruSubProtocolHandler 實現,該實現將 WebSocketMessage 轉換為 Message

WebSocketInboundChannelAdapter 只接受併發送給底層整合流(integration flow)擁有 SimpMessageType.MESSAGE 或空的 simpMessageType 頭的 Message 例項。所有其他 Message 型別透過由 SubProtocolHandler 實現(例如 StompSubProtocolHandler)發出的 ApplicationEvent 例項處理。

在伺服器端,如果存在 @EnableWebSocketMessageBroker 配置,您可以將 WebSocketInboundChannelAdapter 配置為啟用 useBroker = true 選項。在這種情況下,所有 non-MESSAGE 型別的 Message 將被委託給提供的 AbstractBrokerMessageHandler 處理。此外,如果 Broker 中繼(broker relay)配置了目標字首,那些與 Broker 目標匹配的訊息將被路由到 AbstractBrokerMessageHandler,而不是 WebSocketInboundChannelAdapteroutputChannel

如果 useBroker = false 且接收到的訊息是 SimpMessageType.CONNECT 型別,WebSocketInboundChannelAdapter 會立即向 WebSocketSession 傳送 SimpMessageType.CONNECT_ACK 訊息,而不會將其傳送到通道。

Spring 的 WebSocket 支援只允許配置一個 Broker 中繼(broker relay)。因此,我們不需要 AbstractBrokerMessageHandler 引用。它會在 Application Context 中被檢測到。

有關更多配置選項,請參閱 WebSockets 名稱空間支援

WebSocket 出站通道介面卡

WebSocketOutboundChannelAdapter

  1. 接受來自其 MessageChannel 的 Spring Integration 訊息

  2. MessageHeaders 中確定 WebSocketSession 的 id

  3. 從提供的 IntegrationWebSocketContainer 中檢索 WebSocketSession

  4. WebSocketMessage 的轉換和傳送工作委託給提供的 SubProtocolHandlerRegistry 中相應的 SubProtocolHandler

在客戶端,不需要 WebSocketSession id 訊息頭,因為 ClientWebSocketContainer 只處理單個連線及其對應的 WebSocketSession

要使用 STOMP 子協議,您應該使用 StompSubProtocolHandler 配置此介面卡。然後,您可以使用 StompHeaderAccessor.create(StompCommand…​)MessageBuilder,或者僅僅使用 HeaderEnricher(請參閱 Header Enricher),將任何 STOMP 訊息型別傳送到此介面卡。

本章的其餘部分主要介紹其他配置選項。

WebSockets 名稱空間支援

Spring Integration WebSocket 名稱空間包含本章其餘部分描述的幾個元件。要將其包含在您的配置中,請在您的應用程式上下文配置檔案中使用以下名稱空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> 屬性

以下列表顯示了 <int-websocket:client-container> 元素的可用屬性

<int-websocket:client-container
                  id=""                             (1)
                  client=""                         (2)
                  uri=""                            (3)
                  uri-variables=""                  (4)
                  origin=""                         (5)
                  send-time-limit=""                (6)
                  send-buffer-size-limit=""         (7)
                  send-buffer-overflow-strategy=""  (8)
                  auto-startup=""                   (9)
                  phase="">                        (10)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      (11)
</int-websocket:client-container>
1 元件 bean 名稱。
2 WebSocketClient bean 引用。
3 目標 WebSocket 服務的 uriuriTemplate。如果將其用作帶有 URI 變數佔位符的 uriTemplate,則 uri-variables 屬性是必需的。
4 uri 屬性值中 URI 變數佔位符的逗號分隔值。這些值根據其在 uri 中的順序替換到佔位符中。請參閱 UriComponents.expand(Object…​uriVariableValues)
5 Origin 握手 HTTP 頭值。
6 WebSocket 會話的“傳送”超時限制。預設為 10000
7 WebSocket 會話的“傳送”訊息大小限制。預設為 524288
8 WebSocket 會話傳送緩衝區溢位策略,用於確定會話的出站訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 布林值,指示此端點是否應自動啟動。預設為 false,假定此容器是從 WebSocket 入站介面卡 啟動的。
10 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設為 Integer.MAX_VALUE。值可以是負數。請參閱 SmartLifeCycle
11 用於握手請求的 HttpHeaders Map。

<int-websocket:server-container> 屬性

以下列表顯示了 <int-websocket:server-container> 元素的可用屬性

<int-websocket:server-container
          id=""                             (1)
          path=""                           (2)
          handshake-handler=""              (3)
          handshake-interceptors=""         (4)
          decorator-factories=""            (5)
          send-time-limit=""                (6)
          send-buffer-size-limit=""         (7)
          send-buffer-overflow-strategy=""  (8)
          allowed-origins="">               (9)
          <int-websocket:sockjs
            client-library-url=""          (10)
            stream-bytes-limit=""          (11)
            session-cookie-needed=""       (12)
            heartbeat-time=""              (13)
            disconnect-delay=""            (14)
            message-cache-size=""          (15)
            websocket-enabled=""           (16)
            scheduler=""                   (17)
            message-codec=""               (18)
            transport-handlers=""          (19)
            suppress-cors="true" />        (20)
</int-websocket:server-container>
1 元件 bean 名稱。
2 將特定請求對映到 WebSocketHandler 的路徑(或逗號分隔的路徑)。支援精確路徑對映 URI(例如 /myPath)和 ant 風格的路徑模式(例如 /myPath/**)。
3 HandshakeHandler bean 引用。預設為 DefaultHandshakeHandler
4 HandshakeInterceptor bean 引用列表。
5 一個或多個工廠(WebSocketHandlerDecoratorFactory)的列表,用於裝飾處理 WebSocket 訊息的處理器。這對於某些高階用例可能很有用(例如,允許 Spring Security 在相應的 HTTP 會話過期時強制關閉 WebSocket 會話)。有關更多資訊,請參閱 Spring Session 專案
6 請參閱 <int-websocket:client-container> 中的相同選項。
7 請參閱 <int-websocket:client-container> 中的相同選項。
8 WebSocket 會話傳送緩衝區溢位策略,用於確定會話的出站訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 允許的 origin 頭值。您可以指定多個 origin 作為逗號分隔列表。此檢查主要針對瀏覽器客戶端設計。沒有任何東西能阻止其他型別的客戶端修改 origin 頭值。當啟用 SockJS 並限制允許的 origin 時,不使用 origin 頭進行跨域請求的傳輸型別(jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)將被停用。因此,不支援 IE6 和 IE7,IE8 和 IE9 僅在不使用 cookie 的情況下支援。預設情況下,允許所有 origin。
10 沒有原生跨域通訊的傳輸方式(例如 eventsourcehtmlfile)必須在不可見的 iframe 中從“外部”域獲取一個簡單頁面,以便 iframe 中的程式碼可以在 SockJS 伺服器本地的域中執行。由於 iframe 需要載入 SockJS JavaScript 客戶端庫,此屬性允許您指定載入位置。預設情況下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,您也可以將其設定為指向由應用程式提供的 URL。請注意,可以指定相對 URL,在這種情況下,該 URL 必須相對於 iframe URL。例如,假設 SockJS 端點對映到 /sockjs,生成的 iframe URL 為 /sockjs/iframe.html,則相對 URL 必須以 ../../ 開頭,才能向上遍歷到 SockJS 對映上層的位置。對於基於字首的 servlet 對映,可能需要多一層遍歷。
11 在關閉之前,單個 HTTP 流請求可以傳送的最小位元組數。預設為 128K(即 128*1024 或 131072 位元組)。
12 SockJs /info 端點響應中的 cookie_needed 值。此屬性指示應用程式是否需要 JSESSIONID cookie 才能正常執行(例如,用於負載均衡或 Java Servlet 容器中的 HTTP 會話使用)。
13 伺服器在未傳送任何訊息後經過一段時間(以毫秒為單位),應向客戶端傳送心跳幀以保持連線不中斷。預設值為 25,000(25 秒)。
14 在沒有接收連線(即伺服器可以透過其向客戶端傳送資料的活動連線)後,客戶端被視為斷開連線的時間(以毫秒為單位)。預設值為 5000
15 會話在等待客戶端的下一個 HTTP 輪詢請求時可以快取的伺服器到客戶端訊息的數量。預設大小為 100
16 一些負載均衡器不支援 WebSockets。將此選項設定為 false 可在伺服器端停用 WebSocket 傳輸。預設值為 true
17 TaskScheduler bean 引用。如果未提供值,則會建立一個新的 ThreadPoolTaskScheduler 例項。此排程器例項用於排程心跳訊息。
18 SockJsMessageCodec bean 引用,用於編碼和解碼 SockJS 訊息。預設使用 Jackson2SockJsMessageCodec,這要求 classpath 中存在 Jackson 庫。
19 TransportHandler bean 引用列表。
20 是否停用為 SockJS 請求自動新增 CORS 頭。預設值為 false

<int-websocket:outbound-channel-adapter> 屬性

以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素的可用屬性

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 元件 bean 名稱。如果您未提供 channel 屬性,則會建立一個 DirectChannel 並以 id 屬性作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點會以 id 加上 .adapter 作為 bean 名稱進行註冊。並且 MessageHandler 會以 id 加上 .handler 作為 bean 別名進行註冊。
2 標識附加到此介面卡的通道。
3 IntegrationWebSocketContainer bean 的引用,它封裝了底層連線和 WebSocketSession 處理操作。必需。
4 可選的 SubProtocolHandler 例項引用。當客戶端未請求子協議或只有一個協議處理器時使用。如果未提供此引用或 protocol-handlers 列表,則預設使用 PassThruSubProtocolHandler
5 此通道介面卡的 SubProtocolHandler bean 引用列表。如果您只提供一個 bean 引用且未提供 default-protocol-handler,則該單個 SubProtocolHandler 將用作 default-protocol-handler。如果您未設定此屬性或 default-protocol-handler,則預設使用 PassThruSubProtocolHandler
6 此通道介面卡的 MessageConverter bean 引用列表。
7 布林值,指示是否應在任何自定義轉換器之後註冊預設轉換器。僅在提供了 message-converters 時使用此標誌。否則,將註冊所有預設轉換器。預設為 false。預設轉換器按順序為:StringMessageConverterByteArrayMessageConverterMappingJackson2MessageConverter(如果 classpath 中存在 Jackson 庫)。
8 布林值,指示此端點是否應自動啟動。預設為 true
9 此端點應在其內部啟動和停止的生命週期階段。值越低,此端點啟動越早,停止越晚。預設為 Integer.MIN_VALUE。值可以是負數。請參閱 SmartLifeCycle

<int-websocket:inbound-channel-adapter> 屬性

以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素的可用屬性

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 元件 bean 名稱。如果您未設定 channel 屬性,則會建立一個 DirectChannel 並以 id 屬性作為 bean 名稱註冊到應用程式上下文中。在這種情況下,端點會以 id 加上 .adapter 作為 bean 名稱進行註冊。
2 標識附加到此介面卡的通道。
3 ErrorMessage 例項應傳送到的 MessageChannel bean 引用。
4 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
5 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
6 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
7 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
8 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
9 如果通道可能阻塞,傳送訊息到通道時等待的最長時間(以毫秒為單位)。例如,如果 QueueChannel 的最大容量已滿,它可能會阻塞直到有空間可用。
10 從傳入的 WebSocketMessage 轉換的目標載荷(payload)的 Java 型別完全限定名。預設為 java.lang.String
11 指示此介面卡是否將 non-MESSAGE 型別的 WebSocketMessage 例項和具有 Broker 目標的訊息傳送到應用程式上下文中的 AbstractBrokerMessageHandler。當此屬性為 true 時,需要 Broker 中繼(Broker Relay)配置。此屬性僅在伺服器端使用。在客戶端,此屬性被忽略。預設為 false
12 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。
13 請參閱 <int-websocket:outbound-channel-adapter> 中的相同選項。

使用 ClientStompEncoder

從版本 4.3.13 開始,Spring Integration 提供了 ClientStompEncoder(作為標準 StompEncoder 的擴充套件),用於 WebSocket 通道介面卡的客戶端。為了正確進行客戶端訊息準備,您必須將 ClientStompEncoder 的例項注入到 StompSubProtocolHandler 中。預設的 StompSubProtocolHandler 有一個問題,它是為伺服器端設計的,因此它會將 SEND stompCommand 頭更新為 MESSAGE(這是 STOMP 協議對伺服器端的要求)。如果客戶端不以正確的 SEND web socket 幀傳送其訊息,某些 STOMP Broker 不會接受它們。在這種情況下,ClientStompEncoder 的作用是在將訊息編碼為 byte[] 之前,覆蓋 stompCommand 頭並將其設定為 SEND 值。

動態 WebSocket 端點註冊

從版本 5.5 開始,WebSocket 伺服器端點(基於 ServerWebSocketContainer 的通道介面卡)現在可以在執行時註冊(和移除)— ServerWebSocketContainer 對映的 paths 透過 HandlerMapping 暴露到 DispatcherServlet 中,並對 WebSocket 客戶端可見。動態和執行時整合流 支援有助於以透明的方式註冊這些端點

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
在動態流注冊上呼叫 .addBean(serverWebSocketContainer) 非常重要,以便將 ServerWebSocketContainer 例項新增到 ApplicationContext 中用於端點註冊。當動態流注冊被銷燬時,相關的 ServerWebSocketContainer 例項也會被銷燬,相應的端點註冊(包括 URL 路徑對映)也是如此。
動態 Websocket 端點只能透過 Spring Integration 機制註冊:當使用常規的 Spring @EnableWebsocket 時,Spring Integration 配置會退出,並且不會註冊用於動態端點的基礎設施。