ZeroMQ 支援

Spring Integration 提供元件來支援應用程式中的 ZeroMQ 通訊。該實現基於 JeroMQ 庫良好支援的 Java API。所有元件都封裝了 ZeroMQ socket 的生命週期並在內部管理它們的執行緒,從而使這些元件的互動是無鎖和執行緒安全的。

您需要將此依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.4.4"

ZeroMQ Proxy

ZeroMqProxy 是內建的 ZMQ.proxy() 函式 的 Spring 友好型包裝器。它封裝了 socket 生命週期和執行緒管理。此 proxy 的客戶端仍然可以使用標準的 ZeroMQ socket 連線和互動 API。除了標準的 ZContext 外,它還需要一種眾所周知的 ZeroMQ proxy 模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。這樣,針對 proxy 的前端和後端使用了適當的 ZeroMQ socket 型別對。詳情請參閱 ZeroMqProxy.Type

ZeroMqProxy 實現了 SmartLifecycle,用於建立、繫結和配置 socket,並在(如果存在)Executor 的專用執行緒中啟動 ZMQ.proxy()。前端和後端 socket 的繫結是透過 tcp:// 協議完成的,繫結到所有可用的網路介面上的指定埠。否則,它們會繫結到隨機埠,隨後可以透過相應的 getFrontendPort()getBackendPort() API 方法獲取這些埠。

控制 socket 以 SocketType.PAIR 型別透過程序間傳輸暴露在 "inproc://" + beanName + ".control" 地址上;可以透過 getControlAddress() 獲取。它應該與同一應用程式中使用另一個 SocketType.PAIR socket 結合使用,以傳送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。當呼叫其生命週期的 stop() 方法時,ZeroMqProxy 會執行 ZMQ.PROXY_TERMINATE 命令,以優雅地終止 ZMQ.proxy() 迴圈並關閉所有繫結的 socket。

setExposeCaptureSocket(boolean) 選項會使此元件繫結一個額外的、帶有 SocketType.PUB 的程序間 socket,以捕獲和釋出前端和後端 socket 之間的所有通訊,正如 ZMQ.proxy() 實現所述。此 socket 繫結到 "inproc://" + beanName + ".capture" 地址,並且不期望任何特定的訂閱來進行過濾。

前端和後端 socket 可以透過附加屬性進行定製,例如讀寫超時或安全性。這種定製可透過 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回撥函式分別實現。

ZeroMqProxy 可以作為簡單的 bean 提供,如下所示

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客戶端節點應透過 tcp:// 連線到此 proxy 的主機,並使用其感興趣的相應埠。

ZeroMQ 訊息通道

ZeroMqChannel 是一個 SubscribableChannel,它使用一對 ZeroMQ socket 來連線釋出者和訂閱者進行訊息互動。它可以在 PUB/SUB 模式下工作(預設為 PUSH/PULL);它也可以用作本地程序間通道(使用 PAIR socket)——在這種情況下不提供 connectUrl。在分散式模式下,它必須連線到外部管理的 ZeroMQ proxy,在那裡它可以與連線到同一 proxy 的其他類似通道交換訊息。連線 url 選項是一個標準的 ZeroMQ 連線字串,包含協議、主機以及 ZeroMQ proxy 前端和後端 socket 的一對用冒號分隔的埠。為了方便起見,如果 ZeroMqProxy 例項與通道配置在同一個應用程式中,則可以向通道提供 ZeroMqProxy 例項而不是連線字串。

傳送和接收 socket 都在各自的專用執行緒中進行管理,使得此通道具有併發友好性。這樣,我們可以在不同的執行緒中釋出訊息到 ZeroMqChannel 並從中消費訊息,而無需同步。

預設情況下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 來使用 Jackson JSON 處理器將 Message (包括頭資訊) 序列化/反序列化為 byte[]。此邏輯可以透過 setMessageMapper(BytesMessageMapper) 進行配置。

傳送和接收 socket 可以透過各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回撥函式定製任何選項(讀寫超時、安全性等)。

ZeroMqChannel 的內部邏輯基於透過 Project Reactor 的 FluxMono 運算子實現的響應式流。這提供了更容易的執行緒控制,並允許從通道進行無鎖併發釋出和消費。本地 PUB/SUB 邏輯實現為 Flux.publish() 運算子,允許連線到此通道的所有本地訂閱者接收相同的已釋出訊息,就像連線到 PUB socket 的分散式訂閱者一樣。

以下是一個 ZeroMqChannel 配置的簡單示例

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道介面卡

ZeroMqMessageProducer 是一個具有響應式語義的 MessageProducerSupport 實現。它以非阻塞方式持續從 ZeroMQ socket 讀取資料,並將訊息釋出到一個無限的 Flux 中,該 FluxFluxMessageChannel 訂閱,如果輸出通道不是響應式的,則在 start() 方法中顯式訂閱。當 socket 上沒有接收到資料時,在下一次讀取嘗試之前會應用一個 consumeDelay(預設為 1 秒)。

ZeroMqMessageProducer 僅支援 SocketType.PAIRSocketType.PULLSocketType.SUB。此元件可以連線到遠端 socket,或者透過提供的或隨機埠繫結到 TCP 協議。在元件啟動且 ZeroMQ socket 繫結後,實際埠可以透過 getBoundPort() 獲取。socket 選項(例如,安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。

如果 receiveRaw 選項設定為 true,則從 socket 消費的 ZMsg 會原樣作為生成 Message 的 payload 傳送:由下游流負責解析和轉換 ZMsg。否則,使用 InboundMessageMapper 將消費的資料轉換為 Message。如果收到的 ZMsg 是多幀的,則第一幀被視為此 ZeroMQ 訊息釋出到的 ZeroMqHeaders.TOPIC 頭。

如果 unwrapTopic 選項設定為 false,則認為入站訊息由兩幀組成:主題和 ZeroMQ 訊息。否則,預設情況下,認為 ZMsg 由三幀組成:第一幀包含主題,最後一幀包含訊息,中間有一個空幀。

對於 SocketType.SUBZeroMqMessageProducer 使用提供的 topics 選項進行訂閱;預設為訂閱所有主題。可以使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 在執行時調整訂閱。

以下是 ZeroMqMessageProducer 配置的示例

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道介面卡

ZeroMqMessageHandler 是一個 ReactiveMessageHandler 實現,用於將訊息釋出到 ZeroMQ socket。僅支援 SocketType.PAIRSocketType.PUSHSocketType.PUB。此元件可以連線到遠端 socket,或者透過提供的或隨機埠繫結到 TCP 協議。在元件啟動且 ZeroMQ socket 繫結後,實際埠可以透過 getBoundPort() 獲取。

當使用 SocketType.PUB 時,會針對請求訊息評估 topicExpression,以便在 ZeroMQ 訊息中注入主題幀(如果不是 null)。訂閱者端(SocketType.SUB)必須先接收主題幀,然後才能解析實際資料。

如果 wrapTopic 選項設定為 false,則 ZeroMQ 訊息幀會在注入的主題(如果存在)之後傳送。預設情況下,在主題和訊息之間會發送一個額外的空幀。

當請求訊息的 payload 是 ZMsg 時,不執行轉換或主題提取:ZMsg 原樣傳送到 socket,並且不會被銷燬以便後續重用。否則,使用 OutboundMessageMapper<byte[]> 將請求訊息(或其 payload)轉換為 ZeroMQ 幀進行釋出。預設情況下,使用帶有 ConfigurableCompositeMessageConverterConvertingBytesMessageMapper。socket 選項(例如,安全性或寫入超時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回撥進行配置。

以下是連線到 socket 的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

以下是繫結到指定埠的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL 支援

spring-integration-zeromq 透過 ZeroMq 工廠和 IntegrationComponentSpec 實現為上述元件提供了方便的 Java DSL 流暢 API。

這是 ZeroMqChannel 的 Java DSL 示例

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

用於 ZeroMQ Java DSL 的入站通道介面卡是

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

用於 ZeroMQ Java DSL 的出站通道介面卡是

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}