RabbitMQ AMQP 1.0 支援
版本 4.0 引入了 spring-rabbitmq-client 模組,用於 RabbitMQ 上的 AMQP 1.0 協議支援。
此 artifact 基於 com.rabbitmq.client:amqp-client 庫,因此只能與 RabbitMQ 及其 AMQP 1.0 協議支援一起使用。它不能用於任何任意 AMQP 1.0 代理。為此,目前建議使用 JMS 橋接器 和相應的 Spring JMS 整合。
必須將此依賴項新增到專案中才能與 RabbitMQ AMQP 1.0 支援進行互動。
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbitmq-client</artifactId>
<version>4.0.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0'
spring-rabbit(用於 AMQP 0.9.1 協議)作為傳遞依賴項引入,以重用此新客戶端中的一些通用 API,例如異常、@RabbitListener 支援。在目標專案中無需同時使用這兩個功能,但 RabbitMQ 允許 AMQP 0.9.1 和 1.0 共存。
有關 RabbitMQ AMQP 1.0 Java 客戶端的更多資訊,請參閱其文件。
RabbitMQ AMQP 1.0 環境
com.rabbitmq.client.amqp.Environment 是專案中必須新增的第一項,用於連線管理和其他通用設定。它是節點或節點叢集的入口點。環境允許建立連線。它可以包含連線之間共享的基礎設施相關配置設定,例如執行緒池、指標和/或觀察。
@Bean
Environment environment() {
return new AmqpEnvironmentBuilder()
.connectionSettings()
.port(5672)
.environmentBuilder()
.build();
}
相同的 Environment 例項可用於連線不同的 RabbitMQ 代理,然後必須在特定連線上提供連線設定。請參閱下文。
AMQP 連線工廠
引入了 org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象來管理 com.rabbitmq.client.amqp.Connection。不要將其與僅用於 AMQP 0.9.1 協議的 org.springframework.amqp.rabbit.connection.ConnectionFactory 混淆。SingleAmqpConnectionFactory 實現用於管理一個連線及其設定。相同的 Connection 可以由許多生產者、消費者和管理共享。AMQP 1.0 協議實現在 AMQP 客戶端庫內部透過連結抽象處理多路複用。Connection 具有恢復能力,並且還處理拓撲。
在大多數情況下,只需將此 bean 新增到專案中即可
@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
return new SingleAmqpConnectionFactory(environment);
}
請參閱 SingleAmqpConnectionFactory 的 setter 方法,瞭解所有連線特定的設定。
RabbitMQ 拓撲管理
從應用程式角度來看的拓撲管理(交換機、佇列和繫結),提供了 RabbitAmqpAdmin,它是現有 AmqpAdmin 介面的實現。
@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpAdmin(connectionFactory);
}
與配置代理中描述的 Exchange、Queue、Binding 和 Declarables 例項的相同 bean 定義必須用於管理拓撲。spring-rabbit 中的 RabbitAdmin 也可以做到這一點,但它是在 AMQP 0.9.1 連線上進行的,而 RabbitAmqpAdmin 是基於 AMQP 1.0 連線的,因此可以從中平穩地處理拓撲恢復,以及釋出者和消費者恢復。
RabbitAmqpAdmin 在其 start() 生命週期回撥中執行相應的 bean 掃描。initialize() 以及所有其他 RabbitMQ 實體管理方法都可以在執行時手動呼叫。在內部,RabbitAmqpAdmin 使用 com.rabbitmq.client.amqp.Connection.management() API 執行相應的拓撲操作。
RabbitAmqpTemplate
RabbitAmqpTemplate 是 AsyncAmqpTemplate 的實現,並使用 AMQP 1.0 協議執行各種傳送/接收操作。它需要一個 AmqpConnectionFactory,並且可以配置一些預設值。即使 com.rabbitmq.client:amqp-client 庫帶有 com.rabbitmq.client.amqp.Message,RabbitAmqpTemplate 仍然公開基於眾所周知的 org.springframework.amqp.core.Message 的 API,以及所有支援類,如 MessageProperties 和 MessageConverter 抽象。向/從 com.rabbitmq.client.amqp.Message 的轉換在 RabbitAmqpTemplate 內部完成。所有方法都返回一個 CompletableFuture 以最終獲取操作結果。普通物件的操作需要訊息體轉換,預設使用 SimpleMessageConverter。有關轉換的更多資訊,請參閱訊息轉換器。
通常,只需一個這樣的 bean 即可執行所有可能的模板模式操作
@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpTemplate(connectionFactory);
}
它可以配置一些預設的交換機和路由鍵或只配置佇列。RabbitAmqpTemplate 有一個用於接收操作的預設佇列,以及另一個用於請求-回覆操作的預設佇列,其中如果客戶端不存在,則會為請求建立臨時佇列。
以下是 RabbitAmqpTemplate 操作的一些示例
@Bean
DirectExchange e1() {
return new DirectExchange("e1");
}
@Bean
Queue q1() {
return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}
@Bean
Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
...
@Test
void defaultExchangeAndRoutingKey() {
this.rabbitAmqpTemplate.setExchange("e1");
this.rabbitAmqpTemplate.setRoutingKey("k1");
this.rabbitAmqpTemplate.setReceiveQueue("q1");
assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
.succeedsWithin(Duration.ofSeconds(10));
assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo("test1");
}
這裡我們聲明瞭一個 e1 交換機、q1 佇列,並將其與 k1 路由鍵繫結到該交換機。然後我們使用 RabbitAmqpTemplate 的預設設定將訊息釋出到上述交換機,並使用相應的路由鍵,並將 q1 作為接收操作的預設佇列。這些方法有過載變體,可以傳送到特定的交換機或佇列(用於傳送和接收)。使用 ParameterizedTypeReference<T> 的 receiveAndConvert() 操作需要將 SmartMessageConverter 注入到 RabbitAmqpTemplate 中。
下一個示例演示了使用 RabbitAmqpTemplate 的 RPC 實現(假設與上一個示例相同的 RabbitMQ 物件)
@Test
void verifyRpc() {
String testRequest = "rpc-request";
String testReply = "rpc-reply";
CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);
AtomicReference<String> receivedRequest = new AtomicReference<>();
CompletableFuture<Boolean> rpcServerResult =
this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
payload -> {
receivedRequest.set(payload);
return testReply;
});
assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
assertThat(receivedRequest.get()).isEqualTo(testRequest);
}
關聯和 replyTo 佇列在內部管理。伺服器端可以透過下面描述的 @RabbitListener POJO 方法實現。
RabbitMQ AMQP 1.0 消費者
與許多其他用於消費者端的訊息實現一樣,spring-rabbitmq-client 模組附帶 RabbitAmqpListenerContainer,它本質上是眾所周知的 MessageListenerContainer 的實現。它的功能與 DirectMessageListenerContainer 完全相同,但適用於 RabbitMQ AMQP 1.0 支援。它需要一個 AmqpConnectionFactory 和至少一個要消費的佇列。此外,必須提供 MessageListener(或 AMQP 1.0 特定的 RabbitAmqpMessageListener)。它可以配置 autoSettle = false,其含義是 AcknowledgeMode.MANUAL。在這種情況下,提供給 MessageListener 的 Message 在其 MessageProperties 中包含一個 AmqpAcknowledgment 回撥,以供目標邏輯考慮。
RabbitAmqpMessageListener 具有 com.rabbitmq.client:amqp-client 抽象的契約
/**
* Process an AMQP message.
* @param message the message to process.
* @param context the consumer context to settle message.
* Null if container is configured for {@code autoSettle}.
*/
void onAmqpMessage(Message message, Consumer.Context context);
其中第一個引數是原生接收到的 com.rabbitmq.client.amqp.Message,context 是訊息結算的原生回撥,類似於上面提到的 AmqpAcknowledgment 抽象。
當提供了 batchSize 選項時,RabbitAmqpMessageListener 可以批次處理和結算訊息。為此,必須實現 MessageListener.onMessageBatch() 契約。batchReceiveDuration 選項用於安排強制釋放未滿批次,以避免記憶體和消費者信用額度耗盡。
通常,RabbitAmqpMessageListener 類不直接在目標專案中使用,而是選擇透過 @RabbitListener 進行 POJO 方法註釋配置,用於宣告式消費者配置。RabbitAmqpListenerContainerFactory 必須在 RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME 下注冊,並且 @RabbitListener 註釋過程將在 RabbitListenerEndpointRegistry 中註冊 RabbitAmqpMessageListener 例項。目標 POJO 方法呼叫由特定的 RabbitAmqpMessageListenerAdapter 實現處理,該實現擴充套件了 MessagingMessageListenerAdapter 並重用了其許多功能,包括請求-回覆場景(非同步或非非同步)。因此,註解驅動的偵聽器端點中描述的所有概念也適用於此 RabbitAmqpMessageListener。
除了傳統的 messaging payload 和 headers,@RabbitListener POJO 方法契約還可以包含這些引數
-
com.rabbitmq.client.amqp.Message- 未經任何轉換的原生 AMQP 1.0 訊息; -
org.springframework.amqp.core.Message- Spring AMQP 訊息抽象,作為原生 AMQP 1.0 訊息的轉換結果; -
org.springframework.messaging.Message- Spring Messaging 抽象,作為 Spring AMQP 訊息的轉換結果; -
Consumer.Context- RabbitMQ AMQP 客戶端消費者結算 API; -
org.springframework.amqp.core.AmqpAcknowledgment- Spring AMQP 確認抽象:委託給Consumer.Context。
以下示例演示了一個簡單的 @RabbitListener,用於 RabbitMQ AMQP 1.0 互動,並進行手動結算
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpListenerContainerFactory(connectionFactory);
}
final List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch consumeIsDone = new CountDownLatch(11);
@RabbitListener(queues = {"q1", "q2"},
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
concurrency = "2",
id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
try {
if ("discard".equals(data)) {
if (!this.received.contains(data)) {
context.discard();
}
else {
throw new MessageConversionException("Test message is rejected");
}
}
else if ("requeue".equals(data) && !this.received.contains(data)) {
acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
}
else {
acknowledgment.acknowledge();
}
this.received.add(data);
}
finally {
this.consumeIsDone.countDown();
}
}