使用 RabbitMQ 流外掛
版本 2.4 引入了對 RabbitMQ Stream 外掛 Java 客戶端 對 RabbitMQ Stream 外掛 的初步支援。
-
RabbitStreamTemplate -
StreamListenerContainer
將 spring-rabbit-stream 依賴項新增到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>4.0.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:4.0.0'
您可以使用 RabbitAdmin Bean 正常配置佇列,使用 QueueBuilder.stream() 方法指定佇列型別。例如:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,這僅在您還使用非流元件(例如 SimpleMessageListenerContainer 或 DirectMessageListenerContainer)時才有效,因為當 AMQP 連線開啟時,admin 會觸發宣告已定義的 Bean。如果您的應用程式只使用流元件,或者您希望使用高階流配置功能,則應改為配置 StreamAdmin。
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
有關 StreamCreator 的更多資訊,請參閱 RabbitMQ 文件。
傳送訊息
RabbitStreamTemplate 提供 RabbitTemplate (AMQP) 功能的子集。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
RabbitStreamTemplate 實現具有以下建構函式和屬性:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
MessageConverter 用於 convertAndSend 方法中,將物件轉換為 Spring AMQP Message。
StreamMessageConverter 用於將 Spring AMQP Message 轉換為原生流 Message。
您也可以直接傳送原生流 Message;messageBuilder() 方法提供對 Producer 的訊息構建器的訪問。
ProducerCustomizer 提供一種在構建生產者之前自定義生產者的機制。
有關自定義 Environment 和 Producer 的資訊,請參閱 Java 客戶端文件。
接收訊息
非同步訊息接收由 StreamListenerContainer(以及使用 @RabbitListener 時的 StreamRabbitListenerContainerFactory)提供。
監聽器容器需要一個 Environment 以及一個流名稱。
您可以使用經典的 MessageListener 接收 Spring AMQP Message,也可以使用新介面接收原生流 Message:
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有關支援的屬性資訊,請參閱 訊息監聽器容器配置。
與模板類似,容器具有 ConsumerCustomizer 屬性。
有關自定義 Environment 和 Consumer 的資訊,請參閱 Java 客戶端文件。
使用 @RabbitListener 時,配置 StreamRabbitListenerContainerFactory;此時,大多數 @RabbitListener 屬性(concurrency 等)將被忽略。只支援 id、queues、autoStartup 和 containerFactory。此外,queues 只能包含一個流名稱。
示例
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 為 StreamListenerContainer(及其工廠)添加了 adviceChain 屬性。還提供了一個新的工廠 Bean,用於建立一個無狀態重試攔截器,並帶有一個可選的 StreamMessageRecoverer,用於消費原始流訊息。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
| 此容器不支援有狀態重試。 |
超級流
超級流是一個分割槽流的抽象概念,透過將多個流佇列繫結到具有引數 x-super-stream: true 的交換機來實現。
配置
為方便起見,可以透過定義一個 SuperStream 型別的 Bean 來配置超級流。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
RabbitAdmin 會檢測此 Bean 並宣告交換機 (my.super.stream) 和 3 個佇列(分割槽)- my.super-stream-n,其中 n 為 0、1、2,並繫結路由鍵等於 n。
如果您還希望透過 AMQP 釋出到交換機,則可以提供自定義路由鍵:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
鍵的數量必須等於分割槽的數量。
生產到超級流
您必須向 RabbitStreamTemplate 新增 superStreamRoutingFunction。
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您還可以使用 RabbitTemplate 透過 AMQP 釋出。
使用單個活躍消費者消費超級流
在監聽器容器上呼叫 superStream 方法,以在超級流上啟用單個活躍消費者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
目前,當併發數大於 1 時,實際併發數由 Environment 進一步控制;要實現完全併發,請將環境的 maxConsumersByConnection 設定為 1。請參閱 配置環境。 |
Micrometer 觀測
從版本 3.0.5 開始,RabbitStreamTemplate 和流監聽器容器支援使用 Micrometer 進行觀測。容器現在還支援 Micrometer 計時器(當未啟用觀測時)。
在每個元件上設定 observationEnabled 以啟用觀測;這將停用 Micrometer 計時器,因為計時器現在將透過每個觀測進行管理。使用註解監聽器時,在容器工廠上設定 observationEnabled。
有關更多資訊,請參閱 Micrometer Tracing。
要為計時器/跟蹤新增標籤,請分別為模板或監聽器容器配置自定義的 RabbitStreamTemplateObservationConvention 或 RabbitStreamListenerObservationConvention。
預設實現為模板觀測新增 name 標籤,為容器新增 listener.id 標籤。
您可以子類化 DefaultRabbitStreamTemplateObservationConvention 或 DefaultStreamRabbitListenerObservationConvention,或者提供全新的實現。
有關更多詳細資訊,請參閱 Micrometer 觀測文件。