ZeroMQ 支援

Spring Integration 提供了元件來支援應用程式中的 ZeroMQ 通訊。該實現基於 JeroMQ 庫中受良好支援的 Java API。所有元件都封裝了 ZeroMQ 套接字生命週期,併為其內部管理執行緒,使這些元件的互動無鎖且執行緒安全。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:7.0.0"

ZeroMQ 代理

ZeroMqProxy 是內建 ZMQ.proxy() 函式 的 Spring 友好包裝器。它封裝了套接字生命週期和執行緒管理。此代理的客戶端仍然可以使用標準的 ZeroMQ 套接字連線和互動 API。除了標準的 ZContext,它還需要一個眾所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。這樣,將使用一對適當的 ZeroMQ 套接字型別用於代理的前端和後端。詳見 ZeroMqProxy.Type

ZeroMqProxy 實現了 SmartLifecycle,用於建立、繫結和配置套接字,並在專用的執行緒中從 Executor(如果有)啟動 ZMQ.proxy()。前端和後端套接字的繫結透過 tcp:// 協議在所有可用的網路介面上使用提供的埠完成。否則,它們將繫結到隨機埠,以後可以透過相應的 getFrontendPort()getBackendPort() API 方法獲取。

控制套接字作為 SocketType.PAIR 透過 "inproc://" + beanName + ".control" 地址的執行緒間傳輸公開;可以透過 getControlAddress() 獲取。它應該與同一應用程式中的另一個 SocketType.PAIR 套接字一起使用,以傳送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。當為其生命週期呼叫 stop() 時,ZeroMqProxy 會執行 ZMQ.PROXY_TERMINATE 命令,以終止 ZMQ.proxy() 迴圈並優雅地關閉所有繫結的套接字。

setExposeCaptureSocket(boolean) 選項導致此元件繫結一個額外的執行緒間套接字,型別為 SocketType.PUB,用於捕獲和釋出前端和後端套接字之間的所有通訊,正如 ZMQ.proxy() 實現所述。此套接字繫結到 "inproc://" + beanName + ".capture" 地址,不期望任何特定的訂閱進行過濾。

前端和後端套接字可以透過額外的屬性進行自定義,例如讀/寫超時或安全性。此自定義分別透過 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回撥提供。

ZeroMqProxy 可以作為簡單的 Bean 提供,如下所示

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客戶端節點都應該透過 tcp:// 連線到此代理的主機,並使用其感興趣的相應埠。

ZeroMQ 訊息通道

ZeroMqChannel 是一個 SubscribableChannel,它使用一對 ZeroMQ 套接字連線釋出者和訂閱者進行訊息互動。它可以在 PUB/SUB 模式下工作(預設為 PUSH/PULL);它也可以用作本地執行緒間通道(使用 PAIR 套接字)——在這種情況下不提供 connectUrl。在分散式模式下,它必須連線到外部管理的 ZeroMQ 代理,在那裡它可以與其他連線到同一代理的類似通道交換訊息。連線 url 選項是一個標準的 ZeroMQ 連線字串,包含協議和主機,以及一對用於 ZeroMQ 代理前端和後端套接字的埠(用冒號分隔)。為方便起見,如果通道與代理配置在同一個應用程式中,則可以提供 ZeroMqProxy 例項而不是連線字串。

傳送和接收套接字都在各自的專用執行緒中管理,使該通道具有併發友好性。這樣,我們就可以在不同的執行緒中向/從 ZeroMqChannel 釋出和消費,而無需同步。

預設情況下,ZeroMqChannel 使用 EmbeddedHeadersJsonMessageMapper 透過 Jackson JSON 處理器將 Message(包括頭)序列化/反序列化為 byte[]。此邏輯可以透過 setMessageMapper(BytesMessageMapper) 進行配置。

傳送和接收套接字可以透過各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回撥進行任何選項(讀/寫超時、安全性等)的自定義。

ZeroMqChannel 的內部邏輯基於 Project Reactor FluxMono 運算子的反應式流。這提供了更簡單的執行緒控制,並允許對通道進行無鎖併發釋出和消費。本地 PUB/SUB 邏輯實現為 Flux.publish() 運算子,以允許該通道的所有本地訂閱者接收相同的已釋出訊息,就像分散式訂閱者接收 PUB 套接字的訊息一樣。

以下是 ZeroMqChannel 配置的一個簡單示例

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道介面卡

ZeroMqMessageProducer 是一個具有響應式語義的 MessageProducerSupport 實現。它以非阻塞方式不斷從 ZeroMQ 套接字讀取資料,並將訊息釋出到無限的 Flux,該 FluxFluxMessageChannel 訂閱,或者在 start() 方法中顯式訂閱(如果輸出通道不是響應式的)。當套接字上未收到資料時,在下一次讀取嘗試之前會應用一個 consumeDelay(預設為 1 秒)。

ZeroMqMessageProducer 僅支援 SocketType.PAIRSocketType.PULLSocketType.SUB。此元件可以連線到遠端套接字,也可以繫結到 TCP 協議,並使用提供的或隨機埠。在元件啟動且 ZeroMQ 套接字繫結後,可以透過 getBoundPort() 獲取實際埠。套接字選項(例如安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。

如果 receiveRaw 選項設定為 true,從套接字消費的 ZMsg 將按原樣作為生成 Message 的 payload 傳送:解析和轉換 ZMsg 的工作將由下游流完成。否則,將使用 InboundMessageMapper 將消費的資料轉換為 Message。如果收到的 ZMsg 是多幀的,則第一幀被視為此 ZeroMQ 訊息釋出到的 ZeroMqHeaders.TOPIC 頭部。

如果 unwrapTopic 選項設定為 false,則傳入訊息被認為由兩幀組成:主題和 ZeroMQ 訊息。否則,預設情況下,ZMsg 被認為由三幀組成:第一幀包含主題,最後一幀包含訊息,中間有一個空幀。

對於 SocketType.SUBZeroMqMessageProducer 使用提供的 topics 選項進行訂閱;預設為訂閱所有。訂閱可以在執行時使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 進行調整。

以下是 ZeroMqMessageProducer 配置的示例

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道介面卡

ZeroMqMessageHandler 是一個 ReactiveMessageHandler 實現,用於將釋出訊息傳送到 ZeroMQ 套接字。僅支援 SocketType.PAIRSocketType.PUSHSocketType.PUB。此元件可以連線到遠端套接字,也可以繫結到 TCP 協議,並使用提供的或隨機埠。在元件啟動且 ZeroMQ 套接字繫結後,可以透過 getBoundPort() 獲取實際埠。

當使用 SocketType.PUB 時,將根據請求訊息評估 topicExpression,以將主題幀注入 ZeroMQ 訊息中(如果它不為空)。訂閱者端(SocketType.SUB)必須首先接收主題幀,然後才能解析實際資料。

如果 wrapTopic 選項設定為 false,則 ZeroMQ 訊息幀將在注入的主題之後傳送(如果存在)。預設情況下,主題和訊息之間會發送一個額外的空幀。

當請求訊息的有效載荷是 ZMsg 時,不執行任何轉換或主題提取:ZMsg 將按原樣傳送到套接字,並且不會銷燬以供將來可能重用。否則,將使用 OutboundMessageMapper<byte[]> 將請求訊息(或僅其有效載荷)轉換為要釋出的 ZeroMQ 幀。預設情況下,使用附帶 ConfigurableCompositeMessageConverterConvertingBytesMessageMapper。套接字選項(例如安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。

以下是連線到套接字的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}

以下是繫結到指定埠的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}

ZeroMQ Java DSL 支援

spring-integration-zeromq 透過 ZeroMq 工廠和上述元件的 IntegrationComponentSpec 實現提供了方便的 Java DSL 流式 API。

這是 ZeroMqChannel 的 Java DSL 示例

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的入站通道介面卡是

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的出站通道介面卡是

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}
© . This site is unofficial and not affiliated with VMware.