AMQP 支援的訊息通道

有兩種訊息通道實現可用。一種是點對點,另一種是釋出-訂閱。這兩種通道都為底層的 AmqpTemplateSimpleMessageListenerContainer 提供了廣泛的配置屬性(如本章前面通道介面卡和閘道器所示)。然而,我們在這裡展示的示例配置最少。請查閱 XML Schema 以檢視可用屬性。

點對點通道可能如下所示

<int-amqp:channel id="p2pChannel"/>

在內部,前面的示例會宣告一個名為 si.p2pChannelQueue,並且此通道會向該 Queue 傳送訊息(技術上,透過將訊息傳送到與此 Queue 名稱匹配的路由鍵的無名直接交換機)。此通道還會在此 Queue 上註冊一個消費者。如果您希望通道是“可輪詢的”而不是訊息驅動的,請提供 message-driven 標誌並將其值設為 false,如下例所示

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

釋出-訂閱通道可能如下所示

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在內部,前面的示例會宣告一個名為 si.fanout.pubSubChannel 的扇出交換機,並且此通道會向該扇出交換機發送訊息。此通道還會宣告一個伺服器命名的獨佔、自動刪除、非持久化 Queue,並將其繫結到扇出交換機,同時在該 Queue 上註冊一個消費者來接收訊息。釋出-訂閱通道沒有“可輪詢”選項。它必須是訊息驅動的。

從 4.1 版本開始,AMQP 支援的訊息通道(結合 channel-transacted)支援 template-channel-transacted,用於分離 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。請注意,之前 channel-transacted 預設為 true。現在,對於 AbstractMessageListenerContainer,它預設為 false

在 4.3 版本之前,AMQP 支援的通道僅支援有效載荷和頭部為 Serializable 的訊息。整個訊息會被轉換(序列化)併發送到 RabbitMQ。現在,您可以將 extract-payload 屬性(或使用 Java 配置時的 setExtractPayload() 方法)設定為 true。當此標誌為 true 時,訊息有效載荷會被轉換,頭部會被對映,方式類似於使用通道介面卡。這種配置允許 AMQP 支援的通道與非序列化有效載荷一起使用(可能與另一個訊息轉換器,例如 Jackson2JsonMessageConverter 一起使用)。有關預設對映頭部的更多資訊,請參閱 AMQP 訊息頭。您可以透過提供使用 outbound-header-mapperinbound-header-mapper 屬性的自定義對映器來修改對映。您現在還可以指定一個 default-delivery-mode,當沒有 amqp_deliveryMode 頭部時,它用於設定投遞模式。預設情況下,Spring AMQP MessageProperties 使用 PERSISTENT 投遞模式。

與其他持久化支援的通道一樣,AMQP 支援的通道旨在提供訊息持久化以避免訊息丟失。它們不用於將工作分發到其他對等應用程式。為此目的,請使用通道介面卡。
從 5.0 版本開始,可輪詢通道現在會阻塞輪詢器執行緒達指定的 receiveTimeout 時間(預設值為 1 秒)。之前,與其他 PollableChannel 實現不同,如果佇列中沒有可用訊息,無論接收超時時間是多少,執行緒都會立即返回給排程器。阻塞比使用 basicGet()(無超時)檢索訊息稍微昂貴一些,因為必須為接收每條訊息建立一個消費者。要恢復之前的行為,請將輪詢器的 receiveTimeout 設定為 0。

使用 Java 配置

以下示例展示瞭如何使用 Java 配置來配置通道

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 配置

以下示例展示瞭如何使用 Java DSL 來配置通道

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}