出站通道介面卡

以下示例展示了 AMQP 出站通道介面卡的可用屬性

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlow.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
1 此介面卡的唯一 ID。可選。
2 訊息應傳送到此訊息通道,以便將其轉換併發布到 AMQP 交換機。必需。
3 配置好的 AMQP 模板的 Bean 引用。可選(預設為 amqpTemplate)。
4 訊息傳送到的 AMQP 交換機名稱。如果未提供,訊息將傳送到預設的無名稱交換機。與 'exchange-name-expression' 互斥。可選。
5 用於確定訊息傳送到的 AMQP 交換機名稱的 SpEL 表示式,訊息作為根物件。如果未提供,訊息將傳送到預設的無名稱交換機。與 'exchange-name' 互斥。可選。
6 註冊多個消費者時,此消費者的順序,從而實現負載均衡和故障轉移。可選(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 傳送訊息時使用的固定路由鍵。預設情況下,這是一個空 String。與 'routing-key-expression' 互斥。可選。
8 用於確定傳送訊息時使用的路由鍵的 SpEL 表示式,訊息作為根物件(例如,'payload.key')。預設情況下,這是一個空 String。與 'routing-key' 互斥。可選。
9 訊息的預設投遞模式:PERSISTENT(持久)或 NON_PERSISTENT(非持久)。如果 header-mapper 設定了投遞模式,則會覆蓋此設定。如果存在 Spring Integration 訊息頭 amqp_deliveryModeDefaultHeaderMapper 將設定該值。如果未提供此屬性且 header mapper 未設定,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter。如果未對其進行任何自定義,則預設值為 PERSISTENT。可選。
10 一個定義關聯資料的表示式。提供此項時,將配置底層 AMQP 模板以接收 publisher 確認。需要專用的 RabbitTemplate 和一個 CachingConnectionFactory,且其 publisherConfirms 屬性設定為 true。當收到 publisher 確認並提供關聯資料時,根據確認型別,會將其寫入 confirm-ack-channelconfirm-nack-channel。確認的 payload 是此表示式定義的關聯資料。訊息包含一個 'amqp_publishConfirm' 頭,設定為 true (`ack`) 或 false (`nack`)。示例:headers['myCorrelationData']payload。版本 4.1 引入了 amqp_publishConfirmNackCause 訊息頭,它包含 publisher 確認 'nack' 的 cause。從版本 4.2 開始,如果表示式解析為 Message<?> 例項(例如 #this),則在 ack/nack 通道上發出的訊息將基於該訊息,並新增額外頭。之前,無論型別如何,都會建立一個以關聯資料作為 payload 的新訊息。另請參閱 Publisher 確認和 Return 的替代機制。可選。
11 用於傳送肯定 (`ack`) publisher 確認的通道。payload 是由 confirm-correlation-expression 定義的關聯資料。如果表示式是 #root#this,則訊息將基於原始訊息構建,並設定 amqp_publishConfirm 頭為 true。另請參閱 Publisher 確認和 Return 的替代機制。可選(預設為 nullChannel)。
12 用於傳送否定 (`nack`) publisher 確認的通道。payload 是由 confirm-correlation-expression 定義的關聯資料(如果未配置 ErrorMessageStrategy)。如果表示式是 #root#this,則訊息將基於原始訊息構建,並設定 amqp_publishConfirm 頭為 false。如果配置了 ErrorMessageStrategy,則訊息是一個 ErrorMessage,其 payload 是 NackedAmqpMessageException。另請參閱 Publisher 確認和 Return 的替代機制。可選(預設為 nullChannel)。
13 設定此項時,如果在指定毫秒時間內未收到 publisher 確認,介面卡將合成一個否定確認 (nack)。待處理的確認每隔此值的 50% 進行檢查,因此實際傳送 nack 的時間將介於此值的 1 倍到 1.5 倍之間。另請參閱 Publisher 確認和 Return 的替代機制。預設無(不生成 nack)。
14 設定為 true 時,呼叫執行緒將阻塞,等待 publisher 確認。這需要配置用於確認的 RabbitTemplate 以及 confirm-correlation-expression。執行緒將阻塞直到 confirm-timeout(或預設 5 秒)。如果發生超時,將丟擲 MessageTimeoutException。如果啟用了 returns 並且訊息被 returned,或在等待確認期間發生任何其他異常,將丟擲 MessageHandlingException,並附帶適當的訊息。
15 用於傳送 Returned 訊息的通道。提供此項時,將配置底層 AMQP 模板將無法投遞的訊息 Returned 給介面卡。如果未配置 ErrorMessageStrategy,則訊息將根據從 AMQP 接收到的資料構建,幷包含以下額外頭:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。如果配置了 ErrorMessageStrategy,則訊息是一個 ErrorMessage,其 payload 是 ReturnedAmqpMessageException。另請參閱 Publisher 確認和 Return 的替代機制。可選。
16 引用用於構建 ErrorMessage 例項的 ErrorMessageStrategy 實現,用於傳送 Returned 或否定確認的訊息。
17 傳送 AMQP 訊息時使用的 AmqpHeaderMapper 引用。預設情況下,只有標準 AMQP 屬性(例如 contentType)會被複制到 Spring Integration MessageHeaders。任何使用者定義的頭都不會被預設的 DefaultAmqpHeaderMapper 複製到訊息中。如果提供了 'request-header-names' 則不允許使用此項。可選。
18 逗號分隔的 AMQP 頭名稱列表,用於從 MessageHeaders 對映到 AMQP Message。如果提供了 'header-mapper' 引用,則不允許使用此項。此列表中的值也可以是與頭名稱匹配的簡單模式(例如 "*""thing1*, thing2""*thing1")。
19 設定為 false 時,端點會在應用程式上下文初始化期間嘗試連線到 broker。這允許“快速失敗”檢測不良配置,但也可能導致在 broker 關閉時初始化失敗。當設定為 true(預設值)時,連線會在傳送第一條訊息時建立(如果因其他元件已建立連線而尚未存在)。
20 設定為 true 時,型別為 Iterable<Message<?>> 的 payloads 將在單個 RabbitTemplate 呼叫範圍內,作為單獨的訊息傳送到同一通道。需要一個 RabbitTemplate。當 wait-for-confirms 為 true 時,訊息傳送後會呼叫 RabbitTemplate.waitForConfirmsOrDie()。對於事務性模板,傳送操作將在新事務或已啟動的事務中執行(如果存在)。
return-channel

使用 return-channel 需要一個 RabbitTemplate,其 mandatory 屬性設定為 true,以及一個 CachingConnectionFactory,其 publisherReturns 屬性設定為 true。當使用多個啟用 returns 的出站端點時,每個端點需要一個單獨的 RabbitTemplate