RSocket
本節介紹 Spring Framework 對 RSocket 協議的支援。
概覽
RSocket 是一種應用程式協議,用於在 TCP、WebSocket 和其他位元組流傳輸上進行多路複用、雙向通訊,支援以下互動模型之一:
-
Request-Response
— 傳送一條訊息,接收一條響應。 -
Request-Stream
— 傳送一條訊息,接收一個訊息流。 -
Channel
— 在兩個方向上傳送訊息流。 -
Fire-and-Forget
— 傳送單向訊息。
一旦建立初始連線,“客戶端”與“伺服器”的區別就消失了,因為雙方變得對稱,每一方都可以發起上述任一互動。這就是為什麼在協議呼叫中,參與雙方被稱為“請求者”(requester)和“響應者”(responder),而上述互動被稱為“請求流”或簡稱為“請求”。
以下是 RSocket 協議的主要特性和優勢:
-
Reactive Streams 語義跨越網路邊界 — 對於諸如
Request-Stream
和Channel
之類的流式請求,背壓(back pressure)訊號在請求者和響應者之間傳遞,允許請求者在源頭減慢響應者,從而減少對網路層擁塞控制的依賴,並降低在網路層或任何層進行緩衝的需求。 -
請求限流 — 此特性因 LEASE 幀而得名“租約”(Leasing),該幀可以由每一端傳送,以限制另一端在給定時間內允許的總請求數。租約會定期續期。
-
會話續訂 — 這旨在應對連線丟失,並需要維護一些狀態。狀態管理對應用程式透明,並且與背壓(back pressure)很好地協同工作,背壓可以在可能時停止生產者,並減少所需的狀態量。
-
大訊息的分段與重組。
-
心跳保持(keepalive)。
RSocket 有多種語言的實現。其 Java 庫構建於 Project Reactor 之上,並使用 Reactor Netty 作為傳輸層。這意味著應用程式中 Reactive Streams Publisher 發出的訊號透過 RSocket 在網路上透明地傳播。
協議
RSocket 的優勢之一在於它在網路上傳輸時具有明確定義的行為,並且易於閱讀其規範以及一些協議擴充套件。因此,建議閱讀規範,而無需依賴語言實現和更高級別的框架 API。本節提供了一個簡潔的概覽,以建立一些背景知識。
連線
最初,客戶端透過 TCP 或 WebSocket 等一些低階流式傳輸連線到伺服器,並向伺服器傳送一個 SETUP
幀來設定連線引數。
伺服器可能會拒絕 SETUP
幀,但通常在傳送(客戶端)和接收(伺服器)之後,雙方就可以開始發起請求,除非 SETUP
指示使用租約(leasing)語義來限制請求數量,在這種情況下,雙方必須等待對方傳送 LEASE
幀才能允許發起請求。
發起請求
連線建立後,雙方都可以透過 REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或 REQUEST_FNF
中的任一幀發起請求。這些幀中的每一個都從請求者向響應者攜帶一條訊息。
響應者可以返回帶有響應訊息的 PAYLOAD
幀,並且在 REQUEST_CHANNEL
的情況下,請求者也可以傳送帶有更多請求訊息的 PAYLOAD
幀。
當請求涉及訊息流時(例如 Request-Stream
和 Channel
),響應者必須尊重請求者發出的需求訊號。需求以訊息數量表示。初始需求在 REQUEST_STREAM
和 REQUEST_CHANNEL
幀中指定。後續需求透過 REQUEST_N
幀發出訊號。
每一方也可以透過 METADATA_PUSH
幀傳送元資料通知,這些通知不屬於任何單個請求,而是針對整個連線。
訊息格式
RSocket 訊息包含資料和元資料。元資料可用於傳送路由、安全令牌等。資料和元資料可以採用不同的格式。它們各自的 Mime 型別在 SETUP
幀中宣告,並適用於給定連線上的所有請求。
雖然所有訊息都可以包含元資料,但通常元資料(如路由)是針對每個請求的,因此僅包含在請求的第一條訊息中,即與 REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或 REQUEST_FNF
這些幀中的一個一起傳送。
協議擴充套件定義了用於應用程式的常見元資料格式:
-
Composite Metadata -- 多個獨立格式化的元資料條目。
-
Routing — 請求的路由。
Java 實現
RSocket 的Java 實現構建於 Project Reactor 之上。TCP 和 WebSocket 的傳輸層構建於 Reactor Netty 之上。作為一個 Reactive Streams 庫,Reactor 簡化了實現協議的工作。對於應用程式而言,使用 Flux
和 Mono
並結合宣告式運算子和透明的背壓支援是自然而然的選擇。
RSocket Java 中的 API 故意保持最小和基礎。它側重於協議特性,並將應用程式程式設計模型(例如 RPC 程式碼生成或其他)作為更高層級的獨立關注點。
主要的契約 io.rsocket.RSocket 建模了四種請求互動型別,其中 Mono
代表單個訊息的承諾,Flux
代表訊息流,而 io.rsocket.Payload
是實際訊息,可作為位元組緩衝區訪問資料和元資料。RSocket
契約是對稱使用的。對於請求方,應用程式獲得一個 RSocket
來執行請求。對於響應方,應用程式實現 RSocket
來處理請求。
這並非詳盡的介紹。大多數情況下,Spring 應用程式無需直接使用其 API。然而,獨立於 Spring 檢視或實驗 RSocket 可能會很有價值。RSocket Java 倉庫包含了許多示例應用程式,展示了其 API 和協議特性。
Spring 支援
spring-messaging
模組包含以下內容:
-
RSocketRequester — 用於透過
io.rsocket.RSocket
發起請求的流暢 API,支援資料和元資料編碼/解碼。 -
註解響應者 — 使用
@MessageMapping
和@RSocketExchange
註解的處理器方法,用於響應請求。 -
RSocket 介面 — 將 RSocket 服務宣告為帶有
@RSocketExchange
方法的 Java 介面,可用作請求者或響應者。
spring-web
模組包含 Encoder
和 Decoder
實現,例如 Jackson CBOR/JSON 和 Protobuf,這些是 RSocket 應用程式可能需要的。它還包含可以用於高效路由匹配的 PathPatternParser
。
Spring Boot 2.2 支援在 TCP 或 WebSocket 上搭建 RSocket 伺服器,包括在 WebFlux 伺服器中透過 WebSocket 暴露 RSocket 的選項。此外,還提供了客戶端支援和針對 RSocketRequester.Builder
和 RSocketStrategies
的自動配置。有關更多詳細資訊,請參閱 Spring Boot 參考文件的RSocket 部分。
Spring Security 5.2 提供 RSocket 支援。
Spring Integration 5.2 提供入站和出站閘道器,用於與 RSocket 客戶端和伺服器互動。有關更多詳細資訊,請參閱 Spring Integration 參考手冊。
Spring Cloud Gateway 支援 RSocket 連線。
RSocketRequester
RSocketRequester
提供了一個流暢的 API 來執行 RSocket 請求,接受和返回用於資料和元資料的物件,而不是低階的位元組緩衝區。它可以對稱使用,既可用於從客戶端發起請求,也可用於從伺服器發起請求。
客戶端請求者
在客戶端獲取 RSocketRequester
意味著連線到伺服器,這涉及傳送一個包含連線設定的 RSocket SETUP
幀。RSocketRequester
提供了一個構建器,用於幫助準備 io.rsocket.core.RSocketConnector
,包括 SETUP
幀的連線設定。
這是使用預設設定進行連線的最基本方式:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上述程式碼不會立即連線。當發起請求時,會透明地建立並使用一個共享連線。
連線設定
RSocketRequester.Builder
提供了以下方法來定製初始的 SETUP
幀:
-
dataMimeType(MimeType)
— 設定連線上的資料 mime 型別。 -
metadataMimeType(MimeType)
— 設定連線上的元資料 mime 型別。 -
setupData(Object)
— 包含在SETUP
中的資料。 -
setupRoute(String, Object…)
— 包含在SETUP
的元資料中的路由。 -
setupMetadata(Object, MimeType)
— 包含在SETUP
中的其他元資料。
對於資料,預設的 mime 型別派生自第一個配置的 Decoder
。對於元資料,預設的 mime 型別是複合元資料(composite metadata),它允許每個請求包含多個元資料值和 mime 型別對。通常,這兩者都不需要更改。
SETUP
幀中的資料和元資料是可選的。在伺服器端,可以使用@ConnectMapping 方法來處理連線的開始和 SETUP
幀的內容。元資料可用於連線級別的安全性。
策略
RSocketRequester.Builder
接受 RSocketStrategies
來配置請求者。你需要使用它來提供用於資料和元資料值(反)序列化的編碼器和解碼器。預設情況下,只註冊了 spring-core
中針對 String
、byte[]
和 ByteBuffer
的基本編解碼器。新增 spring-web
將提供更多可以註冊的編解碼器,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
被設計為可重用。在某些場景下,例如,客戶端和伺服器在同一個應用程式中,最好在 Spring 配置中宣告它。
客戶端響應者
RSocketRequester.Builder
可用於配置響應來自伺服器的請求的響應者。
你可以使用註解處理器作為客戶端響應者,基於與伺服器端相同的底層架構,但透過程式設計方式註冊,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 如果存在 spring-web ,使用 PathPatternRouteMatcher 進行高效路由匹配。 |
2 | 從包含 @MessageMapping 和/或 @ConnectMapping 方法的類建立響應者。 |
3 | 註冊響應者。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 如果存在 spring-web ,使用 PathPatternRouteMatcher 進行高效路由匹配。 |
2 | 從包含 @MessageMapping 和/或 @ConnectMapping 方法的類建立響應者。 |
3 | 註冊響應者。 |
注意,上述方法僅是為程式設計方式註冊客戶端響應者而設計的快捷方式。對於其他場景,如果客戶端響應者在 Spring 配置中,你仍然可以將 RSocketMessageHandler
宣告為 Spring Bean,然後按如下方式應用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
對於上述情況,你可能還需要在 RSocketMessageHandler
中使用 setHandlerPredicate
來切換檢測客戶端響應者的不同策略,例如,基於自定義註解 @RSocketClientResponder
而非預設的 @Controller
。在客戶端和伺服器共存或同一個應用程式中有多個客戶端的場景中,這是必需的。
另請參見註解響應者,瞭解更多關於程式設計模型的資訊。
高階
RSocketRequesterBuilder
提供了一個回撥,用於暴露底層的 io.rsocket.core.RSocketConnector
,以便進一步配置 keepalive 間隔、會話續訂、攔截器等選項。你可以在該級別按如下方式配置選項:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
伺服器請求者
從伺服器向連線的客戶端發起請求,只需從伺服器獲取與該連線客戶端對應的請求者即可。
在 註解響應器 中,@ConnectMapping
和 @MessageMapping
方法支援 RSocketRequester
引數。可以使用它來訪問連線的請求器。請注意,@ConnectMapping
方法本質上是 SETUP
幀的處理程式,必須在請求開始之前進行處理。因此,最開始的請求必須與處理解耦。例如:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 非同步啟動請求,獨立於處理。 |
2 | 執行處理並返回完成的 Mono<Void> 。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 非同步啟動請求,獨立於處理。 |
2 | 在 suspend 函式中執行處理。 |
請求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | 指定要包含在請求訊息元資料中的路由。 |
2 | 為請求訊息提供資料。 |
3 | 宣告預期的響應。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | 指定要包含在請求訊息元資料中的路由。 |
2 | 為請求訊息提供資料。 |
3 | 宣告預期的響應。 |
互動型別是根據輸入和輸出的基數隱式確定的。上面的例子是 Request-Stream
,因為它傳送一個值並接收一個值的流。在大多數情況下,只要輸入和輸出的選擇與 RSocket 互動型別以及響應器期望的輸入和輸出型別匹配,您就不需要考慮這個問題。唯一無效的組合示例是多對一。
data(Object)
方法也接受任何 Reactive Streams Publisher
,包括 Flux
和 Mono
,以及任何在 ReactiveAdapterRegistry
中註冊的值生產者。對於像 Flux
這樣生成相同型別值的多值 Publisher
,考慮使用其中一個過載的 data
方法,以避免對每個元素進行型別檢查和 Encoder
查詢:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步驟是可選的。對於不傳送資料的請求,可以跳過它:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用 組合元資料(預設)並且這些值受註冊的 Encoder
支援,則可以新增額外的元資料值。例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
對於 Fire-and-Forget
,使用返回 Mono<Void>
的 send()
方法。請注意,Mono
僅表示訊息已成功傳送,而不表示已處理。
對於 Metadata-Push
,使用返回 Mono<Void>
的 sendMetadata()
方法。
註解響應器
RSocket 響應器可以作為 @MessageMapping
和 @ConnectMapping
方法來實現。@MessageMapping
方法處理單個請求,而 @ConnectMapping
方法處理連線級別的事件(setup 和 metadata push)。註解響應器是雙向支援的,用於伺服器端響應和客戶端響應。
伺服器端響應器
要在伺服器端使用註解響應器,將 RSocketMessageHandler
新增到 Spring 配置中,以檢測包含 @MessageMapping
和 @ConnectMapping
方法的 @Controller
bean:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然後透過 Java RSocket API 啟動 RSocket 伺服器,並將 RSocketMessageHandler
用於響應器,如下所示:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
預設支援 組合 和 路由 元資料。如果需要切換到不同的 mime 型別或註冊額外的元資料 mime 型別,可以設定其 MetadataExtractor。
您需要設定支援的元資料和資料格式所需的 Encoder
和 Decoder
例項。您可能需要 spring-web
模組來獲取編解碼器實現。
預設情況下,SimpleRouteMatcher
使用 AntPathMatcher
進行路由匹配。我們建議插入 spring-web
中的 PathPatternRouteMatcher
以實現高效的路由匹配。RSocket 路由可以是分層的,但不是 URL 路徑。預設情況下,兩個路由匹配器都配置使用 "." 作為分隔符,並且不像 HTTP URL 那樣進行 URL 解碼。
RSocketMessageHandler
可以透過 RSocketStrategies
進行配置,如果您需要在同一程序中在客戶端和伺服器之間共享配置,這可能會很有用:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客戶端響應器
客戶端的註解響應器需要在 RSocketRequester.Builder
中進行配置。詳情請參閱 客戶端響應器。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上面的 @MessageMapping
方法響應路由為 "locate.radars.within" 的 Request-Stream 互動。它支援靈活的方法簽名,可以選擇使用以下方法引數:
方法引數 | 描述 |
---|---|
|
請求的 payload。這可以是具體的值,也可以是非同步型別(如 注意: 是否使用此註解是可選的。一個不是簡單型別且不是其他受支援引數的方法引數,被假定為預期的 payload。 |
|
用於向遠端端發起請求的請求器。 |
|
根據對映模式中的變數從路由中提取的值,例如 |
|
按照 MetadataExtractor 中所述註冊用於提取的元資料值。 |
|
按照 MetadataExtractor 中所述註冊用於提取的所有元資料值。 |
返回值預計為一個或多個物件,這些物件將被序列化為響應 payload。這可以是非同步型別(如 Mono
或 Flux
),具體值,或者 void
或無值的非同步型別(如 Mono<Void>
)。
@MessageMapping
方法支援的 RSocket 互動型別取決於輸入(即 @Payload
引數)和輸出的基數,其中基數表示以下含義:
基數 | 描述 |
---|---|
1 |
具體值,或單值非同步型別,例如 |
多 |
多值非同步型別,例如 |
0 |
對於輸入,這意味著方法沒有 對於輸出,這是 |
下表顯示了所有輸入和輸出基數組合以及相應的互動型別:
輸入基數 | 輸出基數 | 互動型別 |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
Request-Response |
0, 1 |
多 |
Request-Stream |
多 |
0, 1, 多 |
Request-Channel |
@RSocketExchange
作為 @MessageMapping
的替代方案,您也可以使用 @RSocketExchange
方法處理請求。這些方法在 RSocket 介面 上宣告,可以透過 RSocketServiceProxyFactory
用作請求器,或由響應器實現。
例如,作為響應器處理請求:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExchange
和 @MessageMapping
之間存在一些差異,因為前者需要保持適用於請求器和響應器。例如,雖然 @MessageMapping
可以宣告用於處理任意數量的路由且每個路由可以是模式,但 @RSocketExchange
必須宣告一個具體的單一路由。在支援的與元資料相關的方法引數方面也存在細微差異,請參閱 @MessageMapping
和 RSocket 介面 檢視支援引數的列表。
@RSocketExchange
可以用在型別級別,為給定 RSocket 服務介面的所有路由指定一個公共字首。
@ConnectMapping
@ConnectMapping
在 RSocket 連線開始時處理 SETUP
幀,以及透過 METADATA_PUSH
幀進行的後續元資料推送通知,即 io.rsocket.RSocket
中的 metadataPush(Payload)
。
@ConnectMapping
方法支援與 @MessageMapping
相同的引數,但基於 SETUP
和 METADATA_PUSH
幀中的元資料和資料。@ConnectMapping
可以包含模式以將處理範圍縮小到元資料中包含路由的特定連線,或者如果未宣告模式,則匹配所有連線。
@ConnectMapping
方法不能返回資料,必須宣告返回值為 void
或 Mono<Void>
。如果處理新連線時返回錯誤,則連線將被拒絕。處理不應被阻塞以等待向連線的 RSocketRequester
發起請求。詳情請參閱 伺服器端請求器。
MetadataExtractor
響應器必須解釋元資料。組合元資料 允許獨立格式化的元資料值(例如,用於路由、安全、跟蹤),每個值都有自己的 mime 型別。應用程式需要一種方法來配置支援的元資料 mime 型別,以及一種訪問提取值的方法。
MetadataExtractor
是一個契約,用於接收序列化的元資料並返回解碼後的名稱-值對,這些名稱-值對可以透過名稱像頭部一樣訪問,例如透過註解處理方法中的 @Header
。
DefaultMetadataExtractor
可以提供 Decoder
例項來解碼元資料。它開箱即用地內建支援 "message/x.rsocket.routing.v0",它將其解碼為 String
並儲存在 "route" 鍵下。對於任何其他 mime 型別,您需要提供一個 Decoder
並按如下方式註冊 mime 型別:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
組合元資料很適合組合獨立的元資料值。但是,請求器可能不支援組合元資料,或者可能選擇不使用它。為此,DefaultMetadataExtractor
可能需要自定義邏輯將解碼後的值對映到輸出 Map。以下是使用 JSON 作為元資料的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
透過 RSocketStrategies
配置 MetadataExtractor
時,可以讓 RSocketStrategies.Builder
使用配置好的解碼器建立 extractor,然後簡單地使用回撥來定製註冊,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 介面
Spring Framework 允許您將 RSocket 服務定義為包含 @RSocketExchange
方法的 Java 介面。您可以將這樣的介面傳遞給 RSocketServiceProxyFactory
以建立一個代理,該代理透過 RSocketRequester 執行請求。您也可以將該介面實現為處理請求的響應器。
首先建立包含 @RSocketExchange
方法的介面:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
現在您可以建立一個在呼叫方法時執行請求的代理:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您還可以實現該介面作為響應器來處理請求。請參閱 註解響應器。
方法引數
註解的 RSocket 交換方法支援靈活的方法簽名,帶有以下方法引數:
方法引數 | 描述 |
---|---|
|
新增路由變數以與 |
|
設定請求的輸入 payload。這可以是具體值,或可以透過 |
|
輸入 payload 中元資料條目的值。這可以是任何 |
|
元資料條目的 |
返回值
註解的 RSocket 交換方法支援具體值或可以透過 ReactiveAdapterRegistry
轉換為 Reactive Streams Publisher
的任何值生產者作為返回值。
預設情況下,具有同步(阻塞)方法簽名的 RSocket 服務方法的行為取決於底層 RSocket ClientTransport
的響應超時設定以及 RSocket keep-alive 設定。RSocketServiceProxyFactory.Builder
確實提供了一個 blockTimeout
選項,也允許您配置阻塞等待響應的最長時間,但我們建議在 RSocket 級別配置超時值以獲得更多控制。