AmqpTemplate
與Spring Framework及相關專案提供的許多其他高階抽象一樣,Spring AMQP也提供了一個扮演核心角色的“模板”。定義主要操作的介面稱為AmqpTemplate。這些操作涵蓋了傳送和接收訊息的一般行為。換句話說,它們並非任何實現所獨有——因此名稱中帶有“AMQP”。另一方面,該介面的實現與AMQP協議的實現緊密相關。與JMS本身是一個介面級API不同,AMQP是一個線級協議。該協議的實現提供了自己的客戶端庫,因此模板介面的每個實現都依賴於特定的客戶端庫。目前,只有一個實現:RabbitTemplate。在接下來的示例中,我們經常使用AmqpTemplate。但是,當您檢視配置示例或任何例項化模板或呼叫setter的程式碼片段時,您可以看到實現型別(例如,RabbitTemplate)。
另請參閱非同步Rabbit模板。
新增重試功能
從1.3版本開始,您可以配置RabbitTemplate使用RetryTemplate來幫助處理代理連線問題。有關更多資訊,請參閱Spring Framework中的核心重試支援。以下只是一個示例,它使用了指數退避策略和預設的SimpleRetryPolicy,該策略在將異常拋給呼叫者之前進行三次嘗試。
以下示例在Java中使用了@Configuration註解。
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryPolicy retryPolicy = RetryPolicy.builder()
.delay(Duration.ofMillis(500))
.multiplier(2.0)
.maxDelay(Duration.ofSeconds(10))
.build();
template.setRetryTemplate(new RetryTemplate(retryPolicy));
return template;
}
從1.4版本開始,除了retryTemplate屬性之外,RabbitTemplate還支援recoveryCallback選項。它作為RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)的第二個引數使用。
RecoveryCallback有些受限,因為重試上下文只包含lastThrowable欄位。對於更復雜的用例,您應該使用外部RetryTemplate,以便透過上下文的屬性將額外資訊傳遞給RecoveryCallback。以下示例展示瞭如何實現: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在這種情況下,您將不會向RabbitTemplate注入RetryTemplate。
釋出是非同步的——如何檢測成功和失敗
釋出訊息是一種非同步機制,預設情況下,無法路由的訊息會被RabbitMQ丟棄。對於成功的釋出,您可以收到非同步確認,如關聯釋出者確認和返回中所述。考慮兩種失敗場景:
-
釋出到交換機但沒有匹配的目標佇列。
-
釋出到不存在的交換機。
第一種情況由釋出者返回覆蓋,如關聯釋出者確認和返回中所述。
對於第二種情況,訊息被丟棄,不會生成返回。底層通道會因異常而關閉。預設情況下,此異常會被記錄,但您可以透過向CachingConnectionFactory註冊ChannelListener來獲取此類事件的通知。以下示例展示瞭如何新增ConnectionListener:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以檢查訊號的reason屬性以確定發生的問題。
要在傳送執行緒上檢測異常,您可以在RabbitTemplate上設定setChannelTransacted(true),異常將在txCommit()上被檢測到。但是,事務會顯著降低效能,因此在僅為此用例啟用事務之前,請仔細考慮這一點。
關聯釋出者確認和返回
AmqpTemplate的RabbitTemplate實現支援釋出者確認和返回。
對於返回的訊息,模板的mandatory屬性必須設定為true,或者特定訊息的mandatory-expression必須評估為true。此功能需要一個CachingConnectionFactory,其publisherReturns屬性設定為true(參見釋出者確認和返回)。客戶端透過呼叫setReturnsCallback(ReturnsCallback callback)註冊RabbitTemplate.ReturnsCallback來接收返回。回撥必須實現以下方法:
void returnedMessage(ReturnedMessage returned);
ReturnedMessage具有以下屬性:
-
message- 返回的訊息本身 -
replyCode- 指示返回原因的程式碼 -
replyText- 返回的文字原因 - 例如NO_ROUTE -
exchange- 訊息傳送到的交換機 -
routingKey- 使用的路由鍵
每個RabbitTemplate只支援一個ReturnsCallback。另請參閱回覆超時。
對於釋出者確認(也稱為釋出者回執),模板需要一個CachingConnectionFactory,其publisherConfirm屬性設定為ConfirmType.CORRELATED。客戶端透過呼叫setConfirmCallback(ConfirmCallback callback)註冊RabbitTemplate.ConfirmCallback來接收確認。回撥必須實現此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData是客戶端在傳送原始訊息時提供的物件。ack為ack時為true,為nack時為false。對於nack例項,如果生成nack時可用,cause可能包含nack的原因。一個例子是向不存在的交換機發送訊息。在這種情況下,代理會關閉通道。關閉的原因包含在cause中。cause是在1.4版本中新增的。
一個RabbitTemplate只支援一個ConfirmCallback。
當 Rabbit 模板傳送操作完成後,通道會關閉。這會阻止在連線工廠快取已滿時接收確認或返回(當快取中有空間時,通道不會物理關閉,返回和確認正常進行)。當快取已滿時,框架會延遲關閉長達五秒,以便有時間接收確認和返回。使用確認時,當收到最後一個確認時通道會關閉。只使用返回時,通道會保持開啟狀態整整五秒。我們通常建議將連線工廠的channelCacheSize設定為足夠大的值,以便釋出訊息的通道返回到快取而不是關閉。您可以使用 RabbitMQ 管理外掛監控通道使用情況。如果您看到通道快速開啟和關閉,您應該考慮增加快取大小以減少伺服器開銷。 |
在2.1版本之前,為釋出者確認啟用的通道在收到確認之前會返回到快取中。其他一些程序可能會檢出該通道並執行一些導致通道關閉的操作——例如將訊息釋出到不存在的交換機。這可能導致確認丟失。2.1及更高版本在確認未完成時不再將通道返回到快取。RabbitTemplate在每次操作後對通道執行邏輯close()。通常,這意味著通道上一次只有一個未完成的確認。 |
從2.2版本開始,回撥在連線工廠的一個executor執行緒上呼叫。這是為了避免如果您在回撥中執行Rabbit操作時可能發生的死鎖。在以前的版本中,回撥直接在amqp-client連線I/O執行緒上呼叫;如果您執行某些RPC操作(例如開啟新通道),這會導致死鎖,因為I/O執行緒會阻塞等待結果,但結果需要由I/O執行緒本身處理。在那些版本中,有必要將工作(例如傳送訊息)移交給回撥中的另一個執行緒。現在不再需要這樣做,因為框架現在將回調呼叫移交給執行器。 |
| 只要返回回撥在60秒或更短時間內執行,接收返回訊息早於ack的保證仍然得到維護。確認的交付安排在返回回撥退出後或60秒後(以先發生者為準)。 |
CorrelationData物件有一個CompletableFuture,您可以使用它來獲取結果,而不是在模板上使用ConfirmCallback。以下示例顯示瞭如何配置CorrelationData例項:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由於它是一個CompletableFuture<Confirm>,您可以在結果準備好時呼叫get()來獲取結果,或者使用whenComplete()進行非同步回撥。Confirm物件是一個簡單的bean,包含兩個屬性:ack和reason(用於nack例項)。對於代理生成的nack例項,reason不填充。對於框架生成的nack例項(例如,在ack例項未決時關閉連線),reason會被填充。
此外,當同時啟用確認和返回時,如果訊息無法路由到任何佇列,則CorrelationData的return屬性將填充返回的訊息。保證在未來設定ack之前,返回訊息屬性已設定。CorrelationData.getReturn()返回一個帶有屬性的ReturnMessage:
-
message(返回的訊息)
-
replyCode
-
replyText
-
exchange
-
routingKey
另請參閱範圍操作,以獲取等待發布者確認的更簡單機制。
作用域操作
通常,當使用模板時,一個Channel會從快取中檢出(或建立),用於操作,然後返回到快取中以供重用。在多執行緒環境中,不能保證下一個操作會使用相同的通道。但是,有時您可能希望對通道的使用有更多的控制,並確保許多操作都在同一個通道上執行。
從2.0版本開始,提供了一個名為invoke的新方法,帶有OperationsCallback。在回撥範圍內以及在提供的RabbitOperations引數上執行的任何操作都使用相同的專用Channel,該通道將在結束時關閉(不會返回到快取中)。如果通道是PublisherCallbackChannel,則在收到所有確認後返回到快取中(參見關聯釋出者確認和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要這樣做的原因之一是,如果您希望在底層Channel上使用waitForConfirms()方法。由於通道通常是快取和共享的,如前所述,此方法以前未透過Spring API公開。RabbitTemplate現在提供了waitForConfirms(long timeout)和waitForConfirmsOrDie(long timeout),它們委託給在OperationsCallback範圍內使用的專用通道。出於顯而易見的原因,這些方法不能在該範圍之外使用。
請注意,其他地方提供了更高級別的抽象,允許您將確認與請求關聯起來(參見關聯釋出者確認和返回)。如果您只想等到代理確認交付,您可以使用以下示例中顯示的技術:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望RabbitAdmin操作在OperationsCallback範圍內在同一通道上呼叫,則admin必須已使用與invoke操作相同的RabbitTemplate構建。
如果模板操作已在現有事務的範圍內執行(例如,當在事務監聽器容器執行緒上執行並對事務模板執行操作時),則前面的討論就無關緊要了。在這種情況下,操作在該通道上執行,並在執行緒返回到容器時提交。在這種情況下,無需使用invoke。 |
以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎設施實際上是不需要的(除非也啟用了返回)。從2.2版本開始,連線工廠支援一個名為publisherConfirmType的新屬性。當將其設定為ConfirmType.SIMPLE時,將避免基礎設施,並且確認處理可以更高效。
此外,RabbitTemplate在傳送訊息的MessageProperties中設定publisherSequenceNumber屬性。如果您希望檢查(或記錄或以其他方式使用)特定的確認,您可以使用過載的invoke方法,如以下示例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
這些ConfirmCallback物件(用於ack和nack例項)是Rabbit客戶端回撥,而不是模板回撥。 |
以下示例記錄了ack和nack例項:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
| 作用域操作繫結到執行緒。有關多執行緒環境中嚴格排序的討論,請參閱多執行緒環境中的嚴格訊息排序。 |
多執行緒環境中的嚴格訊息排序
作用域操作中的討論僅適用於在同一執行緒上執行操作的情況。
考慮以下情況:
-
thread-1向佇列傳送訊息並將工作移交給thread-2 -
thread-2向同一個佇列傳送訊息
由於RabbitMQ的非同步特性和快取通道的使用;不確定是否會使用相同的通道,因此不能保證訊息到達佇列的順序。(在大多數情況下,它們會按順序到達,但亂序交付的機率不為零)。為了解決此用例,您可以使用大小為1的有界通道快取(以及channelCheckoutTimeout)來確保訊息始終在同一通道上釋出,並且順序將得到保證。為此,如果您有連線工廠的其他用途,例如消費者,您應該為模板使用專用連線工廠,或者配置模板以使用嵌入在主連線工廠中的釋出者連線工廠(參見使用單獨的連線)。
這最好透過一個簡單的Spring Boot應用程式來說明:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使釋出是在兩個不同的執行緒上執行的,它們都將使用相同的通道,因為快取被限制為單個通道。
從2.3.7版本開始,ThreadChannelConnectionFactory支援使用prepareContextSwitch和switchContext方法將執行緒的通道傳輸到另一個執行緒。第一個方法返回一個上下文,該上下文傳遞給呼叫第二個方法的第二個執行緒。一個執行緒可以繫結一個非事務性通道或一個事務性通道(或兩者都有);您不能單獨傳輸它們,除非您使用兩個連線工廠。示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦呼叫了prepareSwitchContext,如果當前執行緒執行任何其他操作,它們將在新通道上執行。當不再需要執行緒繫結的通道時,關閉它是很重要的。 |
訊息整合
從1.4版本開始,RabbitMessagingTemplate(基於RabbitTemplate構建)提供了與Spring Framework訊息抽象(即org.springframework.messaging.Message)的整合。這允許您使用spring-messaging的Message<?>抽象來發送和接收訊息。此抽象被其他Spring專案使用,例如Spring Integration和Spring的STOMP支援。涉及兩個訊息轉換器:一個用於在spring-messaging的Message<?>和Spring AMQP的Message抽象之間進行轉換,另一個用於在Spring AMQP的Message抽象和底層RabbitMQ客戶端庫所需的格式之間進行轉換。預設情況下,訊息負載由提供的RabbitTemplate例項的訊息轉換器進行轉換。或者,您可以注入一個帶有其他負載轉換器的自定義MessagingMessageConverter,示例如下:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
驗證使用者ID
從1.6版本開始,模板現在支援user-id-expression(使用Java配置時為userIdExpression)。如果傳送訊息,則在評估此表示式後設置使用者ID屬性(如果尚未設定)。評估的根物件是要傳送的訊息。
以下示例展示瞭如何使用user-id-expression屬性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一個示例是文字表達式。第二個從應用程式上下文中的連線工廠bean獲取username屬性。
使用單獨的連線
從2.0.2版本開始,您可以將usePublisherConnection屬性設定為true,以便在可能的情況下使用與監聽器容器不同的連線。這是為了避免當生產者因任何原因被阻塞時消費者也被阻塞。為此,連線工廠維護第二個內部連線工廠;預設情況下,它與主工廠型別相同,但如果您希望為釋出使用不同的工廠型別,則可以明確設定。如果rabbit模板在由監聽器容器啟動的事務中執行,則無論此設定如何,都將使用容器的通道。
通常,您不應將RabbitAdmin與將此屬性設定為true的模板一起使用。請使用接受連線工廠的RabbitAdmin建構函式。如果您使用接受模板的其他建構函式,請確保模板的屬性為false。這是因為,通常,admin用於為監聽器容器宣告佇列。使用將此屬性設定為true的模板將意味著獨佔佇列(例如AnonymousQueue)將在與監聽器容器使用的連線不同的連線上宣告。在這種情況下,容器無法使用這些佇列。 |