異步出站閘道器
前面討論的閘道器是同步的,這意味著傳送執行緒會被掛起,直到接收到回覆(或發生超時)。Spring Integration 4.3 版本添加了非同步閘道器,它使用了 Spring AMQP 的 AsyncRabbitTemplate
。傳送訊息時,執行緒在傳送操作完成後立即返回;接收訊息時,回覆在模板的監聽容器執行緒上傳送。當閘道器在輪詢執行緒上被呼叫時,這會非常有用。執行緒會被釋放,並可用於框架中的其他任務。
以下列表顯示了 AMQP 異步出站閘道器可能的配置選項
-
Java DSL
-
Java
-
XML
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | 此介面卡的唯一 ID。可選。 |
2 | 應傳送訊息到此訊息通道,以便將其轉換併發布到 AMQP 交換機。必需。 |
3 | 對已配置的 AsyncRabbitTemplate Bean 的引用。可選(預設為 asyncRabbitTemplate )。 |
4 | 應傳送訊息到此 AMQP 交換機的名稱。如果未提供,訊息將傳送到預設的無名交換機。與 'exchange-name-expression' 互斥。可選。 |
5 | 一個 SpEL 表示式,用於確定訊息應傳送到的 AMQP 交換機的名稱,訊息作為根物件。如果未提供,訊息將傳送到預設的無名交換機。與 'exchange-name' 互斥。可選。 |
6 | 當註冊了多個消費者時,此消費者的順序,從而啟用負載均衡和故障轉移。可選(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | 從 AMQP 佇列接收並轉換後,回覆應傳送到此訊息通道。可選。 |
8 | 閘道器在傳送回覆訊息到 reply-channel 時等待的時間。這僅適用於 reply-channel 可能阻塞的情況——例如,容量已滿的 QueueChannel 。預設是無限。 |
9 | 當在 AsyncRabbitTemplate 的 receiveTimeout 屬性指定的時間內未接收到回覆訊息且此設定為 true 時,閘道器會將錯誤訊息傳送到入站訊息的 errorChannel 訊息頭。當在 AsyncRabbitTemplate 的 receiveTimeout 屬性指定的時間內未接收到回覆訊息且此設定為 false 時,閘道器會將錯誤訊息傳送到預設的 errorChannel (如果可用)。預設為 true 。 |
10 | 傳送訊息時使用的路由鍵。預設情況下,這是一個空的 String 。與 'routing-key-expression' 互斥。可選。 |
11 | 一個 SpEL 表示式,用於確定傳送訊息時使用的路由鍵,訊息作為根物件(例如,'payload.key')。預設情況下,這是一個空的 String 。與 'routing-key' 互斥。可選。 |
12 | 訊息的預設投遞模式:PERSISTENT 或 NON_PERSISTENT 。如果 header-mapper 設定了投遞模式,則會被覆蓋。如果存在 Spring Integration 訊息頭 (amqp_deliveryMode ),則 DefaultHeaderMapper 會設定其值。如果未提供此屬性且 header mapper 未設定,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter 。如果未自定義該轉換器,則預設是 PERSISTENT 。可選。 |
13 | 一個表示式,用於定義關聯資料。提供此表示式後,會配置底層 AMQP 模板接收發布者確認。需要一個專用的 RabbitTemplate 和一個設定了 publisherConfirms 屬性為 true 的 CachingConnectionFactory 。當接收到釋出者確認並提供了關聯資料時,確認會寫入到 confirm-ack-channel 或 confirm-nack-channel ,具體取決於確認型別。確認的載荷是此表示式定義的關聯資料,並且訊息的 'amqp_publishConfirm' 頭會被設定為 true (ack ) 或 false (nack )。對於 nack 例項,會提供一個額外的頭 (amqp_publishConfirmNackCause )。示例:headers['myCorrelationData'] , payload 。如果表示式解析為 Message<?> 例項(例如“#this”),則在 ack /nack 通道上發出的訊息將基於該訊息,並新增額外的頭。另請參見 釋出者確認和返回的替代機制。可選。 |
14 | 積極 (ack ) 釋出者確認將傳送到此通道。載荷是 confirm-correlation-expression 定義的關聯資料。要求底層的 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true 。另請參見 釋出者確認和返回的替代機制。可選(預設為 nullChannel )。 |
15 | 自 4.2 版本起。消極 (nack ) 釋出者確認將傳送到此通道。載荷是 confirm-correlation-expression 定義的關聯資料。要求底層的 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true 。另請參見 釋出者確認和返回的替代機制。可選(預設為 nullChannel )。 |
16 | 設定此值後,如果在指定毫秒時間內未收到釋出者確認,閘道器將合成一個消極確認 (nack)。待確認項每隔此值的 50% 進行檢查,因此實際傳送 nack 的時間將在該值的 1 倍到 1.5 倍之間。另請參見 釋出者確認和返回的替代機制。預設無(不會生成 nacks)。 |
17 | 退回的訊息將傳送到此通道。提供此項後,底層 AMQP 模板會被配置為將無法投遞的訊息返回到閘道器。訊息將根據從 AMQP 接收的資料構建,幷包含以下附加頭:amqp_returnReplyCode , amqp_returnReplyText , amqp_returnExchange 和 amqp_returnRoutingKey 。要求底層的 AsyncRabbitTemplate 將其 mandatory 屬性設定為 true 。另請參見 釋出者確認和返回的替代機制。可選。 |
18 | 當設定為 false 時,端點會在應用上下文初始化期間嘗試連線到 broker。這樣做可以透過在 broker 關閉時記錄錯誤訊息來快速檢測錯誤的配置。“快速失敗”。當設定為 true (預設)時,連線會在傳送第一條訊息時建立(如果它尚未因其他元件而存在)。 |
另請參見 非同步服務啟用器 以獲取更多資訊。
RabbitTemplate
當您使用確認和返回時,建議連線到 |