對 RabbitMQ Stream 外掛的初始消費者支援
現在提供了對 RabbitMQ Stream Plugin 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream
jar 新增到 classpath 中——它的版本必須與 spring-amqp
和 spring-rabbit
的版本相同。
當您將 containerType 屬性設定為 stream 時,上面描述的消費者屬性不受支援;對於 super streams,僅支援 concurrency 。每個繫結只能消費一個 stream 佇列。 |
要配置 binder 使用 containerType=stream
,Spring Boot 將自動從應用屬性中配置一個 Environment
@Bean
。您可以選擇新增一個定製器來定製監聽器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
傳遞給定製器的 name
引數格式為 destination + '.' + group + '.container'
。
stream 的 name()
(用於偏移量跟蹤)被設定為繫結 destination + '.' + group
。可以使用上面顯示的 ConsumerCustomizer
進行更改。如果您決定使用手動偏移量跟蹤,則 Context
作為訊息頭可用。
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有關配置環境和消費者構建器的資訊,請參考 RabbitMQ Stream Java Client 文件。
RabbitMQ Super Streams 的消費者支援
有關 super streams 的資訊,請參閱 Super Streams。
使用 super streams 允許自動伸縮,每個 super stream 的分割槽上只有一個活躍消費者。
配置示例
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架將建立一個名為 super
的 super stream,包含 9 個分割槽。可以部署多達 3 個此應用的例項。