RabbitMQ Stream 外掛的初始生產者支援
現在提供了對 RabbitMQ Stream 外掛 的基本支援。要啟用此功能,您必須將 spring-rabbit-stream jar 新增到類路徑中 - 它必須與 spring-amqp 和 spring-rabbit 版本相同。
當您將 producerType 屬性設定為 STREAM_SYNC 或 STREAM_ASYNC 時,不支援上述生產者屬性。 |
要配置繫結器以使用流 ProducerType,Spring Boot 將從應用程式屬性配置一個 Environment @Bean。您可以選擇新增一個自定義器來定製訊息處理器。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
有關配置環境和生產者構建器的資訊,請參閱 RabbitMQ Stream Java 客戶端文件。
對 RabbitMQ 超級流的生產者支援
有關超級流的資訊,請參閱 超級流。
使用超級流可以在超級流的每個分割槽上實現自動的彈性伸縮,每個分割槽只有一個活躍消費者。使用 Spring Cloud Stream,您可以透過 AMQP 或使用流客戶端釋出到超級流。
| 超級流必須已經存在;生產者繫結不支援建立超級流。 |
透過 AMQP 釋出到超級流
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客戶端釋出到超級流
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客戶端時,如果您設定了 confirmAckChannel,則成功傳送的訊息副本將傳送到該通道。