RabbitMQ AMQP 1.0 支援

版本 4.0 引入了 spring-rabbitmq-client 模組,用於 RabbitMQ 上的 AMQP 1.0 協議支援。

此 artifact 基於 com.rabbitmq.client:amqp-client 庫,因此只能與 RabbitMQ 及其 AMQP 1.0 協議支援一起使用。它不能用於任何任意 AMQP 1.0 代理。為此,目前建議使用 JMS 橋接器 和相應的 Spring JMS 整合。

必須將此依賴項新增到專案中才能與 RabbitMQ AMQP 1.0 支援進行互動。

  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.0.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0'

spring-rabbit(用於 AMQP 0.9.1 協議)作為傳遞依賴項引入,以重用此新客戶端中的一些通用 API,例如異常、@RabbitListener 支援。在目標專案中無需同時使用這兩個功能,但 RabbitMQ 允許 AMQP 0.9.1 和 1.0 共存。

有關 RabbitMQ AMQP 1.0 Java 客戶端的更多資訊,請參閱其文件

RabbitMQ AMQP 1.0 環境

com.rabbitmq.client.amqp.Environment 是專案中必須新增的第一項,用於連線管理和其他通用設定。它是節點或節點叢集的入口點。環境允許建立連線。它可以包含連線之間共享的基礎設施相關配置設定,例如執行緒池、指標和/或觀察。

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

相同的 Environment 例項可用於連線不同的 RabbitMQ 代理,然後必須在特定連線上提供連線設定。請參閱下文。

AMQP 連線工廠

引入了 org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象來管理 com.rabbitmq.client.amqp.Connection。不要將其與僅用於 AMQP 0.9.1 協議的 org.springframework.amqp.rabbit.connection.ConnectionFactory 混淆。SingleAmqpConnectionFactory 實現用於管理一個連線及其設定。相同的 Connection 可以由許多生產者、消費者和管理共享。AMQP 1.0 協議實現在 AMQP 客戶端庫內部透過連結抽象處理多路複用。Connection 具有恢復能力,並且還處理拓撲。

在大多數情況下,只需將此 bean 新增到專案中即可

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

請參閱 SingleAmqpConnectionFactory 的 setter 方法,瞭解所有連線特定的設定。

RabbitMQ 拓撲管理

從應用程式角度來看的拓撲管理(交換機、佇列和繫結),提供了 RabbitAmqpAdmin,它是現有 AmqpAdmin 介面的實現。

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

配置代理中描述的 ExchangeQueueBindingDeclarables 例項的相同 bean 定義必須用於管理拓撲。spring-rabbit 中的 RabbitAdmin 也可以做到這一點,但它是在 AMQP 0.9.1 連線上進行的,而 RabbitAmqpAdmin 是基於 AMQP 1.0 連線的,因此可以從中平穩地處理拓撲恢復,以及釋出者和消費者恢復。

RabbitAmqpAdmin 在其 start() 生命週期回撥中執行相應的 bean 掃描。initialize() 以及所有其他 RabbitMQ 實體管理方法都可以在執行時手動呼叫。在內部,RabbitAmqpAdmin 使用 com.rabbitmq.client.amqp.Connection.management() API 執行相應的拓撲操作。

RabbitAmqpTemplate

RabbitAmqpTemplateAsyncAmqpTemplate 的實現,並使用 AMQP 1.0 協議執行各種傳送/接收操作。它需要一個 AmqpConnectionFactory,並且可以配置一些預設值。即使 com.rabbitmq.client:amqp-client 庫帶有 com.rabbitmq.client.amqp.MessageRabbitAmqpTemplate 仍然公開基於眾所周知的 org.springframework.amqp.core.Message 的 API,以及所有支援類,如 MessagePropertiesMessageConverter 抽象。向/從 com.rabbitmq.client.amqp.Message 的轉換在 RabbitAmqpTemplate 內部完成。所有方法都返回一個 CompletableFuture 以最終獲取操作結果。普通物件的操作需要訊息體轉換,預設使用 SimpleMessageConverter。有關轉換的更多資訊,請參閱訊息轉換器

通常,只需一個這樣的 bean 即可執行所有可能的模板模式操作

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

它可以配置一些預設的交換機和路由鍵或只配置佇列。RabbitAmqpTemplate 有一個用於接收操作的預設佇列,以及另一個用於請求-回覆操作的預設佇列,其中如果客戶端不存在,則會為請求建立臨時佇列。

以下是 RabbitAmqpTemplate 操作的一些示例

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

這裡我們聲明瞭一個 e1 交換機、q1 佇列,並將其與 k1 路由鍵繫結到該交換機。然後我們使用 RabbitAmqpTemplate 的預設設定將訊息釋出到上述交換機,並使用相應的路由鍵,並將 q1 作為接收操作的預設佇列。這些方法有過載變體,可以傳送到特定的交換機或佇列(用於傳送和接收)。使用 ParameterizedTypeReference<T>receiveAndConvert() 操作需要將 SmartMessageConverter 注入到 RabbitAmqpTemplate 中。

下一個示例演示了使用 RabbitAmqpTemplate 的 RPC 實現(假設與上一個示例相同的 RabbitMQ 物件)

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

關聯和 replyTo 佇列在內部管理。伺服器端可以透過下面描述的 @RabbitListener POJO 方法實現。

RabbitMQ AMQP 1.0 消費者

與許多其他用於消費者端的訊息實現一樣,spring-rabbitmq-client 模組附帶 RabbitAmqpListenerContainer,它本質上是眾所周知的 MessageListenerContainer 的實現。它的功能與 DirectMessageListenerContainer 完全相同,但適用於 RabbitMQ AMQP 1.0 支援。它需要一個 AmqpConnectionFactory 和至少一個要消費的佇列。此外,必須提供 MessageListener(或 AMQP 1.0 特定的 RabbitAmqpMessageListener)。它可以配置 autoSettle = false,其含義是 AcknowledgeMode.MANUAL。在這種情況下,提供給 MessageListenerMessage 在其 MessageProperties 中包含一個 AmqpAcknowledgment 回撥,以供目標邏輯考慮。

RabbitAmqpMessageListener 具有 com.rabbitmq.client:amqp-client 抽象的契約

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

其中第一個引數是原生接收到的 com.rabbitmq.client.amqp.Messagecontext 是訊息結算的原生回撥,類似於上面提到的 AmqpAcknowledgment 抽象。

當提供了 batchSize 選項時,RabbitAmqpMessageListener 可以批次處理和結算訊息。為此,必須實現 MessageListener.onMessageBatch() 契約。batchReceiveDuration 選項用於安排強制釋放未滿批次,以避免記憶體和消費者信用額度耗盡。

通常,RabbitAmqpMessageListener 類不直接在目標專案中使用,而是選擇透過 @RabbitListener 進行 POJO 方法註釋配置,用於宣告式消費者配置。RabbitAmqpListenerContainerFactory 必須在 RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME 下注冊,並且 @RabbitListener 註釋過程將在 RabbitListenerEndpointRegistry 中註冊 RabbitAmqpMessageListener 例項。目標 POJO 方法呼叫由特定的 RabbitAmqpMessageListenerAdapter 實現處理,該實現擴充套件了 MessagingMessageListenerAdapter 並重用了其許多功能,包括請求-回覆場景(非同步或非非同步)。因此,註解驅動的偵聽器端點中描述的所有概念也適用於此 RabbitAmqpMessageListener

除了傳統的 messaging payloadheaders@RabbitListener POJO 方法契約還可以包含這些引數

  • com.rabbitmq.client.amqp.Message - 未經任何轉換的原生 AMQP 1.0 訊息;

  • org.springframework.amqp.core.Message - Spring AMQP 訊息抽象,作為原生 AMQP 1.0 訊息的轉換結果;

  • org.springframework.messaging.Message - Spring Messaging 抽象,作為 Spring AMQP 訊息的轉換結果;

  • Consumer.Context - RabbitMQ AMQP 客戶端消費者結算 API;

  • org.springframework.amqp.core.AmqpAcknowledgment - Spring AMQP 確認抽象:委託給 Consumer.Context

以下示例演示了一個簡單的 @RabbitListener,用於 RabbitMQ AMQP 1.0 互動,並進行手動結算

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}
© . This site is unofficial and not affiliated with VMware.