RabbitMQ Stream 外掛的初始生產者支援

現在提供了對 RabbitMQ Stream 外掛 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到類路徑中 - 它必須與 spring-amqpspring-rabbit 版本相同。

當您將 producerType 屬性設定為 STREAM_SYNCSTREAM_ASYNC 時,不支援上述生產者屬性。

要配置繫結器以使用流 ProducerType,Spring Boot 將從應用程式屬性配置一個 Environment @Bean。您可以選擇新增一個自定義器來定製訊息處理器。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

有關配置環境和生產者構建器的資訊,請參閱 RabbitMQ Stream Java 客戶端文件

對 RabbitMQ 超級流的生產者支援

有關超級流的資訊,請參閱 超級流

使用超級流可以在超級流的每個分割槽上實現自動的彈性伸縮,每個分割槽只有一個活躍消費者。使用 Spring Cloud Stream,您可以透過 AMQP 或使用流客戶端釋出到超級流。

超級流必須已經存在;生產者繫結不支援建立超級流。

透過 AMQP 釋出到超級流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客戶端釋出到超級流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客戶端時,如果您設定了 confirmAckChannel,則成功傳送的訊息副本將傳送到該通道。

© . This site is unofficial and not affiliated with VMware.