RabbitMQ Stream 外掛的初始消費者支援
現在提供了對 RabbitMQ Stream 外掛 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到類路徑中——它必須與 spring-amqp 和 spring-rabbit 的版本相同。
當您將 containerType 屬性設定為 stream 時,上述消費者屬性不受支援;concurrency 僅對超級流支援。每個繫結只能消費一個流佇列。 |
要配置繫結器使用 containerType=stream,Spring Boot 將根據應用程式屬性自動配置一個 Environment @Bean。您可以選擇新增一個自定義器來定製監聽器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
傳遞給自定義器的 name 引數是 destination + '.' + group + '.container'。
流的 name()(用於偏移量跟蹤)設定為繫結 destination + '.' + group。它可以使用上面所示的 ConsumerCustomizer 進行更改。如果您決定使用手動偏移量跟蹤,Context 可作為訊息頭提供
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有關配置環境和消費者構建器的資訊,請參閱 RabbitMQ Stream Java 客戶端文件。
RabbitMQ 超級流的消費者支援
有關超級流的資訊,請參閱 超級流。
使用超級流可以實現自動伸縮,每個超級流分割槽上都有一個活躍消費者。
配置示例
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
該框架將建立一個名為 super 的超級流,包含 9 個分割槽。此應用程式最多可以部署 3 個例項。