異步出站閘道器
上一節討論的閘道器是同步的,即傳送執行緒會一直掛起,直到收到回覆(或發生超時)。Spring Integration 4.3 版增加了非同步閘道器,它使用 Spring AMQP 的 AsyncRabbitTemplate。傳送訊息時,執行緒在傳送操作完成後立即返回;收到訊息時,回覆將在模板的監聽器容器執行緒上傳送。當閘道器在輪詢執行緒上呼叫時,這會非常有用。該執行緒被釋放,可用於框架中的其他任務。
以下列表顯示了 AMQP 異步出站閘道器的可能配置選項
-
Java DSL
-
Java
-
XML
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
| 1 | 此介面卡的唯一 ID。可選。 |
| 2 | 訊息通道,訊息應傳送到該通道,以便將其轉換併發布到 AMQP 交換機。必需。 |
| 3 | 對已配置的 AsyncRabbitTemplate 的 Bean 引用。可選(預設為 asyncRabbitTemplate)。 |
| 4 | 訊息應傳送到的 AMQP 交換機名稱。如果未提供,訊息將傳送到預設的無名交換機。與“exchange-name-expression”互斥。可選。 |
| 5 | 一個 SpEL 表示式,用於評估訊息應傳送到的 AMQP 交換機名稱,訊息作為根物件。如果未提供,訊息將傳送到預設的無名交換機。與“exchange-name”互斥。可選。 |
| 6 | 當註冊了多個消費者時,此消費者的順序,從而實現負載均衡和故障轉移。可選(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。 |
| 7 | 訊息通道,回覆應在從 AMQP 佇列接收並轉換後傳送到該通道。可選。 |
| 8 | 閘道器將回復訊息傳送到 reply-channel 時等待的時間。這僅適用於 reply-channel 可以阻塞的情況——例如,容量有限且當前已滿的 QueueChannel。預設值為無窮大。 |
| 9 | 當在 AsyncRabbitTemplate 的 receiveTimeout 屬性內未收到回覆訊息且此設定為 true 時,閘道器會將錯誤訊息傳送到入站訊息的 errorChannel 頭部。當在 AsyncRabbitTemplate 的 receiveTimeout 屬性內未收到回覆訊息且此設定為 false 時,閘道器會將錯誤訊息傳送到預設的 errorChannel(如果可用)。它預設為 true。 |
| 10 | 傳送訊息時使用的路由鍵。預設情況下,這是一個空 String。與“routing-key-expression”互斥。可選。 |
| 11 | 一個 SpEL 表示式,用於評估傳送訊息時使用的路由鍵,訊息作為根物件(例如,payload.key)。預設情況下,這是一個空 String。與 routing-key 互斥。可選。 |
| 12 | 訊息的預設傳遞模式:PERSISTENT 或 NON_PERSISTENT。如果 header-mapper 設定了傳遞模式,則覆蓋。如果存在 Spring Integration 訊息頭部(amqp_deliveryMode),則 DefaultHeaderMapper 設定該值。如果未提供此屬性且頭部對映器未設定它,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter。如果未自定義,則預設值為 PERSISTENT。可選。 |
| 13 | 定義關聯資料的表示式。提供時,此表示式配置底層 AMQP 模板以接收發布者確認。需要專用的 RabbitTemplate 和一個將 publisherConfirms 屬性設定為 true 的 CachingConnectionFactory。當收到釋出者確認並提供關聯資料時,確認將寫入 confirm-ack-channel 或 confirm-nack-channel,具體取決於確認型別。確認的有效負載是此表示式定義的關聯資料,並且訊息的“amqp_publishConfirm”頭部設定為 true(ack)或 false(nack)。對於 nack 例項,將提供一個額外的頭部(amqp_publishConfirmNackCause)。示例:headers['myCorrelationData'],payload。如果表示式解析為 Message<?> 例項(例如“#this”),則在 ack/nack 通道上發出的訊息將基於該訊息,並新增額外的頭部。另請參閱 釋出者確認和返回的替代機制。可選。 |
| 14 | 傳送肯定(ack)釋出者確認的通道。有效負載是 confirm-correlation-expression 定義的關聯資料。要求底層 AsyncRabbitTemplate 的 enableConfirms 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。 |
| 15 | 自 4.2 版起。傳送否定(nack)釋出者確認的通道。有效負載是 confirm-correlation-expression 定義的關聯資料。要求底層 AsyncRabbitTemplate 的 enableConfirms 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選(預設為 nullChannel)。 |
| 16 | 設定後,如果在指定時間內(毫秒)未收到釋出者確認,閘道器將合成一個否定確認(nack)。待處理的確認每隔此值 50% 的時間檢查一次,因此傳送 nack 的實際時間將介於此值的 1 倍到 1.5 倍之間。另請參閱 釋出者確認和返回的替代機制。預設無(不會生成 nack)。 |
| 17 | 返回訊息傳送到的通道。提供時,底層 AMQP 模板配置為將無法傳遞的訊息返回到閘道器。訊息由從 AMQP 接收到的資料構建,並帶有以下附加頭部:amqp_returnReplyCode、amqp_returnReplyText、amqp_returnExchange 和 amqp_returnRoutingKey。要求底層 AsyncRabbitTemplate 的 mandatory 屬性設定為 true。另請參閱 釋出者確認和返回的替代機制。可選。 |
| 18 | 當設定為 false 時,端點在應用程式上下文初始化期間嘗試連線到代理。這樣做可以透過在代理關閉時記錄錯誤訊息來實現“快速失敗”檢測錯誤配置。當設定為 true(預設值)時,連線將在傳送第一條訊息時建立(如果由於其他元件建立而尚未存在)。 |
另請參閱 非同步服務啟用器 以獲取更多資訊。
|
RabbitTemplate
當您使用確認和返回時,我們建議將連線到 |