WebSockets
參考文件的這一部分涵蓋了對響應式棧 WebSocket 訊息的支援。
WebSocket 簡介
WebSocket 協議,RFC 6455,提供了一種標準化方式,透過單一的 TCP 連線在客戶端和伺服器之間建立全雙工、雙向通訊通道。它是一個不同於 HTTP 的 TCP 協議,但設計為在 HTTP 上工作,使用埠 80 和 443,並允許重用現有防火牆規則。
WebSocket 互動始於一個 HTTP 請求,該請求使用 HTTP Upgrade 頭來升級,或者在這種情況下,切換到 WebSocket 協議。以下示例顯示了這樣的互動:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: https://:8080
| 1 | Upgrade 頭。 |
| 2 | 使用 Upgrade 連線。 |
與通常的 200 狀態碼不同,支援 WebSocket 的伺服器返回類似於以下內容的輸出:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
| 1 | 協議切換 |
成功握手後,HTTP 升級請求底層的 TCP 套接字保持開啟狀態,客戶端和伺服器都可以繼續傳送和接收訊息。
關於 WebSocket 工作原理的完整介紹超出了本文件的範圍。請參閱 RFC 6455、HTML5 的 WebSocket 章,或網路上的許多介紹和教程。
請注意,如果 WebSocket 伺服器在 Web 伺服器(例如 nginx)後面執行,您可能需要將其配置為將 WebSocket 升級請求傳遞給 WebSocket 伺服器。同樣,如果應用程式在雲環境中執行,請檢視雲提供商關於 WebSocket 支援的說明。
HTTP 與 WebSocket
儘管 WebSocket 被設計為與 HTTP 相容並以 HTTP 請求開始,但重要的是要理解這兩種協議導致了非常不同的架構和應用程式程式設計模型。
在 HTTP 和 REST 中,應用程式被建模為許多 URL。為了與應用程式互動,客戶端以請求-響應方式訪問這些 URL。伺服器根據 HTTP URL、方法和頭將請求路由到適當的處理程式。
相比之下,在 WebSockets 中,初始連線通常只有一個 URL。隨後,所有應用程式訊息都透過同一 TCP 連線流動。這指向了一個完全不同的非同步、事件驅動的訊息傳遞架構。
WebSocket 也是一個低階傳輸協議,與 HTTP 不同,它不對訊息內容規定任何語義。這意味著,除非客戶端和伺服器就訊息語義達成一致,否則無法路由或處理訊息。
WebSocket 客戶端和伺服器可以透過 HTTP 握手請求上的 Sec-WebSocket-Protocol 頭協商使用更高級別的訊息協議(例如 STOMP)。如果沒有該頭,它們需要制定自己的約定。
何時使用 WebSockets
WebSockets 可以使網頁動態和互動。然而,在許多情況下,AJAX 和 HTTP 流或長輪詢的組合可以提供一個簡單有效的解決方案。
例如,新聞、郵件和社交源需要動態更新,但每隔幾分鐘更新一次可能完全沒問題。另一方面,協作、遊戲和金融應用程式需要更接近即時。
延遲本身並不是決定性因素。如果訊息量相對較低(例如,監控網路故障),HTTP 流或輪詢可以提供有效的解決方案。只有低延遲、高頻率和高流量的組合,才能最好地證明使用 WebSocket 的合理性。
還要記住,在網際網路上,超出您控制的限制性代理可能會阻止 WebSocket 互動,原因可能是它們未配置為傳遞 Upgrade 頭,或者它們關閉了看起來空閒的長期連線。這意味著,在防火牆內部的內部應用程式使用 WebSocket 比面向公眾的應用程式更直接。
WebSocket API
Spring Framework 提供了一個 WebSocket API,您可以使用它來編寫處理 WebSocket 訊息的客戶端和伺服器端應用程式。
伺服器
要建立 WebSocket 伺服器,您可以首先建立一個 WebSocketHandler。以下示例展示瞭如何實現:
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然後您可以將其對映到 URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux 配置,則無需進行其他操作;否則,如果未使用 WebFlux 配置,則需要宣告一個 WebSocketHandlerAdapter,如下所示:
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
WebSocketHandler 的 handle 方法接受 WebSocketSession 並返回 Mono<Void>,以指示會話的應用程式處理何時完成。會話透過兩個流進行處理,一個用於入站訊息,一個用於出站訊息。下表描述了處理這些流的兩種方法:
WebSocketSession 方法 |
描述 |
|---|---|
|
提供對入站訊息流的訪問,並在連線關閉時完成。 |
|
接收一個用於出站訊息的源,寫入訊息,並返回一個 |
WebSocketHandler 必須將入站和出站流組合成一個統一的流,並返回一個反映該流完成的 Mono<Void>。根據應用程式要求,統一流在以下情況之一完成:
-
入站或出站訊息流完成。
-
入站流完成(即連線關閉),而出站流是無限的。
-
在選定的點,透過
WebSocketSession的close方法。
當入站和出站訊息流組合在一起時,無需檢查連線是否開啟,因為 Reactive Streams 會發出結束活動的訊號。入站流接收完成或錯誤訊號,出站流接收取消訊號。
處理程式最基本的實現是處理入站流。以下示例展示了這樣的實現:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
| 1 | 訪問入站訊息流。 |
| 2 | 對每條訊息執行操作。 |
| 3 | 執行使用訊息內容的巢狀非同步操作。 |
| 4 | 返回一個 Mono<Void>,當接收完成時該 Mono<Void> 完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
| 1 | 訪問入站訊息流。 |
| 2 | 對每條訊息執行操作。 |
| 3 | 執行使用訊息內容的巢狀非同步操作。 |
| 4 | 返回一個 Mono<Void>,當接收完成時該 Mono<Void> 完成。 |
對於巢狀的非同步操作,您可能需要在使用池化資料緩衝區(例如 Netty)的底層伺服器上呼叫 message.retain()。否則,資料緩衝區可能在您有機會讀取資料之前就被釋放。有關更多背景資訊,請參閱 資料緩衝區和編解碼器。 |
以下實現結合了入站和出站流:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
| 1 | 處理入站訊息流。 |
| 2 | 創建出站訊息,生成組合流。 |
| 3 | 返回一個 Mono<Void>,該 Mono<Void> 在我們繼續接收時不會完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
| 1 | 處理入站訊息流。 |
| 2 | 創建出站訊息,生成組合流。 |
| 3 | 返回一個 Mono<Void>,該 Mono<Void> 在我們繼續接收時不會完成。 |
入站和出站流可以相互獨立,僅在完成時合併,如下例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return input.and(output); (3)
}
}
| 1 | 處理入站訊息流。 |
| 2 | 傳送出站訊息。 |
| 3 | 合併流並返回一個 Mono<Void>,當任一流結束時該 Mono<Void> 完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return input.and(output) (3)
}
}
| 1 | 處理入站訊息流。 |
| 2 | 傳送出站訊息。 |
| 3 | 合併流並返回一個 Mono<Void>,當任一流結束時該 Mono<Void> 完成。 |
DataBuffer
DataBuffer 是 WebFlux 中位元組緩衝區的表示。參考資料的 Spring Core 部分在 資料緩衝區和編解碼器 一節中有更多關於它的內容。需要理解的關鍵是,在某些伺服器(如 Netty)上,位元組緩衝區是池化和引用計數的,必須在消耗後釋放以避免記憶體洩漏。
在 Netty 上執行時,如果應用程式希望保留輸入資料緩衝區以確保它們不被釋放,則必須使用 DataBufferUtils.retain(dataBuffer),並在消耗緩衝區後使用 DataBufferUtils.release(dataBuffer)。
握手
WebSocketHandlerAdapter 將委託給 WebSocketService。預設情況下,它是一個 HandshakeWebSocketService 例項,該例項對 WebSocket 請求執行基本檢查,然後使用 RequestUpgradeStrategy 來處理正在使用的伺服器。目前,內建支援 Reactor Netty、Tomcat 和 Jetty。
HandshakeWebSocketService 暴露了一個 sessionAttributePredicate 屬性,允許設定一個 Predicate<String> 來從 WebSession 中提取屬性並將其插入到 WebSocketSession 的屬性中。
伺服器配置
每個伺服器的 RequestUpgradeStrategy 都會公開特定於底層 WebSocket 伺服器引擎的配置。當使用 WebFlux Java 配置時,您可以自定義此類屬性,如 WebFlux 配置 的相應部分所示,否則,如果未使用 WebFlux 配置,請使用以下內容:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
請檢視您伺服器的升級策略,以瞭解可用的選項。目前,只有 Tomcat 和 Jetty 提供了此類選項。
CORS
配置 CORS 和限制 WebSocket 端點訪問的最簡單方法是讓您的 WebSocketHandler 實現 CorsConfigurationSource 並返回一個包含允許源、頭和其他詳細資訊的 CorsConfiguration。如果無法做到這一點,您還可以設定 SimpleUrlHandler 上的 corsConfigurations 屬性,透過 URL 模式指定 CORS 設定。如果兩者都指定,它們將透過使用 CorsConfiguration 上的 combine 方法進行組合。
客戶端
Spring WebFlux 提供了一個 WebSocketClient 抽象,併為 Reactor Netty、Tomcat、Jetty 和標準 Java(即 JSR-356)提供了實現。
Tomcat 客戶端實際上是標準 Java 客戶端的擴充套件,在 WebSocketSession 處理中增加了一些額外功能,以利用 Tomcat 特定的 API 暫停接收訊息以實現背壓。 |
要啟動 WebSocket 會話,您可以建立一個客戶端例項並使用其 execute 方法:
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
一些客戶端,例如 Jetty,實現了 Lifecycle,需要在您使用它們之前停止和啟動。所有客戶端都具有與底層 WebSocket 客戶端配置相關的建構函式選項。