AMQP 支援的訊息通道
有兩種訊息通道實現可用。一種是點對點,另一種是釋出-訂閱。這兩種通道都為底層的 AmqpTemplate
和 SimpleMessageListenerContainer
提供了廣泛的配置屬性(如本章前面通道介面卡和閘道器所示)。然而,我們在這裡展示的示例配置最少。請查閱 XML Schema 以檢視可用屬性。
點對點通道可能如下所示
<int-amqp:channel id="p2pChannel"/>
在內部,前面的示例會宣告一個名為 si.p2pChannel
的 Queue
,並且此通道會向該 Queue
傳送訊息(技術上,透過將訊息傳送到與此 Queue
名稱匹配的路由鍵的無名直接交換機)。此通道還會在此 Queue
上註冊一個消費者。如果您希望通道是“可輪詢的”而不是訊息驅動的,請提供 message-driven
標誌並將其值設為 false
,如下例所示
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
釋出-訂閱通道可能如下所示
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在內部,前面的示例會宣告一個名為 si.fanout.pubSubChannel
的扇出交換機,並且此通道會向該扇出交換機發送訊息。此通道還會宣告一個伺服器命名的獨佔、自動刪除、非持久化 Queue
,並將其繫結到扇出交換機,同時在該 Queue
上註冊一個消費者來接收訊息。釋出-訂閱通道沒有“可輪詢”選項。它必須是訊息驅動的。
從 4.1 版本開始,AMQP 支援的訊息通道(結合 channel-transacted
)支援 template-channel-transacted
,用於分離 AbstractMessageListenerContainer
和 RabbitTemplate
的 transactional
配置。請注意,之前 channel-transacted
預設為 true
。現在,對於 AbstractMessageListenerContainer
,它預設為 false
。
在 4.3 版本之前,AMQP 支援的通道僅支援有效載荷和頭部為 Serializable
的訊息。整個訊息會被轉換(序列化)併發送到 RabbitMQ。現在,您可以將 extract-payload
屬性(或使用 Java 配置時的 setExtractPayload()
方法)設定為 true
。當此標誌為 true
時,訊息有效載荷會被轉換,頭部會被對映,方式類似於使用通道介面卡。這種配置允許 AMQP 支援的通道與非序列化有效載荷一起使用(可能與另一個訊息轉換器,例如 Jackson2JsonMessageConverter
一起使用)。有關預設對映頭部的更多資訊,請參閱 AMQP 訊息頭。您可以透過提供使用 outbound-header-mapper
和 inbound-header-mapper
屬性的自定義對映器來修改對映。您現在還可以指定一個 default-delivery-mode
,當沒有 amqp_deliveryMode
頭部時,它用於設定投遞模式。預設情況下,Spring AMQP MessageProperties
使用 PERSISTENT
投遞模式。
與其他持久化支援的通道一樣,AMQP 支援的通道旨在提供訊息持久化以避免訊息丟失。它們不用於將工作分發到其他對等應用程式。為此目的,請使用通道介面卡。 |
從 5.0 版本開始,可輪詢通道現在會阻塞輪詢器執行緒達指定的 receiveTimeout 時間(預設值為 1 秒)。之前,與其他 PollableChannel 實現不同,如果佇列中沒有可用訊息,無論接收超時時間是多少,執行緒都會立即返回給排程器。阻塞比使用 basicGet() (無超時)檢索訊息稍微昂貴一些,因為必須為接收每條訊息建立一個消費者。要恢復之前的行為,請將輪詢器的 receiveTimeout 設定為 0。 |
使用 Java 配置
以下示例展示瞭如何使用 Java 配置來配置通道
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 配置
以下示例展示瞭如何使用 Java DSL 來配置通道
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}