異步出站閘道器

前面討論的閘道器是同步的,這意味著傳送執行緒會被掛起,直到接收到回覆(或發生超時)。Spring Integration 4.3 版本添加了非同步閘道器,它使用了 Spring AMQP 的 AsyncRabbitTemplate。傳送訊息時,執行緒在傳送操作完成後立即返回;接收訊息時,回覆在模板的監聽容器執行緒上傳送。當閘道器在輪詢執行緒上被呼叫時,這會非常有用。執行緒會被釋放,並可用於框架中的其他任務。

以下列表顯示了 AMQP 異步出站閘道器可能的配置選項

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 此介面卡的唯一 ID。可選。
2 應傳送訊息到此訊息通道,以便將其轉換併發布到 AMQP 交換機。必需。
3 對已配置的 AsyncRabbitTemplate Bean 的引用。可選(預設為 asyncRabbitTemplate)。
4 應傳送訊息到此 AMQP 交換機的名稱。如果未提供,訊息將傳送到預設的無名交換機。與 'exchange-name-expression' 互斥。可選。
5 一個 SpEL 表示式,用於確定訊息應傳送到的 AMQP 交換機的名稱,訊息作為根物件。如果未提供,訊息將傳送到預設的無名交換機。與 'exchange-name' 互斥。可選。
6 當註冊了多個消費者時,此消費者的順序,從而啟用負載均衡和故障轉移。可選(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 從 AMQP 佇列接收並轉換後,回覆應傳送到此訊息通道。可選。
8 閘道器在傳送回覆訊息到 reply-channel 時等待的時間。這僅適用於 reply-channel 可能阻塞的情況——例如,容量已滿的 QueueChannel。預設是無限。
9 當在 AsyncRabbitTemplatereceiveTimeout 屬性指定的時間內未接收到回覆訊息且此設定為 true 時,閘道器會將錯誤訊息傳送到入站訊息的 errorChannel 訊息頭。當在 AsyncRabbitTemplatereceiveTimeout 屬性指定的時間內未接收到回覆訊息且此設定為 false 時,閘道器會將錯誤訊息傳送到預設的 errorChannel(如果可用)。預設為 true
10 傳送訊息時使用的路由鍵。預設情況下,這是一個空的 String。與 'routing-key-expression' 互斥。可選。
11 一個 SpEL 表示式,用於確定傳送訊息時使用的路由鍵,訊息作為根物件(例如,'payload.key')。預設情況下,這是一個空的 String。與 'routing-key' 互斥。可選。
12 訊息的預設投遞模式:PERSISTENTNON_PERSISTENT。如果 header-mapper 設定了投遞模式,則會被覆蓋。如果存在 Spring Integration 訊息頭 (amqp_deliveryMode),則 DefaultHeaderMapper 會設定其值。如果未提供此屬性且 header mapper 未設定,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter。如果未自定義該轉換器,則預設是 PERSISTENT。可選。
13 一個表示式,用於定義關聯資料。提供此表示式後,會配置底層 AMQP 模板接收發布者確認。需要一個專用的 RabbitTemplate 和一個設定了 publisherConfirms 屬性為 trueCachingConnectionFactory。當接收到釋出者確認並提供了關聯資料時,確認會寫入到 confirm-ack-channelconfirm-nack-channel,具體取決於確認型別。確認的載荷是此表示式定義的關聯資料,並且訊息的 'amqp_publishConfirm' 頭會被設定為 true (ack) 或 false (nack)。對於 nack 例項,會提供一個額外的頭 (amqp_publishConfirmNackCause)。示例:headers['myCorrelationData'], payload。如果表示式解析為 Message<?> 例項(例如“#this”),則在 ack/nack 通道上發出的訊息將基於該訊息,並新增額外的頭。另請參見 釋出者確認和返回的替代機制。可選。
14 積極 (ack) 釋出者確認將傳送到此通道。載荷是 confirm-correlation-expression 定義的關聯資料。要求底層的 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true。另請參見 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。
15 自 4.2 版本起。消極 (nack) 釋出者確認將傳送到此通道。載荷是 confirm-correlation-expression 定義的關聯資料。要求底層的 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true。另請參見 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。
16 設定此值後,如果在指定毫秒時間內未收到釋出者確認,閘道器將合成一個消極確認 (nack)。待確認項每隔此值的 50% 進行檢查,因此實際傳送 nack 的時間將在該值的 1 倍到 1.5 倍之間。另請參見 釋出者確認和返回的替代機制。預設無(不會生成 nacks)。
17 退回的訊息將傳送到此通道。提供此項後,底層 AMQP 模板會被配置為將無法投遞的訊息返回到閘道器。訊息將根據從 AMQP 接收的資料構建,幷包含以下附加頭:amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchangeamqp_returnRoutingKey。要求底層的 AsyncRabbitTemplate 將其 mandatory 屬性設定為 true。另請參見 釋出者確認和返回的替代機制。可選。
18 當設定為 false 時,端點會在應用上下文初始化期間嘗試連線到 broker。這樣做可以透過在 broker 關閉時記錄錯誤訊息來快速檢測錯誤的配置。“快速失敗”。當設定為 true(預設)時,連線會在傳送第一條訊息時建立(如果它尚未因其他元件而存在)。

另請參見 非同步服務啟用器 以獲取更多資訊。

RabbitTemplate

當您使用確認和返回時,建議連線到 AsyncRabbitTemplateRabbitTemplate 是專用的。否則,可能會遇到意外的副作用。