AmqpTemplate
與 Spring Framework 和相關專案提供的許多其他高階抽象一樣,Spring AMQP 提供了一個起核心作用的“模板”。定義主要操作的介面稱為 AmqpTemplate
。這些操作涵蓋了傳送和接收訊息的通用行為。換句話說,它們不是任何特定實現的獨有功能——因此名稱中帶有“AMQP”。另一方面,該介面的實現與 AMQP 協議的實現相關聯。與 JMS 不同,JMS 本身是介面級別的 API,AMQP 是線級別協議。該協議的實現提供了自己的客戶端庫,因此模板介面的每個實現都依賴於特定的客戶端庫。目前,只有一個實現:RabbitTemplate
。在隨後的示例中,我們經常使用 AmqpTemplate
。然而,當您檢視配置示例或任何例項化模板或呼叫 setter 的程式碼片段時,您會看到實現的型別(例如,RabbitTemplate
)。
新增重試功能
從 1.3 版本開始,您現在可以配置 RabbitTemplate
使用 RetryTemplate
來幫助處理與 Broker 連線相關的問題。有關完整資訊,請參閱 spring-retry 專案。以下僅是一個示例,它使用指數回退策略和預設的 SimpleRetryPolicy
,在向呼叫者丟擲異常之前會嘗試三次。
以下示例使用 XML 名稱空間
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例在 Java 中使用 @Configuration
註解
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
從 1.4 版本開始,除了 retryTemplate
屬性外,RabbitTemplate
還支援 recoveryCallback
選項。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
的第二個引數。
RecoveryCallback 有一定的限制,因為它只包含重試上下文中的 lastThrowable 欄位。對於更復雜的用例,您應該使用外部的 RetryTemplate ,以便透過上下文屬性向 RecoveryCallback 傳遞額外資訊。以下示例展示瞭如何實現: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在這種情況下,您不會將 RetryTemplate
注入到 RabbitTemplate
中。
釋出是非同步的 — 如何檢測成功和失敗
釋出訊息是一種非同步機制,預設情況下,無法路由的訊息會被 RabbitMQ 丟棄。對於成功的釋出,您可以收到非同步確認,如關聯的釋出者確認和返回中所述。考慮兩種失敗場景:
-
釋出到一個 Exchange,但沒有匹配的目的地佇列。
-
釋出到一個不存在的 Exchange。
第一種情況由釋出者返回涵蓋,如關聯的釋出者確認和返回中所述。
對於第二種情況,訊息會被丟棄並且不生成返回。底層 Channel 會因異常而關閉。預設情況下,此異常會被記錄日誌,但您可以透過向 CachingConnectionFactory
註冊一個 ChannelListener
來獲取此類事件的通知。以下示例展示瞭如何新增 ConnectionListener
:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以檢查 signal 的 reason
屬性以確定發生的問題。
要在傳送執行緒上檢測異常,您可以在 RabbitTemplate
上設定 setChannelTransacted(true)
,異常將在 txCommit()
時檢測到。然而,事務會顯著降低效能,因此在僅為此一個用例啟用事務之前請仔細考慮。
關聯的釋出者確認和返回
AmqpTemplate
的 RabbitTemplate
實現支援釋出者確認和返回。
對於返回的訊息,模板的 mandatory
屬性必須設定為 true
,或者針對特定訊息的 mandatory-expression
必須評估為 true
。此功能需要一個將其 publisherReturns
屬性設定為 true
的 CachingConnectionFactory
(參見釋出者確認和返回)。透過呼叫 setReturnsCallback(ReturnsCallback callback)
並註冊 RabbitTemplate.ReturnsCallback
,返回會發送給客戶端。該回調必須實現以下方法:
void returnedMessage(ReturnedMessage returned);
ReturnedMessage
包含以下屬性:
-
message
- 返回的訊息本身 -
replyCode
- 指示返回原因的程式碼 -
replyText
- 返回的文字原因 - 例如NO_ROUTE
-
exchange
- 訊息傳送到的 Exchange -
routingKey
- 使用的 Routing Key
每個 RabbitTemplate
只支援一個 ReturnsCallback
。另請參見回覆超時。
對於釋出者確認(也稱為釋出者 ACK),模板需要一個將其 publisherConfirm
屬性設定為 ConfirmType.CORRELATED
的 CachingConnectionFactory
。透過呼叫 setConfirmCallback(ConfirmCallback callback)
並註冊 RabbitTemplate.ConfirmCallback
,確認會發送給客戶端。該回調必須實現此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
是客戶端傳送原始訊息時提供的一個物件。ack
對於 ack
為 true,對於 nack
為 false。對於 nack
例項,如果生成 nack
時原因可用,則 cause 可能包含 nack
的原因。一個例子是傳送訊息到不存在的 Exchange。在這種情況下,Broker 會關閉 Channel。關閉的原因包含在 cause
中。cause
是在 1.4 版本中新增的。
每個 RabbitTemplate
只支援一個 ConfirmCallback
。
當 Rabbit Template 的傳送操作完成時,Channel 會被關閉。當連線工廠快取已滿時(快取中有空間時,Channel 不會被物理關閉,返回和確認正常進行),這將阻止接收確認或返回。當快取已滿時,框架會延遲關閉最多五秒鐘,以便有時間接收確認和返回。使用確認時,當收到最後一個確認後 Channel 會被關閉。僅使用返回時,Channel 會保持開啟狀態長達五秒鐘。我們通常建議將連線工廠的 channelCacheSize 設定為一個足夠大的值,以便釋出訊息的 Channel 返回到快取中,而不是被關閉。您可以使用 RabbitMQ 管理外掛監控 Channel 使用情況。如果您看到 Channel 快速開啟和關閉,應考慮增加快取大小以減少伺服器開銷。 |
在 2.1 版本之前,啟用釋出者確認的 Channel 會在收到確認之前返回到快取。其他一些程序可能會取出該 Channel 並執行導致 Channel 關閉的操作,例如向不存在的 Exchange 釋出訊息。這可能導致確認丟失。2.1 及更高版本不再在確認未完成時將 Channel 返回到快取。RabbitTemplate 在每次操作後對 Channel 執行邏輯 close() 。一般來說,這意味著一個 Channel 上一次只有一個未完成的確認。 |
從 2.2 版本開始,回撥會在連線工廠的 executor 執行緒之一上呼叫。這是為了避免在回撥內執行 Rabbit 操作時可能發生的死鎖。在以前的版本中,回撥直接在 amqp-client 連線 I/O 執行緒上呼叫;如果您執行某些 RPC 操作(例如開啟一個新 Channel),這將導致死鎖,因為 I/O 執行緒會阻塞等待結果,但結果需要由 I/O 執行緒本身處理。在那些版本中,需要在回撥內部將工作(例如傳送訊息)交給另一個執行緒。現在不再需要這樣做,因為框架現在將回調呼叫交給執行器處理。 |
只要返回回撥在 60 秒或更短時間內執行完成,即可保證在收到 ACK 之前收到返回的訊息。確認計劃在返回回撥退出後或 60 秒後交付,以先到者為準。 |
CorrelationData
物件有一個 CompletableFuture
,您可以使用它來獲取結果,而不是在模板上使用 ConfirmCallback
。以下示例展示瞭如何配置 CorrelationData
例項:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由於它是一個 CompletableFuture<Confirm>
,您可以在就緒時 get()
獲取結果,或使用 whenComplete()
進行非同步回撥。Confirm
物件是一個簡單的 bean,具有兩個屬性:ack
和 reason
(針對 nack
例項)。Broker 生成的 nack
例項不會填充 reason。框架生成的 nack
例項(例如,在 ACK 未完成時關閉連線)會填充 reason。
此外,當同時啟用確認和返回時,如果訊息無法路由到任何佇列,CorrelationData
的 return
屬性將填充返回的訊息。保證在 future 設定 ack
之前,返回訊息屬性已被設定。CorrelationData.getReturn()
返回一個包含以下屬性的 ReturnMessage
:
-
message (返回的訊息)
-
replyCode
-
replyText
-
exchange
-
routingKey
另請參見範圍操作,瞭解一種更簡單的方式等待發布者確認。
範圍操作
通常,使用模板時,會從快取中取出(或建立)一個 Channel
,用於執行操作,然後返回到快取中以便重用。在多執行緒環境中,無法保證下一個操作使用同一個 Channel。然而,有時您可能希望對 Channel 的使用有更多控制,並確保多個操作都在同一個 Channel 上執行。
從 2.0 版本開始,提供了一個名為 invoke
的新方法,帶有一個 OperationsCallback
。在回撥範圍內並使用提供的 RabbitOperations
引數執行的任何操作都將使用相同的專用 Channel
,該 Channel 最後會被關閉(而不是返回到快取)。如果 Channel 是 PublisherCallbackChannel
,它會在收到所有確認後返回到快取(參見關聯的釋出者確認和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要此功能的一個例子是,如果您希望使用底層 Channel
上的 waitForConfirms()
方法。如前所述,Spring API 以前沒有公開此方法,因為 Channel 通常是快取和共享的。RabbitTemplate
現在提供了 waitForConfirms(long timeout)
和 waitForConfirmsOrDie(long timeout)
,它們委託給在 OperationsCallback
範圍內使用的專用 Channel。出於顯而易見的原因,這些方法不能在該範圍之外使用。
請注意,一個更高階的抽象,允許您將確認與請求關聯,已在其他地方提供(參見關聯的釋出者確認和返回)。如果您只想等待 Broker 確認交付,可以使用以下示例所示的技術:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望 RabbitAdmin
操作在 OperationsCallback
範圍內的同一個 Channel 上呼叫,則必須使用用於 invoke
操作的同一個 RabbitTemplate
構建 Admin。
如果模板操作已在現有事務的範圍內執行(例如,在事務性監聽器容器執行緒上執行並在事務性模板上執行操作),則前面的討論就沒有意義了。在這種情況下,操作會在該 Channel 上執行,並在執行緒返回到容器時提交。在這種場景下,沒有必要使用 invoke 。 |
以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎設施實際上是不需要的(除非也啟用了返回)。從 2.2 版本開始,連線工廠支援一個名為 publisherConfirmType
的新屬性。當此屬性設定為 ConfirmType.SIMPLE
時,可以避免使用該基礎設施,從而使確認處理更加高效。
此外,RabbitTemplate
會在傳送訊息的 MessageProperties
中設定 publisherSequenceNumber
屬性。如果您希望檢查(或記錄或以其他方式使用)特定的確認,可以使用過載的 invoke
方法,如下例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
這些 ConfirmCallback 物件(針對 ack 和 nack 例項)是 Rabbit 客戶端的回撥,而不是模板的回撥。 |
以下示例記錄 ack
和 nack
例項:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
範圍操作繫結到執行緒。有關多執行緒環境中嚴格順序的討論,請參見多執行緒環境中的嚴格訊息順序。 |
多執行緒環境中的嚴格訊息順序
範圍操作中的討論僅適用於在同一執行緒上執行操作的情況。
考慮以下情況:
-
thread-1
將訊息傳送到佇列,並將工作移交給thread-2
-
thread-2
將訊息傳送到同一個佇列
由於 RabbitMQ 的非同步特性和快取 Channel 的使用,無法確定是否會使用同一個 Channel,因此訊息到達佇列的順序無法保證。(大多數情況下它們會按順序到達,但亂序交付的機率不為零)。為解決此用例,您可以使用大小為 1
的有界 Channel 快取(以及 channelCheckoutTimeout
)以確保訊息始終在同一個 Channel 上釋出,從而保證順序。為此,如果您有連線工廠的其他用途(例如消費者),您應該要麼為模板使用專用的連線工廠,要麼配置模板使用主連線工廠中嵌入的釋出者連線工廠(參見使用獨立的連線)。
用一個簡單的 Spring Boot 應用可以最好地說明這一點:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使釋出操作在兩個不同的執行緒上執行,它們都將使用同一個 Channel,因為快取被限制為單個 Channel。
從 2.3.7 版本開始,ThreadChannelConnectionFactory
支援使用 prepareContextSwitch
和 switchContext
方法將執行緒的 Channel 轉移到另一個執行緒。第一個方法返回一個上下文,該上下文傳遞給呼叫第二個方法的第二個執行緒。一個執行緒可以繫結一個非事務性 Channel 或一個事務性 Channel(或各一個);您無法單獨轉移它們,除非您使用兩個連線工廠。示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
呼叫 prepareSwitchContext 後,如果當前執行緒再執行任何操作,它們將在新的 Channel 上執行。不再需要時關閉執行緒繫結的 Channel 非常重要。 |
訊息整合
從 1.4 版本開始,RabbitMessagingTemplate
(構建在 RabbitTemplate
之上)提供了與 Spring Framework 訊息抽象的整合——即 org.springframework.messaging.Message
。這允許您使用 spring-messaging
的 Message<?>
抽象來發送和接收訊息。此抽象被其他 Spring 專案使用,例如 Spring Integration 和 Spring 的 STOMP 支援。涉及兩種訊息轉換器:一種用於在 spring-messaging 的 Message<?>
抽象和 Spring AMQP 的 Message
抽象之間進行轉換,另一種用於在 Spring AMQP 的 Message
抽象和底層 RabbitMQ 客戶端庫所需的格式之間進行轉換。預設情況下,訊息負載由提供的 RabbitTemplate
例項的訊息轉換器轉換。或者,您可以注入一個使用其他負載轉換器的自定義 MessagingMessageConverter
,如下例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
驗證使用者 ID
從 1.6 版本開始,模板現在支援 user-id-expression
(使用 Java 配置時為 userIdExpression
)。如果傳送訊息,則在評估此表示式後設置(如果尚未設定)使用者 ID 屬性。評估的根物件是要傳送的訊息。
以下示例展示瞭如何使用 user-id-expression
屬性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一個示例是字面表示式。第二個示例從應用上下文中的連線工廠 bean 獲取 username
屬性。
使用獨立的連線
從 2.0.2 版本開始,您可以將 usePublisherConnection
屬性設定為 true
,以便在可能的情況下使用與監聽器容器所用連線不同的連線。這是為了避免當生產者因任何原因阻塞時,消費者也被阻塞。連線工廠為此目的維護第二個內部連線工廠;預設情況下,它與主工廠型別相同,但如果希望為釋出使用不同的工廠型別,可以顯式設定。如果 Rabbit Template 在由監聽器容器啟動的事務中執行,無論此設定如何,都將使用容器的 Channel。
通常,您不應將 RabbitAdmin 與此屬性設定為 true 的模板一起使用。使用接受連線工廠的 RabbitAdmin 建構函式。如果您使用接受模板的另一個建構函式,請確保模板的此屬性為 false 。這是因為,Admin 通常用於為監聽器容器宣告佇列。使用將此屬性設定為 true 的模板將意味著獨佔佇列(例如 AnonymousQueue )將在與監聽器容器使用的連線不同的連線上宣告。在這種情況下,容器無法使用這些佇列。 |