WebSockets

參考文件的這一部分涵蓋了對 reactive-stack WebSocket 訊息傳遞的支援。

WebSocket 簡介

WebSocket 協議(RFC 6455)提供了一種標準化方式,用於在單個 TCP 連線上建立客戶端和伺服器之間的全雙工雙向通訊通道。它是不同於 HTTP 的 TCP 協議,但設計用於在 HTTP 之上工作,使用埠 80 和 443 並允許重用現有的防火牆規則。

WebSocket 互動始於一個 HTTP 請求,該請求使用 HTTP Upgrade 頭來升級或,在本例中,切換到 WebSocket 協議。以下示例展示了此類互動

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: https://:8080
1 Upgrade 頭。
2 使用 Upgrade 連線。

與通常的 200 狀態碼不同,支援 WebSocket 的伺服器返回類似於以下的輸出

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 協議切換

成功握手後,HTTP 升級請求底層的 TCP socket 保持開啟狀態,以便客戶端和伺服器繼續傳送和接收訊息。

關於 WebSockets 工作原理的完整介紹超出了本文件的範圍。請參閱 RFC 6455、HTML5 的 WebSocket 章或 Web 上的許多介紹和教程。

請注意,如果 WebSocket 伺服器執行在 Web 伺服器(例如 nginx)後面,你可能需要將其配置為將 WebSocket 升級請求傳遞給 WebSocket 伺服器。同樣,如果應用程式執行在雲環境中,請查閱雲提供商有關 WebSocket 支援的說明。

HTTP 對比 WebSocket

儘管 WebSocket 設計為與 HTTP 相容並以 HTTP 請求開始,但重要的是要理解這兩種協議會導致非常不同的架構和應用程式程式設計模型。

在 HTTP 和 REST 中,應用程式被建模為許多 URL。要與應用程式互動,客戶端以請求-響應方式訪問這些 URL。伺服器根據 HTTP URL、方法和頭將請求路由到適當的處理器。

相比之下,在 WebSockets 中,初始連線通常只有一個 URL。隨後,所有應用程式訊息都在同一個 TCP 連線上傳輸。這指向一種完全不同的非同步、事件驅動的訊息傳遞架構。

WebSocket 也是一種低階傳輸協議,與 HTTP 不同,它不規定訊息內容的任何語義。這意味著除非客戶端和伺服器就訊息語義達成一致,否則無法路由或處理訊息。

WebSocket 客戶端和伺服器可以透過 HTTP 握手請求上的 Sec-WebSocket-Protocol 頭協商使用更高階的訊息協議(例如 STOMP)。如果沒有此頭,他們需要制定自己的約定。

何時使用 WebSockets

WebSockets 可以使網頁變得動態和互動。然而,在許多情況下,結合 AJAX 和 HTTP streaming 或 long polling 可以提供一個簡單有效的解決方案。

例如,新聞、郵件和社交 feeds 需要動態更新,但每隔幾分鐘更新一次可能完全可以接受。另一方面,協作、遊戲和金融應用程式需要更接近即時。

延遲本身不是決定性因素。如果訊息量相對較低(例如,監控網路故障),HTTP streaming 或 polling 可以提供有效的解決方案。低延遲、高頻率和高流量的組合是使用 WebSocket 的最佳場景。

還要記住,在 Internet 上,超出你控制範圍的限制性代理可能會阻止 WebSocket 互動,原因可能是它們未配置為傳遞 Upgrade 頭,或者它們關閉長時間看起來空閒的連線。這意味著在防火牆內部使用 WebSocket 進行內部應用程式比面向公眾的應用程式更為直接。

WebSocket API

Spring Framework 提供了一個 WebSocket API,你可以用它來編寫處理 WebSocket 訊息的客戶端和伺服器端應用程式。

伺服器端

要建立 WebSocket 伺服器,首先可以建立一個 WebSocketHandler。以下示例展示瞭如何建立

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然後你可以將其對映到一個 URL

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux 配置,則無需進一步操作;如果未使用 WebFlux 配置,則需要宣告一個 WebSocketHandlerAdapter,如下所示

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法接受 WebSocketSession 並返回 Mono<Void>,以指示會話的應用程式處理何時完成。會話透過兩個流進行處理,一個用於入站訊息,一個用於出站訊息。下表描述了處理這些流的兩個方法

WebSocketSession 方法 描述

Flux<WebSocketMessage> receive()

提供對入站訊息流的訪問,並在連線關閉時完成。

Mono<Void> send(Publisher<WebSocketMessage>)

接受出站訊息的源,寫入訊息,並返回一個 Mono<Void>,該 Mono 在源完成且寫入完成時完成。

WebSocketHandler 必須將入站和出站流組合成一個統一的流,並返回一個反映該流完成的 Mono<Void>。根據應用程式的要求,統一的流在以下情況完成

  • 入站或出站訊息流中的任一個完成時。

  • 入站流完成(即連線關閉),而出站流是無限的。

  • 在選定的點,透過 WebSocketSessionclose 方法。

當入站和出站訊息流組合在一起時,無需檢查連線是否開啟,因為 Reactive Streams 會發出結束活動的訊號。入站流接收完成或錯誤訊號,出站流接收取消訊號。

Handler 最基本的實現是處理入站流的實現。以下示例展示了此類實現

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 訪問入站訊息流。
2 對每條訊息執行某些操作。
3 執行使用訊息內容的巢狀非同步操作。
4 返回一個在接收完成時完成的 Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()			(1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()						(4)
	}
}
1 訪問入站訊息流。
2 對每條訊息執行某些操作。
3 執行使用訊息內容的巢狀非同步操作。
4 返回一個在接收完成時完成的 Mono<Void>
對於巢狀的非同步操作,在使用池化資料緩衝區(例如 Netty)的底層伺服器上,你可能需要呼叫 message.retain()。否則,在你有機會讀取資料之前,資料緩衝區可能已經被釋放。更多背景資訊,請參閱 資料緩衝區和編解碼器

以下實現結合了入站和出站流

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 處理入站訊息流。
2 創建出站訊息,生成組合流。
3 返回一個 Mono<Void>,在我們繼續接收時不會完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()						(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }	(2)

		return session.send(output)							(3)
	}
}
1 處理入站訊息流。
2 創建出站訊息,生成組合流。
3 返回一個 Mono<Void>,在我們繼續接收時不會完成。

入站和出站流可以獨立,僅在完成時匯合,如下例所示

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 處理入站訊息流。
2 傳送出站訊息。
3 匯合流並返回一個 Mono<Void>,該 Mono 在任一流結束時完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 處理入站訊息流。
2 傳送出站訊息。
3 匯合流並返回一個 Mono<Void>,該 Mono 在任一流結束時完成。

DataBuffer

DataBuffer 是 WebFlux 中位元組緩衝區的表示。參考文件的 Spring Core 部分在 資料緩衝區和編解碼器 中有更多關於此的內容。要理解的關鍵點是,在某些伺服器(如 Netty)上,位元組緩衝區是池化的並帶有引用計數,消費後必須釋放以避免記憶體洩漏。

在 Netty 上執行時,如果應用程式希望保留輸入資料緩衝區以確保它們不被釋放,則必須使用 DataBufferUtils.retain(dataBuffer),並在緩衝區被消費後使用 DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter 委託給 WebSocketService。預設情況下,它是 HandshakeWebSocketService 的一個例項,它對 WebSocket 請求執行基本檢查,然後使用當前伺服器的 RequestUpgradeStrategy。目前,內建支援 Reactor Netty、Tomcat、Jetty 和 Undertow。

HandshakeWebSocketService 公開了一個 sessionAttributePredicate 屬性,允許設定一個 Predicate<String> 以從 WebSession 中提取屬性並將其插入到 WebSocketSession 的屬性中。

伺服器配置

每個伺服器的 RequestUpgradeStrategy 都公開了特定於底層 WebSocket 伺服器引擎的配置。使用 WebFlux Java 配置時,你可以按照 WebFlux 配置 中的相應部分所示定製此類屬性,否則如果未使用 WebFlux 配置,請使用以下內容

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

檢查你的伺服器的升級策略以瞭解可用的選項。目前,只有 Tomcat 和 Jetty 公開了此類選項。

CORS

配置 CORS 並限制對 WebSocket 端點訪問的最簡單方法是讓你的 WebSocketHandler 實現 CorsConfigurationSource 並返回一個帶有允許的來源、頭和其他詳細資訊的 CorsConfiguration。如果無法做到這一點,你還可以在 SimpleUrlHandler 上設定 corsConfigurations 屬性,按 URL 模式指定 CORS 設定。如果兩者都指定,它們將使用 CorsConfigurationcombine 方法進行組合。

客戶端

Spring WebFlux 提供了一個 WebSocketClient 抽象,其實現包括 Reactor Netty、Tomcat、Jetty、Undertow 和標準 Java(即 JSR-356)。

Tomcat 客戶端實際上是標準 Java 客戶端的擴充套件,在 WebSocketSession 處理方面增加了一些額外功能,以利用 Tomcat 特定的 API 暫停接收訊息以進行背壓(back pressure)。

要啟動 WebSocket 會話,你可以建立一個客戶端例項並使用其 execute 方法

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

一些客戶端,如 Jetty,實現了 Lifecycle,在使用它們之前需要先停止和啟動。所有客戶端都具有與底層 WebSocket 客戶端配置相關的建構函式選項。