異步出站閘道器

上一節討論的閘道器是同步的,即傳送執行緒會一直掛起,直到收到回覆(或發生超時)。Spring Integration 4.3 版增加了非同步閘道器,它使用 Spring AMQP 的 AsyncRabbitTemplate。傳送訊息時,執行緒在傳送操作完成後立即返回;收到訊息時,回覆將在模板的監聽器容器執行緒上傳送。當閘道器在輪詢執行緒上呼叫時,這會非常有用。該執行緒被釋放,可用於框架中的其他任務。

以下列表顯示了 AMQP 異步出站閘道器的可能配置選項

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 此介面卡的唯一 ID。可選。
2 訊息通道,訊息應傳送到該通道,以便將其轉換併發布到 AMQP 交換機。必需。
3 對已配置的 AsyncRabbitTemplate 的 Bean 引用。可選(預設為 asyncRabbitTemplate)。
4 訊息應傳送到的 AMQP 交換機名稱。如果未提供,訊息將傳送到預設的無名交換機。與“exchange-name-expression”互斥。可選。
5 一個 SpEL 表示式,用於評估訊息應傳送到的 AMQP 交換機名稱,訊息作為根物件。如果未提供,訊息將傳送到預設的無名交換機。與“exchange-name”互斥。可選。
6 當註冊了多個消費者時,此消費者的順序,從而實現負載均衡和故障轉移。可選(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 訊息通道,回覆應在從 AMQP 佇列接收並轉換後傳送到該通道。可選。
8 閘道器將回復訊息傳送到 reply-channel 時等待的時間。這僅適用於 reply-channel 可以阻塞的情況——例如,容量有限且當前已滿的 QueueChannel。預設值為無窮大。
9 當在 AsyncRabbitTemplatereceiveTimeout 屬性內未收到回覆訊息且此設定為 true 時,閘道器會將錯誤訊息傳送到入站訊息的 errorChannel 頭部。當在 AsyncRabbitTemplatereceiveTimeout 屬性內未收到回覆訊息且此設定為 false 時,閘道器會將錯誤訊息傳送到預設的 errorChannel(如果可用)。它預設為 true
10 傳送訊息時使用的路由鍵。預設情況下,這是一個空 String。與“routing-key-expression”互斥。可選。
11 一個 SpEL 表示式,用於評估傳送訊息時使用的路由鍵,訊息作為根物件(例如,payload.key)。預設情況下,這是一個空 String。與 routing-key 互斥。可選。
12 訊息的預設傳遞模式:PERSISTENTNON_PERSISTENT。如果 header-mapper 設定了傳遞模式,則覆蓋。如果存在 Spring Integration 訊息頭部(amqp_deliveryMode),則 DefaultHeaderMapper 設定該值。如果未提供此屬性且頭部對映器未設定它,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter。如果未自定義,則預設值為 PERSISTENT。可選。
13 定義關聯資料的表示式。提供時,此表示式配置底層 AMQP 模板以接收發布者確認。需要專用的 RabbitTemplate 和一個將 publisherConfirms 屬性設定為 trueCachingConnectionFactory。當收到釋出者確認並提供關聯資料時,確認將寫入 confirm-ack-channelconfirm-nack-channel,具體取決於確認型別。確認的有效負載是此表示式定義的關聯資料,並且訊息的“amqp_publishConfirm”頭部設定為 trueack)或 falsenack)。對於 nack 例項,將提供一個額外的頭部(amqp_publishConfirmNackCause)。示例:headers['myCorrelationData']payload。如果表示式解析為 Message<?> 例項(例如“#this”),則在 ack/nack 通道上發出的訊息將基於該訊息,並新增額外的頭部。另請參閱 釋出者確認和返回的替代機制。可選。
14 傳送肯定(ack)釋出者確認的通道。有效負載是 confirm-correlation-expression 定義的關聯資料。要求底層 AsyncRabbitTemplateenableConfirms 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。
15 自 4.2 版起。傳送否定(nack)釋出者確認的通道。有效負載是 confirm-correlation-expression 定義的關聯資料。要求底層 AsyncRabbitTemplateenableConfirms 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。
16 設定後,如果在指定時間內(毫秒)未收到釋出者確認,閘道器將合成一個否定確認(nack)。待處理的確認每隔此值 50% 的時間檢查一次,因此傳送 nack 的實際時間將介於此值的 1 倍到 1.5 倍之間。另請參閱 釋出者確認和返回的替代機制。預設無(不會生成 nack)。
17 返回訊息傳送到的通道。提供時,底層 AMQP 模板配置為將無法傳遞的訊息返回到閘道器。訊息由從 AMQP 接收到的資料構建,並帶有以下附加頭部:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。要求底層 AsyncRabbitTemplatemandatory 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選。
18 當設定為 false 時,端點在應用程式上下文初始化期間嘗試連線到代理。這樣做可以透過在代理關閉時記錄錯誤訊息來實現“快速失敗”檢測錯誤配置。當設定為 true(預設值)時,連線將在傳送第一條訊息時建立(如果由於其他元件建立而尚未存在)。

另請參閱 非同步服務啟用器 以獲取更多資訊。

RabbitTemplate

當您使用確認和返回時,我們建議將連線到 AsyncRabbitTemplateRabbitTemplate 專用於此目的。否則,可能會遇到意外的副作用。

© . This site is unofficial and not affiliated with VMware.