AmqpTemplate

與 Spring Framework 和相關專案提供的許多其他高階抽象一樣,Spring AMQP 提供了一個起核心作用的“模板”。定義主要操作的介面稱為 AmqpTemplate。這些操作涵蓋了傳送和接收訊息的通用行為。換句話說,它們不是任何特定實現的獨有功能——因此名稱中帶有“AMQP”。另一方面,該介面的實現與 AMQP 協議的實現相關聯。與 JMS 不同,JMS 本身是介面級別的 API,AMQP 是線級別協議。該協議的實現提供了自己的客戶端庫,因此模板介面的每個實現都依賴於特定的客戶端庫。目前,只有一個實現:RabbitTemplate。在隨後的示例中,我們經常使用 AmqpTemplate。然而,當您檢視配置示例或任何例項化模板或呼叫 setter 的程式碼片段時,您會看到實現的型別(例如,RabbitTemplate)。

如前所述,AmqpTemplate 介面定義了所有用於傳送和接收訊息的基本操作。我們將在傳送訊息接收訊息中分別探討訊息傳送和接收。

另請參見Async Rabbit Template

新增重試功能

從 1.3 版本開始,您現在可以配置 RabbitTemplate 使用 RetryTemplate 來幫助處理與 Broker 連線相關的問題。有關完整資訊,請參閱 spring-retry 專案。以下僅是一個示例,它使用指數回退策略和預設的 SimpleRetryPolicy,在向呼叫者丟擲異常之前會嘗試三次。

以下示例使用 XML 名稱空間

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下示例在 Java 中使用 @Configuration 註解

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

從 1.4 版本開始,除了 retryTemplate 屬性外,RabbitTemplate 還支援 recoveryCallback 選項。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二個引數。

RecoveryCallback 有一定的限制,因為它只包含重試上下文中的 lastThrowable 欄位。對於更復雜的用例,您應該使用外部的 RetryTemplate,以便透過上下文屬性向 RecoveryCallback 傳遞額外資訊。以下示例展示瞭如何實現:
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在這種情況下,您會將 RetryTemplate 注入到 RabbitTemplate 中。

釋出是非同步的 — 如何檢測成功和失敗

釋出訊息是一種非同步機制,預設情況下,無法路由的訊息會被 RabbitMQ 丟棄。對於成功的釋出,您可以收到非同步確認,如關聯的釋出者確認和返回中所述。考慮兩種失敗場景:

  • 釋出到一個 Exchange,但沒有匹配的目的地佇列。

  • 釋出到一個不存在的 Exchange。

第一種情況由釋出者返回涵蓋,如關聯的釋出者確認和返回中所述。

對於第二種情況,訊息會被丟棄並且不生成返回。底層 Channel 會因異常而關閉。預設情況下,此異常會被記錄日誌,但您可以透過向 CachingConnectionFactory 註冊一個 ChannelListener 來獲取此類事件的通知。以下示例展示瞭如何新增 ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以檢查 signal 的 reason 屬性以確定發生的問題。

要在傳送執行緒上檢測異常,您可以在 RabbitTemplate 上設定 setChannelTransacted(true),異常將在 txCommit() 時檢測到。然而,事務會顯著降低效能,因此在僅為此一個用例啟用事務之前請仔細考慮。

關聯的釋出者確認和返回

AmqpTemplateRabbitTemplate 實現支援釋出者確認和返回。

對於返回的訊息,模板的 mandatory 屬性必須設定為 true,或者針對特定訊息的 mandatory-expression 必須評估為 true。此功能需要一個將其 publisherReturns 屬性設定為 trueCachingConnectionFactory(參見釋出者確認和返回)。透過呼叫 setReturnsCallback(ReturnsCallback callback) 並註冊 RabbitTemplate.ReturnsCallback,返回會發送給客戶端。該回調必須實現以下方法:

void returnedMessage(ReturnedMessage returned);

ReturnedMessage 包含以下屬性:

  • message - 返回的訊息本身

  • replyCode - 指示返回原因的程式碼

  • replyText - 返回的文字原因 - 例如 NO_ROUTE

  • exchange - 訊息傳送到的 Exchange

  • routingKey - 使用的 Routing Key

每個 RabbitTemplate 只支援一個 ReturnsCallback。另請參見回覆超時

對於釋出者確認(也稱為釋出者 ACK),模板需要一個將其 publisherConfirm 屬性設定為 ConfirmType.CORRELATEDCachingConnectionFactory。透過呼叫 setConfirmCallback(ConfirmCallback callback) 並註冊 RabbitTemplate.ConfirmCallback,確認會發送給客戶端。該回調必須實現此方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData 是客戶端傳送原始訊息時提供的一個物件。ack 對於 ack 為 true,對於 nack 為 false。對於 nack 例項,如果生成 nack 時原因可用,則 cause 可能包含 nack 的原因。一個例子是傳送訊息到不存在的 Exchange。在這種情況下,Broker 會關閉 Channel。關閉的原因包含在 cause 中。cause 是在 1.4 版本中新增的。

每個 RabbitTemplate 只支援一個 ConfirmCallback

當 Rabbit Template 的傳送操作完成時,Channel 會被關閉。當連線工廠快取已滿時(快取中有空間時,Channel 不會被物理關閉,返回和確認正常進行),這將阻止接收確認或返回。當快取已滿時,框架會延遲關閉最多五秒鐘,以便有時間接收確認和返回。使用確認時,當收到最後一個確認後 Channel 會被關閉。僅使用返回時,Channel 會保持開啟狀態長達五秒鐘。我們通常建議將連線工廠的 channelCacheSize 設定為一個足夠大的值,以便釋出訊息的 Channel 返回到快取中,而不是被關閉。您可以使用 RabbitMQ 管理外掛監控 Channel 使用情況。如果您看到 Channel 快速開啟和關閉,應考慮增加快取大小以減少伺服器開銷。
在 2.1 版本之前,啟用釋出者確認的 Channel 會在收到確認之前返回到快取。其他一些程序可能會取出該 Channel 並執行導致 Channel 關閉的操作,例如向不存在的 Exchange 釋出訊息。這可能導致確認丟失。2.1 及更高版本不再在確認未完成時將 Channel 返回到快取。RabbitTemplate 在每次操作後對 Channel 執行邏輯 close()。一般來說,這意味著一個 Channel 上一次只有一個未完成的確認。
從 2.2 版本開始,回撥會在連線工廠的 executor 執行緒之一上呼叫。這是為了避免在回撥內執行 Rabbit 操作時可能發生的死鎖。在以前的版本中,回撥直接在 amqp-client 連線 I/O 執行緒上呼叫;如果您執行某些 RPC 操作(例如開啟一個新 Channel),這將導致死鎖,因為 I/O 執行緒會阻塞等待結果,但結果需要由 I/O 執行緒本身處理。在那些版本中,需要在回撥內部將工作(例如傳送訊息)交給另一個執行緒。現在不再需要這樣做,因為框架現在將回調呼叫交給執行器處理。
只要返回回撥在 60 秒或更短時間內執行完成,即可保證在收到 ACK 之前收到返回的訊息。確認計劃在返回回撥退出後或 60 秒後交付,以先到者為準。

CorrelationData 物件有一個 CompletableFuture,您可以使用它來獲取結果,而不是在模板上使用 ConfirmCallback。以下示例展示瞭如何配置 CorrelationData 例項:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由於它是一個 CompletableFuture<Confirm>,您可以在就緒時 get() 獲取結果,或使用 whenComplete() 進行非同步回撥。Confirm 物件是一個簡單的 bean,具有兩個屬性:ackreason(針對 nack 例項)。Broker 生成的 nack 例項不會填充 reason。框架生成的 nack 例項(例如,在 ACK 未完成時關閉連線)會填充 reason。

此外,當同時啟用確認和返回時,如果訊息無法路由到任何佇列,CorrelationDatareturn 屬性將填充返回的訊息。保證在 future 設定 ack 之前,返回訊息屬性已被設定。CorrelationData.getReturn() 返回一個包含以下屬性的 ReturnMessage

  • message (返回的訊息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另請參見範圍操作,瞭解一種更簡單的方式等待發布者確認。

範圍操作

通常,使用模板時,會從快取中取出(或建立)一個 Channel,用於執行操作,然後返回到快取中以便重用。在多執行緒環境中,無法保證下一個操作使用同一個 Channel。然而,有時您可能希望對 Channel 的使用有更多控制,並確保多個操作都在同一個 Channel 上執行。

從 2.0 版本開始,提供了一個名為 invoke 的新方法,帶有一個 OperationsCallback。在回撥範圍內並使用提供的 RabbitOperations 引數執行的任何操作都將使用相同的專用 Channel,該 Channel 最後會被關閉(而不是返回到快取)。如果 Channel 是 PublisherCallbackChannel,它會在收到所有確認後返回到快取(參見關聯的釋出者確認和返回)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要此功能的一個例子是,如果您希望使用底層 Channel 上的 waitForConfirms() 方法。如前所述,Spring API 以前沒有公開此方法,因為 Channel 通常是快取和共享的。RabbitTemplate 現在提供了 waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它們委託給在 OperationsCallback 範圍內使用的專用 Channel。出於顯而易見的原因,這些方法不能在該範圍之外使用。

請注意,一個更高階的抽象,允許您將確認與請求關聯,已在其他地方提供(參見關聯的釋出者確認和返回)。如果您只想等待 Broker 確認交付,可以使用以下示例所示的技術:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望 RabbitAdmin 操作在 OperationsCallback 範圍內的同一個 Channel 上呼叫,則必須使用用於 invoke 操作的同一個 RabbitTemplate 構建 Admin。

如果模板操作已在現有事務的範圍內執行(例如,在事務性監聽器容器執行緒上執行並在事務性模板上執行操作),則前面的討論就沒有意義了。在這種情況下,操作會在該 Channel 上執行,並在執行緒返回到容器時提交。在這種場景下,沒有必要使用 invoke

以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎設施實際上是不需要的(除非也啟用了返回)。從 2.2 版本開始,連線工廠支援一個名為 publisherConfirmType 的新屬性。當此屬性設定為 ConfirmType.SIMPLE 時,可以避免使用該基礎設施,從而使確認處理更加高效。

此外,RabbitTemplate 會在傳送訊息的 MessageProperties 中設定 publisherSequenceNumber 屬性。如果您希望檢查(或記錄或以其他方式使用)特定的確認,可以使用過載的 invoke 方法,如下例所示:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
這些 ConfirmCallback 物件(針對 acknack 例項)是 Rabbit 客戶端的回撥,而不是模板的回撥。

以下示例記錄 acknack 例項:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
範圍操作繫結到執行緒。有關多執行緒環境中嚴格順序的討論,請參見多執行緒環境中的嚴格訊息順序

多執行緒環境中的嚴格訊息順序

範圍操作中的討論僅適用於在同一執行緒上執行操作的情況。

考慮以下情況:

  • thread-1 將訊息傳送到佇列,並將工作移交給 thread-2

  • thread-2 將訊息傳送到同一個佇列

由於 RabbitMQ 的非同步特性和快取 Channel 的使用,無法確定是否會使用同一個 Channel,因此訊息到達佇列的順序無法保證。(大多數情況下它們會按順序到達,但亂序交付的機率不為零)。為解決此用例,您可以使用大小為 1 的有界 Channel 快取(以及 channelCheckoutTimeout)以確保訊息始終在同一個 Channel 上釋出,從而保證順序。為此,如果您有連線工廠的其他用途(例如消費者),您應該要麼為模板使用專用的連線工廠,要麼配置模板使用主連線工廠中嵌入的釋出者連線工廠(參見使用獨立的連線)。

用一個簡單的 Spring Boot 應用可以最好地說明這一點:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使釋出操作在兩個不同的執行緒上執行,它們都將使用同一個 Channel,因為快取被限制為單個 Channel。

從 2.3.7 版本開始,ThreadChannelConnectionFactory 支援使用 prepareContextSwitchswitchContext 方法將執行緒的 Channel 轉移到另一個執行緒。第一個方法返回一個上下文,該上下文傳遞給呼叫第二個方法的第二個執行緒。一個執行緒可以繫結一個非事務性 Channel 或一個事務性 Channel(或各一個);您無法單獨轉移它們,除非您使用兩個連線工廠。示例如下:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
呼叫 prepareSwitchContext 後,如果當前執行緒再執行任何操作,它們將在新的 Channel 上執行。不再需要時關閉執行緒繫結的 Channel 非常重要。

訊息整合

從 1.4 版本開始,RabbitMessagingTemplate(構建在 RabbitTemplate 之上)提供了與 Spring Framework 訊息抽象的整合——即 org.springframework.messaging.Message。這允許您使用 spring-messagingMessage<?> 抽象來發送和接收訊息。此抽象被其他 Spring 專案使用,例如 Spring Integration 和 Spring 的 STOMP 支援。涉及兩種訊息轉換器:一種用於在 spring-messaging 的 Message<?> 抽象和 Spring AMQP 的 Message 抽象之間進行轉換,另一種用於在 Spring AMQP 的 Message 抽象和底層 RabbitMQ 客戶端庫所需的格式之間進行轉換。預設情況下,訊息負載由提供的 RabbitTemplate 例項的訊息轉換器轉換。或者,您可以注入一個使用其他負載轉換器的自定義 MessagingMessageConverter,如下例所示:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

驗證使用者 ID

從 1.6 版本開始,模板現在支援 user-id-expression(使用 Java 配置時為 userIdExpression)。如果傳送訊息,則在評估此表示式後設置(如果尚未設定)使用者 ID 屬性。評估的根物件是要傳送的訊息。

以下示例展示瞭如何使用 user-id-expression 屬性:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一個示例是字面表示式。第二個示例從應用上下文中的連線工廠 bean 獲取 username 屬性。

使用獨立的連線

從 2.0.2 版本開始,您可以將 usePublisherConnection 屬性設定為 true,以便在可能的情況下使用與監聽器容器所用連線不同的連線。這是為了避免當生產者因任何原因阻塞時,消費者也被阻塞。連線工廠為此目的維護第二個內部連線工廠;預設情況下,它與主工廠型別相同,但如果希望為釋出使用不同的工廠型別,可以顯式設定。如果 Rabbit Template 在由監聽器容器啟動的事務中執行,無論此設定如何,都將使用容器的 Channel。

通常,您不應將 RabbitAdmin 與此屬性設定為 true 的模板一起使用。使用接受連線工廠的 RabbitAdmin 建構函式。如果您使用接受模板的另一個建構函式,請確保模板的此屬性為 false。這是因為,Admin 通常用於為監聽器容器宣告佇列。使用將此屬性設定為 true 的模板將意味著獨佔佇列(例如 AnonymousQueue)將在與監聽器容器使用的連線不同的連線上宣告。在這種情況下,容器無法使用這些佇列。