使用 RabbitMQ Stream 外掛

2.4 版本初步引入了對 RabbitMQ Stream Plugin Java Client 的支援,用於 RabbitMQ Stream Plugin

  • RabbitStreamTemplate

  • StreamListenerContainer

新增 spring-rabbit-stream 依賴到您的專案

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.2.5</version>
</dependency>
gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.5'

您可以使用一個 RabbitAdmin bean 來正常配置佇列,使用 QueueBuilder.stream() 方法來指定佇列型別。例如

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,這僅在您同時使用非流元件(例如 SimpleMessageListenerContainerDirectMessageListenerContainer)時才有效,因為 admin 在 AMQP 連線開啟時會觸發宣告定義的 bean。如果您的應用程式僅使用流元件,或者您希望使用高階流配置功能,則應配置一個 StreamAdmin 代替

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

有關 StreamCreator 的更多資訊,請參閱 RabbitMQ 文件。

傳送訊息

RabbitStreamTemplate 提供了 RabbitTemplate (AMQP) 功能的子集。

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate 實現具有以下建構函式和屬性

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverterconvertAndSend 方法中用於將物件轉換為 Spring AMQP Message

StreamMessageConverter 用於將 Spring AMQP Message 轉換為原生流 Message

您也可以直接傳送原生流 Message ;使用 messageBuilder() 方法可以訪問 Producer 的訊息構建器。

ProducerCustomizer 提供了一種在構建生產者之前對其進行自定義的機制。

請參考 Java 客戶端文件 關於自定義 EnvironmentProducer 的內容。

接收訊息

非同步訊息接收由 StreamListenerContainer (以及使用 @RabbitListener 時使用的 StreamRabbitListenerContainerFactory)提供。

監聽器容器需要一個 Environment 以及一個單獨的流名稱。

您可以使用經典的 MessageListener 來接收 Spring AMQP Message ,或者使用新的介面來接收原生流 Message

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

請參閱 訊息監聽器容器配置 以獲取有關支援屬性的資訊。

類似於模板,容器有一個 ConsumerCustomizer 屬性。

請參考 Java 客戶端文件 關於自定義 EnvironmentConsumer 的內容。

使用 @RabbitListener 時,配置一個 StreamRabbitListenerContainerFactory ;目前,大多數 @RabbitListener 屬性(concurrency 等)會被忽略。僅支援 idqueuesautoStartupcontainerFactory。此外,queues 只能包含一個流名稱。

示例

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5 版本向 StreamListenerContainer (及其工廠)添加了 adviceChain 屬性。還提供了一個新的工廠 bean,用於建立一個無狀態重試攔截器,該攔截器帶有一個可選的 StreamMessageRecoverer ,用於消費原始流訊息時使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
此容器不支援有狀態重試。

超級流

超級流是一個分割槽流的抽象概念,透過將多個流佇列繫結到一個具有引數 x-super-stream: true 的交換機來實現。

配置

為方便起見,可以透過定義一個型別為 SuperStream 的 bean 來配置一個超級流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin 會檢測到此 bean 並宣告交換機(my.super.stream)和 3 個佇列(分割槽)—— my.super-stream-n ,其中 n012,繫結路由鍵等於 n

如果您也希望透過 AMQP 釋出到此交換機,可以提供自定義路由鍵

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

路由鍵的數量必須與分割槽數量相等。

生產到超級流

您必須向 RabbitStreamTemplate 新增一個 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

您也可以使用 RabbitTemplate 透過 AMQP 釋出。

使用單活躍消費者消費超級流

在監聽器容器上呼叫 superStream 方法,以在超級流上啟用單活躍消費者。

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
目前,當併發數大於 1 時,實際的併發數會進一步受 Environment 控制;要實現完全併發,請將 environment 的 maxConsumersByConnection 設定為 1。請參閱 配置 Environment

Micrometer 觀測

自 3.0.5 版本起,現在支援使用 Micrometer 進行觀測,包括針對 RabbitStreamTemplate 和流監聽器容器。該容器現在也支援 Micrometer 定時器(當未啟用觀測時)。

observationEnabled 設定在每個元件上以啟用觀測;這將停用 Micrometer 定時器 ,因為定時器現在會隨每次觀測進行管理。使用註解監聽器時,將 observationEnabled 設定在容器工廠上。

請參考 Micrometer Tracing 以獲取更多資訊。

要向定時器/追蹤新增標籤,請向模板或監聽器容器分別配置一個自定義的 RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention

預設實現會為模板觀測新增 name 標籤,併為容器新增 listener.id 標籤。

您可以繼承 DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention ,或者提供全新的實現。

請參閱 Micrometer 觀測文件 獲取更多詳細資訊。