RabbitMQ Stream 外掛的初始消費者支援

現在提供了對 RabbitMQ Stream 外掛 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到類路徑中——它必須與 spring-amqpspring-rabbit 的版本相同。

當您將 containerType 屬性設定為 stream 時,上述消費者屬性不受支援;concurrency 僅對超級流支援。每個繫結只能消費一個流佇列。

要配置繫結器使用 containerType=stream,Spring Boot 將根據應用程式屬性自動配置一個 Environment @Bean。您可以選擇新增一個自定義器來定製監聽器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

傳遞給自定義器的 name 引數是 destination + '.' + group + '.container'

流的 name()(用於偏移量跟蹤)設定為繫結 destination + '.' + group。它可以使用上面所示的 ConsumerCustomizer 進行更改。如果您決定使用手動偏移量跟蹤,Context 可作為訊息頭提供

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有關配置環境和消費者構建器的資訊,請參閱 RabbitMQ Stream Java 客戶端文件

RabbitMQ 超級流的消費者支援

有關超級流的資訊,請參閱 超級流

使用超級流可以實現自動伸縮,每個超級流分割槽上都有一個活躍消費者。

配置示例

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

該框架將建立一個名為 super 的超級流,包含 9 個分割槽。此應用程式最多可以部署 3 個例項。

© . This site is unofficial and not affiliated with VMware.