RabbitMQ Stream 外掛的初步生產者支援

現已提供對 RabbitMQ Stream 外掛 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到類路徑中 - 它必須與 spring-amqpspring-rabbit 版本相同。

當您將 producerType 屬性設定為 STREAM_SYNCSTREAM_ASYNC 時,上述生產者屬性將不受支援。

要配置繫結器使用流 ProducerType,Spring Boot 將從應用屬性中配置一個 Environment @Bean。您可以選擇新增一個定製器來定製訊息處理器。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

有關配置環境和生產者構建器的資訊,請參閱 RabbitMQ Stream Java 客戶端文件

RabbitMQ Super Streams 的生產者支援

有關超級流(Super Streams)的資訊,請參閱 Super Streams

使用超級流可以實現自動伸縮,每個超級流分割槽上只有一個活躍消費者。使用 Spring Cloud Stream,您可以透過 AMQP 或使用流客戶端釋出到超級流。

超級流必須已經存在;生產者繫結不支援建立超級流。

透過 AMQP 釋出到超級流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客戶端釋出到超級流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客戶端時,如果您設定了 confirmAckChannel,成功傳送訊息的副本將被髮送到該通道。