STOMP 客戶端
Spring 提供了基於 WebSocket 的 STOMP 客戶端和基於 TCP 的 STOMP 客戶端。
首先,你可以建立並配置 WebSocketStompClient
,如下例所示
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前一個示例中,你可以將 StandardWebSocketClient
替換為 SockJsClient
,因為它也是 WebSocketClient
的實現。SockJsClient
可以使用 WebSocket 或基於 HTTP 的傳輸作為回退。更多詳細資訊,請參閱 SockJsClient
。
接下來,你可以建立連線併為 STOMP 會話提供一個處理器,如下例所示
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
當會話準備就緒時,處理器會收到通知,如下例所示
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦會話建立,任何有效載荷都可以傳送,並使用配置的 MessageConverter
進行序列化,如下例所示
session.send("/topic/something", "payload");
你還可以訂閱目的地。subscribe
方法需要一個用於處理訂閱上訊息的處理器,並返回一個 Subscription
控制代碼,你可以使用它來取消訂閱。對於接收到的每條訊息,處理器可以指定有效載荷應反序列化到的目標 Object
型別,如下例所示
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要啟用 STOMP 心跳,你可以為 WebSocketStompClient
配置一個 TaskScheduler
,並可選擇自定義心跳間隔(寫入不活動 10 秒觸發傳送心跳,讀取不活動 10 秒關閉連線)。
WebSocketStompClient
僅在不活動時傳送心跳,即沒有傳送其他訊息時。當使用外部 broker 時,這可能會帶來挑戰,因為非 broker 目的地的訊息表示活動,但實際上並未轉發到 broker。在這種情況下,你可以在初始化 外部 Broker 時配置一個 TaskScheduler
,它能確保即使只發送非 broker 目的地的訊息,心跳也會轉發到 broker。
當你在效能測試中使用 WebSocketStompClient 從同一臺機器模擬數千個客戶端時,考慮關閉心跳,因為每個連線都會安排自己的心跳任務,這對於在同一臺機器上執行大量客戶端來說不是最優的。 |
STOMP 協議還支援回執,客戶端必須新增一個 receipt
頭,伺服器在處理完傳送或訂閱後會回覆一個 RECEIPT 幀。為了支援這一點,StompSession
提供了 setAutoReceipt(boolean)
,它會在後續的每次傳送或訂閱事件中新增一個 receipt
頭。另外,你也可以手動將回執頭新增到 StompHeaders
中。send 和 subscribe 方法都返回一個 Receiptable
例項,你可以用它來註冊回執成功和失敗回撥。對於此功能,你必須為客戶端配置一個 TaskScheduler
以及回執過期前的時間(預設為 15 秒)。
請注意,StompSessionHandler
本身就是一個 StompFrameHandler
,它除了處理訊息處理中的異常(透過 handleException
回撥)和傳輸層錯誤(包括 ConnectionLostException
,透過 handleTransportError
回撥)外,還可以處理 ERROR 幀。
你可以使用 WebSocketStompClient
的 inboundMessageSizeLimit
和 outboundMessageSizeLimit
屬性來限制入站和出站 WebSocket 訊息的最大大小。當出站 STOMP 訊息超出限制時,它會被拆分成部分幀,接收方需要重新組裝。預設情況下,出站訊息沒有大小限制。當入站 STOMP 訊息大小超出配置限制時,會丟擲 StompConversionException
。入站訊息的預設大小限制為 64KB
。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB