AMQP 1.0 支援

從版本 7.0 開始,Spring Integration 為 RabbitMQ AMQP 1.0 支援提供了通道介面卡。這些通道介面卡基於 org.springframework.amqp:spring-rabbitmq-client 庫。

Spring AMQP 文件提供了關於 RabbitMQ AMQP 1.0 支援的更多詳細資訊。

AMQP 1.0 出站通道介面卡

AmqpClientMessageHandlerAbstractReplyProducingMessageHandler 的實現,可以根據 setRequiresReply() 配置作為單向通道介面卡或出站閘道器。此通道介面卡例項需要一個用於 AMQP 1.0 協議的 AsyncAmqpTemplate 實現,例如來自上述 spring-rabbitmq-client 庫的 RabbitAmqpTemplate。此訊息處理程式預設是非同步的;因此,釋出錯誤應透過請求訊息中的 errorChannel 頭部或應用程式上下文中的全域性預設 errorChannel 來處理。

釋出訊息的 exchange(以及可選的 routingKey)與要釋出的 queue 互斥。如果兩者都沒有提供,那麼 AsyncAmqpTemplate 實現必須確保這些目標部分的一些預設值;否則訊息將被拒絕,因為它未送達。

預設情況下,MessageConverter 是一個 org.springframework.amqp.support.converter.SimpleMessageConverter,它處理 String、可序列化例項和位元組陣列。此外,預設的 AmqpHeaderMapper 是一個 DefaultAmqpHeaderMapper.outboundMapper()。此頭部對映器還用於將 AMQP 訊息屬性映射回回覆中的頭部。

在閘道器模式下,可以提供 replyPayloadType 以轉換回復訊息體。但是,MessageConverter 必須是 SmartMessageConverter 的實現,例如 JacksonJsonMessageConverter。此外,與 replyPayloadType 互斥的是,可以將 returnMessage 標誌設定為 true 以返回整個 org.springframework.amqp.core.Message 例項作為回覆訊息負載。

以下示例演示如何將 AmqpClientMessageHandler 配置為簡單的 @ServiceActivator

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundAdapter(rabbitTemplate)
                    .exchange("e1")
                    .routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundAdapter(rabbitTemplate)
    		            .apply {
    		                exchange("e1")
                            routingKeyExpression("'k1'")
    		            }
    		    )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .with {
                     exchange 'e1'
                     routingKeyExpression '''k1'''
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setExchangeExpressionString("headers[exchange]");
    messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
    return messageHandler;
}

AmqpClientMessageHandler 的閘道器變體可以是

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundGateway(rabbitTemplate)
                    .queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundGateway(rabbitTemplate)
    		            .queueFunction { "requestReply" }
                )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .with {
                     queueFunction { 'requestReply' }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setRequiresReply(true);
    messageHandler.setReplyPayloadType(String.class);
    messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
    messageHandler.setQueue("q1");
    return messageHandler;
}

AMQP 1.0 訊息驅動通道介面卡

AmqpClientMessageProducerMessageProducerSupport 的實現,作為訊息驅動通道介面卡,透過 RabbitMQ AMQP 1.0 協議從佇列消費訊息。它需要一個 AmqpConnectionFactory 和至少一個要消費的佇列。其內部邏輯基於 RabbitAmqpListenerContainerIntegrationRabbitAmqpMessageListener,用於將消費的 AMQP 訊息(轉換後)中繼到 outputChannelRabbitAmqpListenerContainer 的一些配置選項作為 AmqpClientMessageProducer 的設定器暴露。

預設情況下,MessageConverter 是一個 org.springframework.amqp.support.converter.SimpleMessageConverter,它處理 String、可序列化例項和位元組陣列。此外,預設的 AmqpHeaderMapper 是一個 DefaultAmqpHeaderMapper.inboundMapper()。可以將 messageConverter 選項設定為 null 以完全跳過轉換(包括頭部對映),並返回收到的 AMQP 訊息作為要生成的 Spring 訊息的負載。

此外,AmqpClientMessageProducer 實現了 Pausable 契約,並委託給相應的 RabbitAmqpListenerContainer API。

AmqpClientMessageProducer.setBatchSize() > 1 時,此通道介面卡以批處理模式工作。在這種情況下,接收到的訊息將被收集,直到達到批處理大小,或 batchReceiveTimeout 期限耗盡。然後,所有批處理的 AMQP 訊息將被轉換為 Spring 訊息,並且結果列表將作為包裝訊息的負載生成,以傳送到 outputChannel。批處理模式由於一次性結算所有批處理訊息而帶來一些效能提升。

autoSettle 標誌設定為 false 時,AcknowledgmentCallback 例項作為 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 訊息頭部提供,以對接收到的訊息或整個批處理做出結算決策。

以下示例演示如何將 AmqpClientMessageProducer 配置為簡單的入站端點

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
            .channel(c -> c.queue("receiveChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
            channel("inputChannel")
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
        channel 'inputChannel'
    }
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
        QueueChannel inputChannel) {

    AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
    amqpClientMessageProducer.setOutputChannel(inputChannel);
    amqpClientMessageProducer.setBatchSize(2);
    return amqpClientMessageProducer;
}

AMQP 1.0 入站閘道器

AmqpClientInboundGatewayMessagingGatewaySupport 的實現,用於透過 RabbitMQ AMQP 1.0 協議接收請求並生成回覆。它與上述 AmqpClientMessageProducer 類似,並共享許多 RabbitAmqpListenerContainer 配置選項。此外,為了生成 AMQP 1.0 回覆,AmqpClientInboundGateway 內部使用 RabbitAmqpTemplate

為了自動將回復與其請求關聯起來,必須提供請求訊息的 replyTo 屬性。例如,RabbitAmqpTemplate.sendAndReceive() 依賴於 RabbitMQ AMQP 1.0 庫中的 RpcClient,該庫生成一個獨佔且自動刪除的佇列。或者,回覆地址可以作為 AmqpClientInboundGateway 上的 replyExchange(和可選的 replyRoutingKey)或 replyQueue(但不能同時設定)來設定,這些選項將委託給 RabbitAmqpTemplate 的預設選項。messageIdcorrelationId 請求訊息屬性可以用於與回覆關聯。如果缺失,RabbitAmqpTemplate.sendAndReceive() 中的 RpcClient 會生成一個。AmqpClientInboundGateway 能夠將此類關聯鍵映射回回覆訊息。

以下示例演示如何將 AmqpClientInboundGateway 配置為簡單的入站閘道器

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
            .channel(c -> c.queue("inputChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
            channel { queue("inputChannel") }
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
        channel { queue 'inputChannel' }
    }
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
    AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
    amqpClientInboundGateway.setRequestChannelName("inputChannel");
    return amqpClientInboundGateway;
}
© . This site is unofficial and not affiliated with VMware.