AmqpTemplate

與Spring Framework及相關專案提供的許多其他高階抽象一樣,Spring AMQP也提供了一個扮演核心角色的“模板”。定義主要操作的介面稱為AmqpTemplate。這些操作涵蓋了傳送和接收訊息的一般行為。換句話說,它們並非任何實現所獨有——因此名稱中帶有“AMQP”。另一方面,該介面的實現與AMQP協議的實現緊密相關。與JMS本身是一個介面級API不同,AMQP是一個線級協議。該協議的實現提供了自己的客戶端庫,因此模板介面的每個實現都依賴於特定的客戶端庫。目前,只有一個實現:RabbitTemplate。在接下來的示例中,我們經常使用AmqpTemplate。但是,當您檢視配置示例或任何例項化模板或呼叫setter的程式碼片段時,您可以看到實現型別(例如,RabbitTemplate)。

如前所述,AmqpTemplate介面定義了傳送和接收訊息的所有基本操作。我們將在傳送訊息接收訊息中分別探討訊息的傳送和接收。

另請參閱非同步Rabbit模板

新增重試功能

從1.3版本開始,您可以配置RabbitTemplate使用RetryTemplate來幫助處理代理連線問題。有關更多資訊,請參閱Spring Framework中的核心重試支援。以下只是一個示例,它使用了指數退避策略和預設的SimpleRetryPolicy,該策略在將異常拋給呼叫者之前進行三次嘗試。

以下示例在Java中使用了@Configuration註解。

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryPolicy retryPolicy = RetryPolicy.builder()
            .delay(Duration.ofMillis(500))
            .multiplier(2.0)
            .maxDelay(Duration.ofSeconds(10))
            .build();
    template.setRetryTemplate(new RetryTemplate(retryPolicy));
    return template;
}

從1.4版本開始,除了retryTemplate屬性之外,RabbitTemplate還支援recoveryCallback選項。它作為RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)的第二個引數使用。

RecoveryCallback有些受限,因為重試上下文只包含lastThrowable欄位。對於更復雜的用例,您應該使用外部RetryTemplate,以便透過上下文的屬性將額外資訊傳遞給RecoveryCallback。以下示例展示瞭如何實現:
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在這種情況下,您將不會RabbitTemplate注入RetryTemplate

釋出是非同步的——如何檢測成功和失敗

釋出訊息是一種非同步機制,預設情況下,無法路由的訊息會被RabbitMQ丟棄。對於成功的釋出,您可以收到非同步確認,如關聯釋出者確認和返回中所述。考慮兩種失敗場景:

  • 釋出到交換機但沒有匹配的目標佇列。

  • 釋出到不存在的交換機。

第一種情況由釋出者返回覆蓋,如關聯釋出者確認和返回中所述。

對於第二種情況,訊息被丟棄,不會生成返回。底層通道會因異常而關閉。預設情況下,此異常會被記錄,但您可以透過向CachingConnectionFactory註冊ChannelListener來獲取此類事件的通知。以下示例展示瞭如何新增ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以檢查訊號的reason屬性以確定發生的問題。

要在傳送執行緒上檢測異常,您可以在RabbitTemplate上設定setChannelTransacted(true),異常將在txCommit()上被檢測到。但是,事務會顯著降低效能,因此在僅為此用例啟用事務之前,請仔細考慮這一點。

關聯釋出者確認和返回

AmqpTemplateRabbitTemplate實現支援釋出者確認和返回。

對於返回的訊息,模板的mandatory屬性必須設定為true,或者特定訊息的mandatory-expression必須評估為true。此功能需要一個CachingConnectionFactory,其publisherReturns屬性設定為true(參見釋出者確認和返回)。客戶端透過呼叫setReturnsCallback(ReturnsCallback callback)註冊RabbitTemplate.ReturnsCallback來接收返回。回撥必須實現以下方法:

void returnedMessage(ReturnedMessage returned);

ReturnedMessage具有以下屬性:

  • message - 返回的訊息本身

  • replyCode - 指示返回原因的程式碼

  • replyText - 返回的文字原因 - 例如 NO_ROUTE

  • exchange - 訊息傳送到的交換機

  • routingKey - 使用的路由鍵

每個RabbitTemplate只支援一個ReturnsCallback。另請參閱回覆超時

對於釋出者確認(也稱為釋出者回執),模板需要一個CachingConnectionFactory,其publisherConfirm屬性設定為ConfirmType.CORRELATED。客戶端透過呼叫setConfirmCallback(ConfirmCallback callback)註冊RabbitTemplate.ConfirmCallback來接收確認。回撥必須實現此方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData是客戶端在傳送原始訊息時提供的物件。ackack時為true,為nack時為false。對於nack例項,如果生成nack時可用,cause可能包含nack的原因。一個例子是向不存在的交換機發送訊息。在這種情況下,代理會關閉通道。關閉的原因包含在cause中。cause是在1.4版本中新增的。

一個RabbitTemplate只支援一個ConfirmCallback

當 Rabbit 模板傳送操作完成後,通道會關閉。這會阻止在連線工廠快取已滿時接收確認或返回(當快取中有空間時,通道不會物理關閉,返回和確認正常進行)。當快取已滿時,框架會延遲關閉長達五秒,以便有時間接收確認和返回。使用確認時,當收到最後一個確認時通道會關閉。只使用返回時,通道會保持開啟狀態整整五秒。我們通常建議將連線工廠的channelCacheSize設定為足夠大的值,以便釋出訊息的通道返回到快取而不是關閉。您可以使用 RabbitMQ 管理外掛監控通道使用情況。如果您看到通道快速開啟和關閉,您應該考慮增加快取大小以減少伺服器開銷。
在2.1版本之前,為釋出者確認啟用的通道在收到確認之前會返回到快取中。其他一些程序可能會檢出該通道並執行一些導致通道關閉的操作——例如將訊息釋出到不存在的交換機。這可能導致確認丟失。2.1及更高版本在確認未完成時不再將通道返回到快取。RabbitTemplate在每次操作後對通道執行邏輯close()。通常,這意味著通道上一次只有一個未完成的確認。
從2.2版本開始,回撥在連線工廠的一個executor執行緒上呼叫。這是為了避免如果您在回撥中執行Rabbit操作時可能發生的死鎖。在以前的版本中,回撥直接在amqp-client連線I/O執行緒上呼叫;如果您執行某些RPC操作(例如開啟新通道),這會導致死鎖,因為I/O執行緒會阻塞等待結果,但結果需要由I/O執行緒本身處理。在那些版本中,有必要將工作(例如傳送訊息)移交給回撥中的另一個執行緒。現在不再需要這樣做,因為框架現在將回調呼叫移交給執行器。
只要返回回撥在60秒或更短時間內執行,接收返回訊息早於ack的保證仍然得到維護。確認的交付安排在返回回撥退出後或60秒後(以先發生者為準)。

CorrelationData物件有一個CompletableFuture,您可以使用它來獲取結果,而不是在模板上使用ConfirmCallback。以下示例顯示瞭如何配置CorrelationData例項:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由於它是一個CompletableFuture<Confirm>,您可以在結果準備好時呼叫get()來獲取結果,或者使用whenComplete()進行非同步回撥。Confirm物件是一個簡單的bean,包含兩個屬性:ackreason(用於nack例項)。對於代理生成的nack例項,reason不填充。對於框架生成的nack例項(例如,在ack例項未決時關閉連線),reason會被填充。

此外,當同時啟用確認和返回時,如果訊息無法路由到任何佇列,則CorrelationDatareturn屬性將填充返回的訊息。保證在未來設定ack之前,返回訊息屬性已設定。CorrelationData.getReturn()返回一個帶有屬性的ReturnMessage

  • message(返回的訊息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另請參閱範圍操作,以獲取等待發布者確認的更簡單機制。

作用域操作

通常,當使用模板時,一個Channel會從快取中檢出(或建立),用於操作,然後返回到快取中以供重用。在多執行緒環境中,不能保證下一個操作會使用相同的通道。但是,有時您可能希望對通道的使用有更多的控制,並確保許多操作都在同一個通道上執行。

從2.0版本開始,提供了一個名為invoke的新方法,帶有OperationsCallback。在回撥範圍內以及在提供的RabbitOperations引數上執行的任何操作都使用相同的專用Channel,該通道將在結束時關閉(不會返回到快取中)。如果通道是PublisherCallbackChannel,則在收到所有確認後返回到快取中(參見關聯釋出者確認和返回)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要這樣做的原因之一是,如果您希望在底層Channel上使用waitForConfirms()方法。由於通道通常是快取和共享的,如前所述,此方法以前未透過Spring API公開。RabbitTemplate現在提供了waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它們委託給在OperationsCallback範圍內使用的專用通道。出於顯而易見的原因,這些方法不能在該範圍之外使用。

請注意,其他地方提供了更高級別的抽象,允許您將確認與請求關聯起來(參見關聯釋出者確認和返回)。如果您只想等到代理確認交付,您可以使用以下示例中顯示的技術:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望RabbitAdmin操作在OperationsCallback範圍內在同一通道上呼叫,則admin必須已使用與invoke操作相同的RabbitTemplate構建。

如果模板操作已在現有事務的範圍內執行(例如,當在事務監聽器容器執行緒上執行並對事務模板執行操作時),則前面的討論就無關緊要了。在這種情況下,操作在該通道上執行,並在執行緒返回到容器時提交。在這種情況下,無需使用invoke

以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎設施實際上是不需要的(除非也啟用了返回)。從2.2版本開始,連線工廠支援一個名為publisherConfirmType的新屬性。當將其設定為ConfirmType.SIMPLE時,將避免基礎設施,並且確認處理可以更高效。

此外,RabbitTemplate在傳送訊息的MessageProperties中設定publisherSequenceNumber屬性。如果您希望檢查(或記錄或以其他方式使用)特定的確認,您可以使用過載的invoke方法,如以下示例所示:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
這些ConfirmCallback物件(用於acknack例項)是Rabbit客戶端回撥,而不是模板回撥。

以下示例記錄了acknack例項:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
作用域操作繫結到執行緒。有關多執行緒環境中嚴格排序的討論,請參閱多執行緒環境中的嚴格訊息排序

多執行緒環境中的嚴格訊息排序

作用域操作中的討論僅適用於在同一執行緒上執行操作的情況。

考慮以下情況:

  • thread-1向佇列傳送訊息並將工作移交給thread-2

  • thread-2向同一個佇列傳送訊息

由於RabbitMQ的非同步特性和快取通道的使用;不確定是否會使用相同的通道,因此不能保證訊息到達佇列的順序。(在大多數情況下,它們會按順序到達,但亂序交付的機率不為零)。為了解決此用例,您可以使用大小為1的有界通道快取(以及channelCheckoutTimeout)來確保訊息始終在同一通道上釋出,並且順序將得到保證。為此,如果您有連線工廠的其他用途,例如消費者,您應該為模板使用專用連線工廠,或者配置模板以使用嵌入在主連線工廠中的釋出者連線工廠(參見使用單獨的連線)。

這最好透過一個簡單的Spring Boot應用程式來說明:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使釋出是在兩個不同的執行緒上執行的,它們都將使用相同的通道,因為快取被限制為單個通道。

從2.3.7版本開始,ThreadChannelConnectionFactory支援使用prepareContextSwitchswitchContext方法將執行緒的通道傳輸到另一個執行緒。第一個方法返回一個上下文,該上下文傳遞給呼叫第二個方法的第二個執行緒。一個執行緒可以繫結一個非事務性通道或一個事務性通道(或兩者都有);您不能單獨傳輸它們,除非您使用兩個連線工廠。示例如下:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
一旦呼叫了prepareSwitchContext,如果當前執行緒執行任何其他操作,它們將在新通道上執行。當不再需要執行緒繫結的通道時,關閉它是很重要的。

訊息整合

從1.4版本開始,RabbitMessagingTemplate(基於RabbitTemplate構建)提供了與Spring Framework訊息抽象(即org.springframework.messaging.Message)的整合。這允許您使用spring-messagingMessage<?>抽象來發送和接收訊息。此抽象被其他Spring專案使用,例如Spring Integration和Spring的STOMP支援。涉及兩個訊息轉換器:一個用於在spring-messaging的Message<?>和Spring AMQP的Message抽象之間進行轉換,另一個用於在Spring AMQP的Message抽象和底層RabbitMQ客戶端庫所需的格式之間進行轉換。預設情況下,訊息負載由提供的RabbitTemplate例項的訊息轉換器進行轉換。或者,您可以注入一個帶有其他負載轉換器的自定義MessagingMessageConverter,示例如下:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

驗證使用者ID

從1.6版本開始,模板現在支援user-id-expression(使用Java配置時為userIdExpression)。如果傳送訊息,則在評估此表示式後設置使用者ID屬性(如果尚未設定)。評估的根物件是要傳送的訊息。

以下示例展示瞭如何使用user-id-expression屬性:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一個示例是文字表達式。第二個從應用程式上下文中的連線工廠bean獲取username屬性。

使用單獨的連線

從2.0.2版本開始,您可以將usePublisherConnection屬性設定為true,以便在可能的情況下使用與監聽器容器不同的連線。這是為了避免當生產者因任何原因被阻塞時消費者也被阻塞。為此,連線工廠維護第二個內部連線工廠;預設情況下,它與主工廠型別相同,但如果您希望為釋出使用不同的工廠型別,則可以明確設定。如果rabbit模板在由監聽器容器啟動的事務中執行,則無論此設定如何,都將使用容器的通道。

通常,您不應將RabbitAdmin與將此屬性設定為true的模板一起使用。請使用接受連線工廠的RabbitAdmin建構函式。如果您使用接受模板的其他建構函式,請確保模板的屬性為false。這是因為,通常,admin用於為監聽器容器宣告佇列。使用將此屬性設定為true的模板將意味著獨佔佇列(例如AnonymousQueue)將在與監聽器容器使用的連線不同的連線上宣告。在這種情況下,容器無法使用這些佇列。
© . This site is unofficial and not affiliated with VMware.