AMQP 支援的訊息通道

有兩種訊息通道實現可用。一種是 點對點(point-to-point),另一種是 釋出-訂閱(publish-subscribe)。這兩種通道都為底層的 AmqpTemplateSimpleMessageListenerContainer 提供了廣泛的配置屬性(如本章前面關於通道介面卡和閘道器所示)。但是,我們在此展示的示例配置極少。請查閱 XML 模式以檢視可用屬性。

點對點通道可能如下例所示

<int-amqp:channel id="p2pChannel"/>

在底層,上述示例會導致宣告一個名為 si.p2pChannelQueue,並且此通道會向該 Queue 傳送訊息(從技術上講,是透過將訊息傳送到路由鍵與此 Queue 名稱匹配的無名直連交換機)。此通道還會在此 Queue 上註冊一個消費者。如果您希望通道是“可輪詢的”而不是訊息驅動的,請將 message-driven 標誌設定為 false,如下例所示

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

釋出-訂閱通道可能如下所示

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在底層,上述示例會導致宣告一個名為 si.fanout.pubSubChannel 的扇形交換機,並且此通道會向該扇形交換機發送訊息。此通道還會宣告一個伺服器命名的獨佔、自動刪除、非持久化 Queue,並將其繫結到扇形交換機,同時在該 Queue 上註冊一個消費者以接收訊息。釋出-訂閱通道沒有“可輪詢”選項。它必須是訊息驅動的。

從版本 4.1 開始,AMQP 支援的訊息通道(與 channel-transacted 結合使用)支援 template-channel-transacted,用於分離 AbstractMessageListenerContainerRabbitTemplate事務 配置。請注意,以前 channel-transacted 預設為 true。現在,對於 AbstractMessageListenerContainer,它預設為 false

在 4.3 版本之前,AMQP 支援的通道僅支援帶有 Serializable 訊息負載和訊息頭的訊息。整個訊息會被轉換(序列化)併發送到 RabbitMQ。現在,您可以將 extract-payload 屬性(或使用 Java 配置時設定 setExtractPayload())設定為 true。當此標誌為 true 時,訊息負載會被轉換,訊息頭會被對映,其方式類似於您使用通道介面卡時。這種安排允許 AMQP 支援的通道與不可序列化的訊息負載一起使用(可能與另一個訊息轉換器一起使用,例如 Jackson2JsonMessageConverter)。有關預設對映訊息頭的更多資訊,請參閱 AMQP 訊息頭。您可以透過提供使用 outbound-header-mapperinbound-header-mapper 屬性的自定義對映器來修改對映。您現在還可以指定 default-delivery-mode,當沒有 amqp_deliveryMode 訊息頭時,它用於設定傳輸模式。預設情況下,Spring AMQP MessageProperties 使用 PERSISTENT 傳輸模式。

與其他持久化支援的通道一樣,AMQP 支援的通道旨在提供訊息持久化以避免訊息丟失。它們並非旨在將工作分發給其他對等應用程式。為此,請改用通道介面卡。
從 5.0 版本開始,可輪詢通道現在會阻塞輪詢器執行緒,直到達到指定的 receiveTimeout(預設值為 1 秒)。以前,與其他 PollableChannel 實現不同,如果訊息不可用,無論接收超時時間如何,執行緒都會立即返回到排程器。阻塞比使用 basicGet() 檢索訊息(沒有超時)稍微昂貴一些,因為必須為接收每條訊息建立一個消費者。要恢復以前的行為,請將輪詢器的 receiveTimeout 設定為 0。

使用 Java 配置

以下示例展示瞭如何使用 Java 配置來配置通道

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 進行配置

以下示例展示瞭如何使用 Java DSL 來配置通道

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}
© . This site is unofficial and not affiliated with VMware.