入站通道介面卡

以下列表顯示了 AMQP 入站通道介面卡可能的配置選項

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  batch-size=""                   (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 此介面卡的唯一 ID。可選。
2 轉換後的訊息應傳送到的訊息通道。必需。
3 應從中消費訊息的 AMQP 佇列名稱(逗號分隔列表)。必需。
4 MessageListenerContainer 的確認模式。當設定為 MANUAL 時,交付標籤和通道分別在訊息頭 amqp_deliveryTagamqp_channel 中提供。使用者應用程式負責確認。NONE 表示不確認(autoAck)。AUTO 表示介面卡的容器在下游流程完成時進行確認。可選(預設為 AUTO)。參見 入站端點確認模式
5 處理與此入站通道介面卡相關的橫切行為的額外 AOP 通知。可選。
6 標誌,指示此元件建立的通道是否是事務性的。如果為 true,它會告訴框架使用事務性通道,並根據結果以提交或回滾結束所有操作(傳送或接收),如果出現異常則表示回滾。可選(預設為 false)。
7 指定要建立的併發消費者數量。預設值為 1。我們建議提高併發消費者數量以擴充套件從佇列接收訊息的消費能力。但是請注意,一旦註冊了多個消費者,任何排序保證都將丟失。通常,對於低流量佇列,請使用一個消費者。當設定了 'consumers-per-queue' 時不允許。可選。
8 RabbitMQ ConnectionFactory 的 Bean 引用。可選(預設為 connectionFactory)。
9 錯誤訊息應傳送到的訊息通道。可選。
10 偵聽器通道(com.rabbitmq.client.Channel)是否暴露給已註冊的 ChannelAwareMessageListener。可選(預設為 true)。
11 接收 AMQP 訊息時使用的 AmqpHeaderMapper 引用。可選。預設情況下,只有標準 AMQP 屬性(如 contentType)被複制到 Spring Integration MessageHeaders。預設的 DefaultAmqpHeaderMapper 不會將 AMQP MessageProperties 中任何使用者定義的頭複製到訊息中。如果提供了 request-header-names 則不允許。
12 要從 AMQP 請求對映到 MessageHeaders 的 AMQP 頭名稱的逗號分隔列表。僅當未提供 'header-mapper' 引用時才能提供。此列表中的值也可以是與頭名稱匹配的簡單模式(例如"*"或"thing1*, thing2"或"*something")。
13 接收 AMQP 訊息時使用的 AbstractMessageListenerContainer 引用。如果提供了此屬性,則不應提供與偵聽器容器配置相關的任何其他屬性。換句話說,透過設定此引用,您必須完全負責偵聽器容器配置。唯一的例外是 MessageListener 本身。由於這實際上是此通道介面卡實現的核心職責,因此引用的偵聽器容器不得擁有自己的 MessageListener。可選。
14 接收 AMQP 訊息時使用的 MessageConverter。可選。
15 接收 AMQP 訊息時使用的 MessagePropertiesConverter。可選。
16 指定底層 AbstractMessageListenerContainer 啟動和停止的階段。啟動順序從最低到最高,關閉順序與此相反。預設情況下,此值為 Integer.MAX_VALUE,這意味著此容器儘可能晚啟動並儘可能早停止。可選。
17 告訴 AMQP 代理一次請求中向每個消費者傳送多少條訊息。通常,您可以將此值設定得很高以提高吞吐量。它應該大於或等於事務大小(參見此列表後面的 batch-size 屬性)。可選(預設為 1)。
18 接收超時,單位為毫秒。可選(預設為 1000)。
19 指定底層 AbstractMessageListenerContainer 恢復嘗試之間的間隔(單位為毫秒)。可選(預設為 5000)。
20 如果為 'true' 且代理上沒有任何佇列可用,則容器在啟動期間會丟擲致命異常並停止(如果在容器執行時佇列被刪除,則在三次嘗試被動宣告佇列後)。如果為 false,則容器不會丟擲異常並進入恢復模式,根據 recovery-interval 嘗試重新啟動。可選(預設為 true)。
21 在底層 AbstractMessageListenerContainer 停止後,以及在 AMQP 連線強制關閉之前,等待工作執行緒的時間(以毫秒為單位)。如果在關閉訊號到來時有任何工作執行緒處於活動狀態,只要它們能在此超時內完成處理,就允許它們完成。否則,連線將關閉,訊息將保持未確認狀態(如果通道是事務性的)。可選(預設為 5000)。
22 預設情況下,底層 AbstractMessageListenerContainer 使用 SimpleAsyncTaskExecutor 實現,它為每個任務啟動一個新執行緒,非同步執行。預設情況下,併發執行緒的數量是無限的。請注意,此實現不重用執行緒。考慮使用執行緒池 TaskExecutor 實現作為替代。可選(預設為 SimpleAsyncTaskExecutor)。
23 預設情況下,底層 AbstractMessageListenerContainer 建立 DefaultTransactionAttribute 的新例項(它採用 EJB 方法,對執行時異常回滾但不對已檢查異常回滾)。可選(預設為 DefaultTransactionAttribute)。
24 在底層 AbstractMessageListenerContainer 上設定對外部 PlatformTransactionManager 的 bean 引用。事務管理器與 channel-transacted 屬性協同工作。如果在框架傳送或接收訊息時已經存在事務並且 channelTransacted 標誌為 true,則訊息事務的提交或回滾將推遲到當前事務結束。如果 channelTransacted 標誌為 false,則訊息操作不適用事務語義(它會自動確認)。有關更多資訊,請參閱 使用 Spring AMQP 進行事務。可選。
25 告訴 SimpleMessageListenerContainer 在單個請求中處理多少條訊息。為了獲得最佳結果,它應該小於或等於 prefetch-count 中設定的值。當設定了 'consumers-per-queue' 時不允許。可選(預設為 1)。
26 指示底層偵聽器容器應該是 DirectMessageListenerContainer 而不是預設的 SimpleMessageListenerContainer。有關更多資訊,請參閱 Spring AMQP 參考手冊
27 當容器的 consumerBatchEnabledtrue 時,確定介面卡如何在訊息有效負載中呈現訊息批次。當設定為 MESSAGES(預設)時,有效負載是一個 List<Message<?>>,其中每條訊息都有從傳入 AMQP Message 對映的頭,並且有效負載是轉換後的 body。當設定為 EXTRACT_PAYLOADS 時,有效負載是一個 List<?>,其中元素是從 AMQP Message 正文轉換而來的。EXTRACT_PAYLOADS_WITH_HEADERS 類似於 EXTRACT_PAYLOADS,但此外,每條訊息的頭都從 MessageProperties 對映到相應索引處的 List<Map<String, Object>;頭名稱是 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
容器

請注意,當使用 XML 配置外部容器時,您不能使用 Spring AMQP 名稱空間來定義容器。這是因為名稱空間至少需要一個 <listener/> 元素。在此環境中,偵聽器是介面卡內部的。因此,您必須使用正常的 Spring <bean/> 定義來定義容器,如下例所示

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
儘管 Spring Integration JMS 和 AMQP 的支援類似,但存在重要差異。JMS 入站通道介面卡在底層使用 JmsDestinationPollingSource,並期望配置輪詢器。AMQP 入站通道介面卡使用 AbstractMessageListenerContainer 並且是訊息驅動的。在這方面,它更類似於 JMS 訊息驅動通道介面卡。

從版本 5.5 開始,AmqpInboundChannelAdapter 可以配置 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略,該策略在內部呼叫重試操作時用於 RecoveryCallback。有關更多資訊,請參閱 setMessageRecoverer() JavaDoc。

@Publisher 註解也可以與 @RabbitListener 結合使用

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

預設情況下,@Publisher AOP 攔截器處理方法呼叫的返回值。但是,@RabbitListener 方法的返回值被視為 AMQP 回覆訊息。因此,這種方法不能與 @Publisher 一起使用,所以建議使用 @Payload 註解和針對方法引數的相應 SpEL 表示式。有關 @Publisher 的更多資訊,請參見 註解驅動配置 部分。

當在偵聽器容器中使用排他或單活動消費者時,建議將容器屬性 forceStop 設定為 true。這將防止在停止容器後,另一個消費者在此例項完全停止之前開始消費訊息的競態條件。

批次訊息

有關批次訊息的更多資訊,請參閱 Spring AMQP 文件

要使用 Spring Integration 生成批次訊息,只需使用 BatchingRabbitTemplate 配置出站端點。

在接收批次訊息時,預設情況下,偵聽器容器會提取每個分段訊息,介面卡會為每個分段生成一個 Message<?>。從版本 5.2 開始,如果容器的 deBatchingEnabled 屬性設定為 false,則由介面卡而不是容器執行解批處理,並生成一個包含分段有效負載列表(如果適用則進行轉換)的單個 Message<List<?>>。該訊息的有效負載是分段有效負載的列表(如果適用,則進行轉換)。

預設的 BatchingStrategySimpleBatchingStrategy,但這可以在介面卡上被覆蓋。

當需要重試操作的恢復時,org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 必須與批處理一起使用。
© . This site is unofficial and not affiliated with VMware.