ZeroMQ 支援
Spring Integration 提供了元件來支援應用程式中的 ZeroMQ 通訊。該實現基於 JeroMQ 庫中受良好支援的 Java API。所有元件都封裝了 ZeroMQ 套接字生命週期,併為其內部管理執行緒,使這些元件的互動無鎖且執行緒安全。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:7.0.0"
ZeroMQ 代理
ZeroMqProxy 是內建 ZMQ.proxy() 函式 的 Spring 友好包裝器。它封裝了套接字生命週期和執行緒管理。此代理的客戶端仍然可以使用標準的 ZeroMQ 套接字連線和互動 API。除了標準的 ZContext,它還需要一個眾所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。這樣,將使用一對適當的 ZeroMQ 套接字型別用於代理的前端和後端。詳見 ZeroMqProxy.Type。
ZeroMqProxy 實現了 SmartLifecycle,用於建立、繫結和配置套接字,並在專用的執行緒中從 Executor(如果有)啟動 ZMQ.proxy()。前端和後端套接字的繫結透過 tcp:// 協議在所有可用的網路介面上使用提供的埠完成。否則,它們將繫結到隨機埠,以後可以透過相應的 getFrontendPort() 和 getBackendPort() API 方法獲取。
控制套接字作為 SocketType.PAIR 透過 "inproc://" + beanName + ".control" 地址的執行緒間傳輸公開;可以透過 getControlAddress() 獲取。它應該與同一應用程式中的另一個 SocketType.PAIR 套接字一起使用,以傳送 ZMQ.PROXY_TERMINATE、ZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。當為其生命週期呼叫 stop() 時,ZeroMqProxy 會執行 ZMQ.PROXY_TERMINATE 命令,以終止 ZMQ.proxy() 迴圈並優雅地關閉所有繫結的套接字。
setExposeCaptureSocket(boolean) 選項導致此元件繫結一個額外的執行緒間套接字,型別為 SocketType.PUB,用於捕獲和釋出前端和後端套接字之間的所有通訊,正如 ZMQ.proxy() 實現所述。此套接字繫結到 "inproc://" + beanName + ".capture" 地址,不期望任何特定的訂閱進行過濾。
前端和後端套接字可以透過額外的屬性進行自定義,例如讀/寫超時或安全性。此自定義分別透過 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回撥提供。
ZeroMqProxy 可以作為簡單的 Bean 提供,如下所示
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客戶端節點都應該透過 tcp:// 連線到此代理的主機,並使用其感興趣的相應埠。
ZeroMQ 訊息通道
ZeroMqChannel 是一個 SubscribableChannel,它使用一對 ZeroMQ 套接字連線釋出者和訂閱者進行訊息互動。它可以在 PUB/SUB 模式下工作(預設為 PUSH/PULL);它也可以用作本地執行緒間通道(使用 PAIR 套接字)——在這種情況下不提供 connectUrl。在分散式模式下,它必須連線到外部管理的 ZeroMQ 代理,在那裡它可以與其他連線到同一代理的類似通道交換訊息。連線 url 選項是一個標準的 ZeroMQ 連線字串,包含協議和主機,以及一對用於 ZeroMQ 代理前端和後端套接字的埠(用冒號分隔)。為方便起見,如果通道與代理配置在同一個應用程式中,則可以提供 ZeroMqProxy 例項而不是連線字串。
傳送和接收套接字都在各自的專用執行緒中管理,使該通道具有併發友好性。這樣,我們就可以在不同的執行緒中向/從 ZeroMqChannel 釋出和消費,而無需同步。
預設情況下,ZeroMqChannel 使用 EmbeddedHeadersJsonMessageMapper 透過 Jackson JSON 處理器將 Message(包括頭)序列化/反序列化為 byte[]。此邏輯可以透過 setMessageMapper(BytesMessageMapper) 進行配置。
傳送和接收套接字可以透過各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回撥進行任何選項(讀/寫超時、安全性等)的自定義。
ZeroMqChannel 的內部邏輯基於 Project Reactor Flux 和 Mono 運算子的反應式流。這提供了更簡單的執行緒控制,並允許對通道進行無鎖併發釋出和消費。本地 PUB/SUB 邏輯實現為 Flux.publish() 運算子,以允許該通道的所有本地訂閱者接收相同的已釋出訊息,就像分散式訂閱者接收 PUB 套接字的訊息一樣。
以下是 ZeroMqChannel 配置的一個簡單示例
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道介面卡
ZeroMqMessageProducer 是一個具有響應式語義的 MessageProducerSupport 實現。它以非阻塞方式不斷從 ZeroMQ 套接字讀取資料,並將訊息釋出到無限的 Flux,該 Flux 由 FluxMessageChannel 訂閱,或者在 start() 方法中顯式訂閱(如果輸出通道不是響應式的)。當套接字上未收到資料時,在下一次讀取嘗試之前會應用一個 consumeDelay(預設為 1 秒)。
ZeroMqMessageProducer 僅支援 SocketType.PAIR、SocketType.PULL 和 SocketType.SUB。此元件可以連線到遠端套接字,也可以繫結到 TCP 協議,並使用提供的或隨機埠。在元件啟動且 ZeroMQ 套接字繫結後,可以透過 getBoundPort() 獲取實際埠。套接字選項(例如安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。
如果 receiveRaw 選項設定為 true,從套接字消費的 ZMsg 將按原樣作為生成 Message 的 payload 傳送:解析和轉換 ZMsg 的工作將由下游流完成。否則,將使用 InboundMessageMapper 將消費的資料轉換為 Message。如果收到的 ZMsg 是多幀的,則第一幀被視為此 ZeroMQ 訊息釋出到的 ZeroMqHeaders.TOPIC 頭部。
如果 unwrapTopic 選項設定為 false,則傳入訊息被認為由兩幀組成:主題和 ZeroMQ 訊息。否則,預設情況下,ZMsg 被認為由三幀組成:第一幀包含主題,最後一幀包含訊息,中間有一個空幀。
對於 SocketType.SUB,ZeroMqMessageProducer 使用提供的 topics 選項進行訂閱;預設為訂閱所有。訂閱可以在執行時使用 subscribeToTopics() 和 unsubscribeFromTopics() @ManagedOperation 進行調整。
以下是 ZeroMqMessageProducer 配置的示例
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道介面卡
ZeroMqMessageHandler 是一個 ReactiveMessageHandler 實現,用於將釋出訊息傳送到 ZeroMQ 套接字。僅支援 SocketType.PAIR、SocketType.PUSH 和 SocketType.PUB。此元件可以連線到遠端套接字,也可以繫結到 TCP 協議,並使用提供的或隨機埠。在元件啟動且 ZeroMQ 套接字繫結後,可以透過 getBoundPort() 獲取實際埠。
當使用 SocketType.PUB 時,將根據請求訊息評估 topicExpression,以將主題幀注入 ZeroMQ 訊息中(如果它不為空)。訂閱者端(SocketType.SUB)必須首先接收主題幀,然後才能解析實際資料。
如果 wrapTopic 選項設定為 false,則 ZeroMQ 訊息幀將在注入的主題之後傳送(如果存在)。預設情況下,主題和訊息之間會發送一個額外的空幀。
當請求訊息的有效載荷是 ZMsg 時,不執行任何轉換或主題提取:ZMsg 將按原樣傳送到套接字,並且不會銷燬以供將來可能重用。否則,將使用 OutboundMessageMapper<byte[]> 將請求訊息(或僅其有效載荷)轉換為要釋出的 ZeroMQ 幀。預設情況下,使用附帶 ConfigurableCompositeMessageConverter 的 ConvertingBytesMessageMapper。套接字選項(例如安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。
以下是連線到套接字的 ZeroMqMessageHandler 配置示例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}
以下是繫結到指定埠的 ZeroMqMessageHandler 配置示例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}
ZeroMQ Java DSL 支援
spring-integration-zeromq 透過 ZeroMq 工廠和上述元件的 IntegrationComponentSpec 實現提供了方便的 Java DSL 流式 API。
這是 ZeroMqChannel 的 Java DSL 示例
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道介面卡是
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道介面卡是
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}