MQTT 支援

Spring Integration 提供入站和出站通道介面卡,以支援訊息佇列遙測傳輸 (MQTT) 協議。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:7.0.0"

當前實現使用 Eclipse Paho MQTT 客戶端 庫。從版本 6.5 開始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依賴項是一個 optional 依賴項,因此必須顯式包含在目標專案中以支援 MQTT v3。

XML 配置和本章的大部分內容都涉及 MQTT v3.1 協議支援和相應的 Paho 客戶端。有關相應的協議支援,請參閱 MQTT v5 支援 段落。

兩個介面卡的配置都透過使用 DefaultMqttPahoClientFactory 來實現。有關配置選項的更多資訊,請參閱 Paho 文件。

我們建議配置一個 MqttConnectOptions 物件並將其注入工廠,而不是在工廠本身上設定(已棄用的)選項。

入站(訊息驅動)通道介面卡

入站通道介面卡由 MqttPahoMessageDrivenChannelAdapter 實現。為方便起見,您可以使用名稱空間對其進行配置。最簡單的配置可能如下所示

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表顯示了可用的屬性

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客戶端 ID。
2 代理 URL。
3 此介面卡接收訊息的主題的逗號分隔列表。
4 QoS 值的逗號分隔列表。它可以是應用於所有主題的單個值,也可以是每個主題的值,在這種情況下,列表的長度必須相同。
5 一個 MqttMessageConverter(可選)。預設情況下,預設的 DefaultPahoMessageConverter 生成一個帶有 String 有效載荷的訊息,並帶有以下頭部資訊
  • mqtt_topic: 接收訊息的主題

  • mqtt_duplicate: 如果訊息是重複的,則為 true

  • mqtt_qos: 服務質量 您可以透過將其宣告為 <bean/> 並將 payloadAsBytes 屬性設定為 true,來配置 DefaultPahoMessageConverter 以在有效載荷中返回原始 byte[]

6 客戶端工廠。
7 send() 超時。它僅在通道可能阻塞時(例如,當前已滿的有界 QueueChannel)才適用。
8 錯誤通道。如果提供,下游異常將以 ErrorMessage 的形式傳送到此通道。有效載荷是一個 MessagingException,其中包含失敗的訊息和原因。
9 恢復間隔。它控制介面卡在失敗後嘗試重新連線的間隔。預設為 10000ms(十秒)。
10 確認模式;設定為 true 表示手動確認。
從版本 4.1 開始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactoryserverURIs 屬性中提供伺服器 URI。這樣做可以實現,例如,連線到高可用 (HA) 叢集。

從版本 4.2.2 開始,當介面卡成功訂閱主題時,會發佈一個 MqttSubscribedEvent。當連線或訂閱失敗時,會發布 MqttConnectionFailedEvent 事件。這些事件可以由實現 ApplicationListener 的 bean 接收。

此外,一個名為 recoveryInterval 的新屬性控制介面卡在失敗後嘗試重新連線的間隔。它預設為 10000ms(十秒)。

在 4.2.3 版本之前,當介面卡停止時,客戶端總是取消訂閱。這是不正確的,因為如果客戶端 QoS 大於 0,我們需要保持訂閱處於活動狀態,以便在介面卡停止時到達的訊息在下次啟動時交付。這還需要將客戶端工廠上的 cleanSession 屬性設定為 false。它預設為 true

從版本 4.2.3 開始,如果 cleanSession 屬性為 false,則介面卡不會取消訂閱(預設情況下)。

可以透過在工廠上設定 consumerCloseAction 屬性來覆蓋此行為。它可以具有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。後者(預設值)僅在 cleanSession 屬性為 true 時取消訂閱。

要恢復到 4.2.3 之前的行為,請使用 UNSUBSCRIBE_ALWAYS

從版本 5.0 開始,topicqosretained 屬性被對映到 .RECEIVED_…​ 頭部(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免意外傳播到出站訊息,該訊息(預設情況下)使用 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 頭部。

執行時新增和刪除主題

從版本 4.1 開始,您可以以程式設計方式更改介面卡訂閱的主題。Spring Integration 提供了 addTopic()removeTopic() 方法。新增主題時,您可以選擇指定 QoS(預設值:1)。您還可以透過向 <control-bus/> 傳送適當的訊息並帶有適當的有效載荷來修改主題——例如:"myMqttAdapter.addTopic('foo', 1)"

停止和啟動介面卡對主題列表沒有影響(它不會恢復到配置中的原始設定)。更改不會保留在應用程式上下文的生命週期之外。新的應用程式上下文將恢復到配置的設定。

在介面卡停止(或與代理斷開連線)時更改主題會在下次建立連線時生效。

手動確認

從版本 5.3 開始,您可以將 manualAcks 屬性設定為 true。通常用於非同步確認交付。當設定為 true 時,頭部 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 會新增到訊息中,其值為 SimpleAcknowledgment。您必須呼叫 acknowledge() 方法才能完成交付。有關更多資訊,請參閱 IMqttClientsetManualAcks()messageArrivedComplete() 的 Javadoc。為方便起見,提供了一個頭部訪問器

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

從版本 5.2.11 開始,當訊息轉換器丟擲異常或從 MqttMessage 轉換返回 null 時,MqttPahoMessageDrivenChannelAdapter 會將 ErrorMessage 傳送到 errorChannel(如果提供)。否則會將此轉換錯誤重新丟擲到 MQTT 客戶端回撥中。

使用 Java 配置進行配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 進行配置

以下 Spring Boot 應用程式提供了一個使用 Java DSL 配置入站介面卡的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道介面卡

出站通道介面卡由 MqttPahoMessageHandler 實現,該介面卡包裝在 ConsumerEndpoint 中。為方便起見,您可以使用名稱空間對其進行配置。

從版本 4.1 開始,介面卡支援非同步傳送操作,避免阻塞直到交付確認。如果需要,您可以發出應用程式事件,以使應用程式能夠確認交付。

以下列表顯示了出站通道介面卡可用的屬性

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客戶端 ID。
2 代理 URL。
3 一個 MqttMessageConverter(可選)。預設的 DefaultPahoMessageConverter 識別以下頭部資訊
  • mqtt_topic: 訊息將傳送到的主題

  • mqtt_retained: 如果訊息要保留,則為 true

  • mqtt_qos: 服務質量

4 客戶端工廠。
5 預設的服務質量。如果未找到 mqtt_qos 頭部或 qos-expression 返回 null,則使用此值。如果提供了自定義 converter,則不使用此值。
6 一個表示式,用於評估以確定 QoS。預設值為 headers[mqtt_qos]
7 保留標誌的預設值。如果未找到 mqtt_retained 頭部,則使用此值。如果提供了自定義 converter,則不使用此值。
8 一個表示式,用於評估以確定保留的布林值。預設值為 headers[mqtt_retained]
9 訊息傳送到的預設主題(如果未找到 mqtt_topic 頭部,則使用此值)。
10 一個表示式,用於評估以確定目標主題。預設值為 headers['mqtt_topic']
11 true 時,呼叫者不會阻塞。相反,它會在傳送訊息時等待交付確認。預設值為 false(傳送會阻塞直到交付確認)。
12 asyncasync-events 都為 true 時,會發出一個 MqttMessageSentEvent(請參閱 事件)。它包含訊息、主題、客戶端庫生成的 messageIdclientIdclientInstance(每次客戶端連線時遞增)。當客戶端庫確認交付時,會發出一個 MqttMessageDeliveredEvent。它包含 messageIdclientIdclientInstance,從而可以將交付與 send() 相關聯。任何 ApplicationListener 或事件入站通道介面卡都可以接收這些事件。請注意,MqttMessageDeliveredEvent 有可能在 MqttMessageSentEvent 之前收到。預設值為 false
從版本 4.1 開始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactoryserverURIs 屬性中提供伺服器 URI。這使得,例如,可以連線到高可用 (HA) 叢集。

使用 Java 配置進行配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置配置出站介面卡的示例

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 進行配置

以下 Spring Boot 應用程式提供了一個使用 Java DSL 配置出站介面卡的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

介面卡會發布某些應用程式事件。

  • MqttConnectionFailedEvent - 如果我們無法連線或連線隨後丟失,則由兩個介面卡釋出。對於 MQTT v5 Paho 客戶端,當伺服器執行正常斷開連線時也會發出此事件,在這種情況下,連線丟失的 causenull

  • MqttMessageSentEvent - 如果在非同步模式下執行,則由出站介面卡在訊息已傳送時釋出。

  • MqttMessageDeliveredEvent - 如果在非同步模式下執行,則由出站介面卡在客戶端指示訊息已送達時釋出。

  • MqttMessageNotDeliveredEvent - 如果在非同步模式下執行,則由出站介面卡在客戶端指示訊息未送達時釋出。

  • MqttSubscribedEvent - 在訂閱主題後由入站介面卡釋出。

這些事件可以由 ApplicationListener<MqttIntegrationEvent>@EventListener 方法接收。

要確定事件的來源,請使用以下方法;您可以檢查 bean 名稱和/或連線選項(以訪問伺服器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支援

從版本 5.5.5 開始,spring-integration-mqtt 模組提供了 MQTT v5 協議的通道介面卡實現。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一個 optional 依賴項,因此必須顯式包含在目標專案中。

由於 MQTT v5 協議支援 MQTT 訊息中的額外任意屬性,因此引入了 MqttHeaderMapper 實現,用於在釋出和接收操作時對映頭部資訊。預設情況下(透過 * 模式),它對映所有接收到的 PUBLISH 幀屬性(包括使用者屬性)。在出站方面,它對映 PUBLISH 幀的以下頭部子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 協議的出站通道介面卡以 Mqttv5PahoMessageHandler 的形式存在。它需要一個 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。它支援 MqttClientPersistence 選項,可以是 async 並在這種情況下發出 MqttIntegrationEvent 物件(參見 asyncEvents 選項)。如果請求訊息有效載荷是 org.eclipse.paho.mqttv5.common.MqttMessage,則透過內部 IMqttAsyncClient 按原樣釋出。如果有效載荷是 byte[],則將其按原樣用於目標 MqttMessage 有效載荷進行釋出。如果有效載荷是 String,則將其轉換為 byte[] 進行釋出。其餘用例委託給提供的 MessageConverter,它是應用程式上下文中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:當請求的訊息有效載荷已經是 MqttMessage 時,不使用提供的 HeaderMapper<MqttProperties>。以下 Java DSL 配置示例演示瞭如何在整合流中使用此通道介面卡

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageHandler 一起使用,因為其契約僅針對 MQTT v3 協議。

如果連線在啟動時或執行時失敗,Mqttv5PahoMessageHandler 會嘗試在下一個生成到此處理器的訊息上重新連線。如果此手動重新連線失敗,則連線異常將拋回給呼叫者。在這種情況下,將應用標準的 Spring Integration 錯誤處理過程,包括請求處理程式建議,例如重試或斷路器。

有關更多資訊,請參閱 Mqttv5PahoMessageHandler 的 javadoc 及其超類。

MQTT v5 協議的入站通道介面卡以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要一個 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用,以及要訂閱和消費的主題。它支援 MqttClientPersistence 選項,預設為記憶體中。可以配置預期的 payloadType(預設為 byte[]),並將其傳播到提供的 SmartMessageConverter 以從接收到的 MqttMessagebyte[] 進行轉換。如果設定了 manualAck 選項,則會將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 頭部新增到要作為 SimpleAcknowledgment 例項生成的訊息中。HeaderMapper<MqttProperties> 用於將 PUBLISH 幀屬性(包括使用者屬性)對映到目標訊息頭部。標準的 MqttMessage 屬性,例如 qosiddupretained 以及接收到的主題,總是對映到頭部。有關更多資訊,請參閱 MqttHeaders

從版本 6.3 開始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基於 MqttSubscription 的建構函式,用於更精細的配置,而不是簡單的 topic 名稱。當提供了這些訂閱時,通道介面卡的 qos 選項不能使用,因為這種 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 配置示例演示瞭如何在整合流中使用此通道介面卡

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因為其契約僅針對 MQTT v3 協議。

有關更多資訊,請參閱 Mqttv5PahoMessageDrivenChannelAdapter 的 javadoc 及其超類。

建議將 MqttConnectionOptions#setAutomaticReconnect(boolean) 設定為 true,以使內部 IMqttAsyncClient 例項處理重新連線。否則,只有手動重啟 Mqttv5PahoMessageDrivenChannelAdapter 才能處理重新連線,例如透過斷開連線時的 MqttConnectionFailedEvent 處理。

共享 MQTT 客戶端支援

如果多個整合需要單個 MQTT ClientID,則不能使用多個 MQTT 客戶端例項,因為 MQTT 代理可能會限制每個 ClientID 的連線數(通常只允許單個連線)。為了使單個客戶端可重用於不同的通道介面卡,可以使用 org.springframework.integration.mqtt.core.ClientManager 元件並將其傳遞給任何需要的通道介面卡。它將管理 MQTT 連線生命週期並在需要時自動重新連線。此外,可以向客戶端管理器提供自定義連線選項和 MqttClientPersistence,就像當前可以為通道介面卡元件所做的那樣。

請注意,支援 MQTT v5 和 v3 通道介面卡。

以下 Java DSL 配置示例演示瞭如何在整合流中使用此客戶端管理器

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
從版本 6.4 開始,現在可以透過 IntegrationFlowContext 使用相應的 ClientManager 在執行時新增 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 的多個例項
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
                                  String topic, MessageChannel channel) {
    flowContext
        .registration(
            IntegrationFlow
                .from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
                .channel(channel)
                .get())
        .register();
}
© . This site is unofficial and not affiliated with VMware.