對 RabbitMQ Stream 外掛的初始消費者支援

現在提供了對 RabbitMQ Stream Plugin 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到 classpath 中——它的版本必須與 spring-amqpspring-rabbit 的版本相同。

當您將 containerType 屬性設定為 stream 時,上面描述的消費者屬性不受支援;對於 super streams,僅支援 concurrency。每個繫結只能消費一個 stream 佇列。

要配置 binder 使用 containerType=stream,Spring Boot 將自動從應用屬性中配置一個 Environment @Bean。您可以選擇新增一個定製器來定製監聽器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

傳遞給定製器的 name 引數格式為 destination + '.' + group + '.container'

stream 的 name()(用於偏移量跟蹤)被設定為繫結 destination + '.' + group。可以使用上面顯示的 ConsumerCustomizer 進行更改。如果您決定使用手動偏移量跟蹤,則 Context 作為訊息頭可用。

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有關配置環境和消費者構建器的資訊,請參考 RabbitMQ Stream Java Client 文件

RabbitMQ Super Streams 的消費者支援

有關 super streams 的資訊,請參閱 Super Streams

使用 super streams 允許自動伸縮,每個 super stream 的分割槽上只有一個活躍消費者。

配置示例

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

框架將建立一個名為 super 的 super stream,包含 9 個分割槽。可以部署多達 3 個此應用的例項。