MongoDB 支援

2.1 版本引入了對 MongoDB 的支援:一個“高效能、開源、面向文件的資料庫”。

您需要在專案中包含此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.4"

要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件

連線到 MongoDB

阻塞還是響應式?

從 5.3 版本開始,Spring Integration 提供了對響應式 MongoDB 驅動程式的支援,以便在訪問 MongoDB 時啟用非阻塞 I/O。要啟用響應式支援,請將 MongoDB 響應式流驅動程式新增到您的依賴項中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

對於常規同步客戶端,您需要將相應的驅動程式新增到依賴項中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

在框架中,兩者都是可選的 (optional),以更好地支援終端使用者的選擇。

要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 構建在另一個 Spring 專案 Spring Data MongoDB 提供的支援之上。它提供了名為 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工廠類,它們簡化了與 MongoDB 客戶端 API 的整合。

Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過引入上述依賴項來選擇使用響應式方式。

使用 MongoDatabaseFactory

要連線到 MongoDB,您可以使用 MongoDatabaseFactory 介面的一個實現。

以下示例展示瞭如何使用 SimpleMongoClientDatabaseFactory

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受兩個引數:一個 MongoClient 例項和一個指定資料庫名稱的 String。如果您需要配置 hostport 等屬性,可以使用底層 MongoClients 類提供的建構函式之一進行傳遞。有關如何配置 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考文件。

使用 ReactiveMongoDatabaseFactory

要使用響應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory 介面的一個實現。

以下示例展示瞭如何使用 SimpleReactiveMongoDatabaseFactory

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 訊息儲存

正如《企業整合模式》(EIP) 一書中所述,訊息儲存 允許您持久化訊息。如果可靠性是關注點,這在處理具有訊息緩衝能力的元件(如 QueueChannelaggregatorresequencer 等)時非常有用。在 Spring Integration 中,MessageStore 策略也為 EIP 中描述的 存根(claim check) 模式提供了基礎。

Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用於存根模式)和 MessageGroupStore 策略(主要用於聚合器和重排序器模式)的實現。

以下示例配置了一個 MongoDbMessageStore 以使用 QueueChannel 和一個 aggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述示例是一個簡單的 Bean 配置,它期望一個 MongoDbFactory 作為建構函式引數。

MongoDbMessageStore 使用 Spring Data Mongo 對映機制將 Message 作為 Mongo 文件展開,包含所有巢狀屬性。當您需要訪問儲存訊息的 payloadheaders 進行審計或分析時,這非常有用。

MongoDbMessageStore 使用自定義的 MappingMongoConverter 實現將 Message 例項儲存為 MongoDB 文件,並且對 Message 的屬性(payloadheader 值)有一些限制。

從 5.1.6 版本開始,MongoDbMessageStore 可以配置自定義轉換器,這些轉換器會傳播到內部的 MappingMongoConverter 實現中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) 的 JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它實現了 MessageStoreMessageGroupStore 介面。此類可以接受一個 MongoTemplate 作為建構函式引數,您可以使用它來配置自定義的 WriteConcern 等。另一個建構函式需要一個 MappingMongoConverter 和一個 MongoDbFactory,這允許您為 Message 例項及其屬性提供一些自定義轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore 使用標準的 Java 序列化來向 MongoDB 寫入和讀取 Message 例項(參見 MongoDbMessageBytesConverter),並依賴 MongoTemplate 的其他屬性的預設值。它從提供的 MongoDbFactoryMappingMongoConverter 構建一個 MongoTemplateConfigurableMongoDbMessageStore 儲存的集合的預設名稱是 configurableStoreMessages。我們建議在訊息包含複雜資料型別時使用此實現來建立健壯且靈活的解決方案。

從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore 提供了一個 setCreateIndexes(boolean) 選項(預設為 true),可用於停用自動索引建立。以下示例展示瞭如何宣告一個 Bean 並停用自動索引建立

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道訊息儲存

4.0 版本引入了新的 MongoDbChannelMessageStore。它是一個最佳化的 MessageGroupStore,用於 QueueChannel 例項。透過設定 priorityEnabled = true,您可以在 <int:priority-queue> 例項中使用它,以實現持久化訊息的優先順序順序輪詢。MongoDB 文件中的 priority 欄位填充自 IntegrationMessageHeaderAccessor.PRIORITY (priority) 訊息頭。

此外,所有 MongoDB MessageStore 例項現在都有一個用於 MessageGroup 文件的 sequence 欄位。sequence 值是同一集合中一個簡單 sequence 文件的 $inc 操作結果,該文件按需建立。sequence 欄位用於 poll 操作,以在訊息儲存在同一毫秒內時提供先進先出(FIFO)的訊息順序(如果在配置中指定了優先順序,則在優先順序內排序)。

我們不建議對優先順序和非優先順序使用同一個 MongoDbChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存。然而,同一個 collection 可以用於兩種 MongoDbChannelMessageStore 型別,因為從儲存中輪詢訊息是排序並使用索引的。要配置這種情況,您可以從另一個訊息儲存 Bean 擴充套件一個,如下面的示例所示
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore 並停用自動索引建立

從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore 實現了一個 setCreateIndex(boolean) 方法,可用於停用或啟用(預設為啟用)自動索引建立。以下示例展示瞭如何宣告一個 Bean 並停用自動索引建立

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元資料儲存

Spring Integration 4.2 引入了一個新的基於 MongoDB 的 MetadataStore(參見 元資料儲存)實現。您可以使用 MongoDbMetadataStore 在應用程式重啟後維護元資料狀態。您可以將這個新的 MetadataStore 實現與以下介面卡一起使用,例如

要指示這些介面卡使用新的 MongoDbMetadataStore,請宣告一個 Bean 名稱為 metadataStore 的 Spring Bean。feed 入站通道介面卡會自動拾取並使用宣告的 MongoDbMetadataStore。以下示例展示瞭如何宣告一個 Bean 名稱為 metadataStore 的 Bean

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 還實現了 ConcurrentMetadataStore,使其能夠可靠地在多個應用程式例項之間共享,其中只有一個例項被允許儲存或修改鍵的值。由於 MongoDB 的保證,所有這些操作都是原子性的。

MongoDB 入站通道介面卡

MongoDB 入站通道介面卡是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message 的 payload 傳送。以下示例展示瞭如何配置 MongoDB 入站通道介面卡

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前述配置所示,您可以透過使用 inbound-channel-adapter 元素併為各種屬性提供值來配置 MongoDB 入站通道介面卡,例如

  • query: 一個 JSON 查詢(參見 MongoDB 查詢

  • query-expression: 一個 SpEL 表示式,評估結果為一個 JSON 查詢字串(如上面 query 屬性所示)或 o.s.data.mongodb.core.query.Query 的一個例項。與 query 屬性互斥。

  • entity-class: Payload 物件的型別。如果未提供,則返回一個 com.mongodb.DBObject

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名稱。

  • mongodb-factory: 對 o.s.data.mongodb.MongoDbFactory 例項的引用

  • mongo-template: 對 o.s.data.mongodb.core.MongoTemplate 例項的引用

  • 其他適用於所有入站介面卡的通用屬性(如 'channel')。

您不能同時設定 mongo-templatemongodb-factory

上述示例相對簡單和靜態,因為它對 query 使用字面值,並對 collection 使用預設名稱。有時,您可能需要在執行時根據某些條件更改這些值。為此,可以使用它們的 -expression 等效項(query-expressioncollection-name-expression),其中提供的表示式可以是任何有效的 SpEL 表示式。

此外,您可能希望對從 MongoDB 讀取併成功處理的資料進行一些後處理。例如,您可能希望在文件處理後移動或刪除它。您可以透過使用 Spring Integration 2.2 新增的事務同步功能來實現這一點,如下面的示例所示

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例展示了前述示例中引用的 DocumentCleaner

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以使用 transactional 元素將輪詢器宣告為事務性的。此元素可以引用一個真正的事務管理器(例如,如果您的流的其他部分呼叫 JDBC)。如果您沒有“真正的”事務,您可以使用 o.s.i.transaction.PseudoTransactionManager 的例項,它是 Spring PlatformTransactionManager 的一個實現,並在沒有實際事務時啟用 Mongo 介面卡的事務同步功能。

這樣做並不會使 MongoDB 本身具有事務性。它允許在成功(提交)之前或之後,或在失敗(回滾)之後進行操作同步。

一旦您的輪詢器是事務性的,您可以在 transactional 元素上設定一個 o.s.i.transaction.TransactionSynchronizationFactory 的例項。TransactionSynchronizationFactory 建立一個 TransactionSynchronization 的例項。為了方便起見,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory,它允許您配置 SpEL 表示式,其執行與事務協調(同步)。支援 before-commit、after-commit 和 after-rollback 事件的表示式,併為每個事件提供一個通道,評估結果(如果有)將傳送到該通道。對於每個子元素,您可以指定 expressionchannel 屬性。如果只存在 channel 屬性,則接收到的訊息將作為特定同步場景的一部分發送到那裡。如果只存在 expression 屬性且表示式結果為非空值,則會生成一個以結果為 payload 的訊息併發送到預設通道(NullChannel),並出現在日誌中(DEBUG 級別)。如果您希望評估結果傳送到特定通道,請新增一個 channel 屬性。如果表示式結果為 null 或 void,則不會生成訊息。

有關事務同步的更多資訊,請參閱 事務同步

從 5.5 版本開始,MongoDbMessageSource 可以配置一個 updateExpression,它必須評估為一個具有 MongoDB update 語法的 Stringorg.springframework.data.mongodb.core.query.Update 的一個例項。它可以用作上述後處理過程的替代方案,它修改從集合中取出的實體,以便在下一個輪詢週期中不會再次從集合中取出它們(假設更新改變了查詢中使用的某個值)。當同一個集合的多個 MongoDbMessageSource 例項在叢集中使用時,仍然建議使用事務來實現執行隔離和資料一致性。

MongoDB 變更流入站通道介面卡

從 5.3 版本開始,spring-integration-mongodb 模組引入了 MongoDbChangeStreamMessageProducer - 這是 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的一個響應式 MessageProducerSupport 實現。該元件預設生成一個訊息的 Flux 流,其中 payload 為 ChangeStreamEventbody,幷包含一些變更流相關的 headers(參見 MongoHeaders)。建議將此 MongoDbChangeStreamMessageProducer 與一個 FluxMessageChannel 結合作為 outputChannel,用於按需訂閱和下游事件消費。

此通道介面卡的 Java DSL 配置可能如下所示

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止,或下游取消訂閱,或 MongoDB 變更流生成 OperationType.INVALIDATE 時,Publisher 會完成。通道介面卡可以再次啟動,並建立一個新的源資料 Publisher,並在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自動訂閱。如果在啟動之間需要從其他位置消費變更流事件,可以重新配置此通道介面卡的新選項。

有關 Spring Data MongoDB 中變更流支援的更多資訊,請參閱文件

MongoDB 出站通道介面卡

MongoDB 出站通道介面卡允許您將訊息 payload 寫入 MongoDB 文件儲存,如下面的示例所示

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前述配置所示,您可以透過使用 outbound-channel-adapter 元素併為各種屬性提供值來配置 MongoDB 出站通道介面卡,例如

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名稱。

  • mongo-converter: 對 o.s.data.mongodb.core.convert.MongoConverter 例項的引用,用於協助將原始 Java 物件轉換為 JSON 文件表示形式。

  • mongodb-factory: 對 o.s.data.mongodb.MongoDbFactory 例項的引用。

  • mongo-template: 對 o.s.data.mongodb.core.MongoTemplate 例項的引用。注意:您不能同時設定 mongo-template 和 mongodb-factory。

  • 其他適用於所有入站介面卡的通用屬性(如 'channel')。

上述示例相對簡單和靜態,因為它對 collection-name 使用字面值。有時,您可能需要在執行時根據某些條件更改此值。為此,可以使用 collection-name-expression,其中提供的表示式可以是任何有效的 SpEL 表示式。

MongoDB 出站閘道器

5.0 版本引入了 MongoDB 出站閘道器。它允許您透過向其請求通道傳送訊息來查詢資料庫。然後閘道器將響應傳送到回覆通道。您可以使用訊息 payload 和 headers 來指定查詢和集合名稱,如下面的示例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以與 MongoDB 出站閘道器一起使用以下屬性

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名稱。

  • mongo-converter: 對 o.s.data.mongodb.core.convert.MongoConverter 例項的引用,用於協助將原始 Java 物件轉換為 JSON 文件表示形式。

  • mongodb-factory: 對 o.s.data.mongodb.MongoDbFactory 例項的引用。

  • mongo-template: 對 o.s.data.mongodb.core.MongoTemplate 例項的引用。注意:您不能同時設定 mongo-templatemongodb-factory

  • entity-class: 要傳遞給 MongoTemplate 中 find(..)findOne(..) 方法的實體類的完全限定名。如果未提供此屬性,預設值為 org.bson.Document

  • queryquery-expression: 指定 MongoDB 查詢。有關更多查詢示例,請參閱 MongoDB 文件

  • collection-callback: 對 org.springframework.data.mongodb.core.CollectionCallback 例項的引用。自 5.0.11 版本起,推薦使用帶有請求訊息上下文的 o.s.i.mongodb.outbound.MessageCollectionCallback 例項。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有 collection-callback 和任何查詢屬性。

作為 queryquery-expression 屬性的替代方案,您可以使用 collectionCallback 屬性作為對 MessageCollectionCallback 函式式介面實現的引用來指定其他資料庫操作。以下示例指定了一個計數操作

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 響應式通道介面卡

從 5.3 版本開始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 實現。它們基於 Spring Data 的 ReactiveMongoOperations,並需要 org.mongodb:mongodb-driver-reactivestreams 依賴項。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的實現,當整合流定義涉及響應式流組合時,框架原生支援它。有關更多資訊,請參閱 ReactiveMessageHandler

從配置角度來看,它與許多其他標準通道介面卡沒有區別。例如,使用 Java DSL,此類通道介面卡可以這樣使用

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我們將透過提供的 ReactiveMongoDatabaseFactory 連線到 MongoDB,並將請求訊息中的資料儲存到預設名稱為 data 的集合中。實際操作將根據內部建立的 ReactiveStreamsConsumer 中響應式流組合的需要執行。

ReactiveMongoDbMessageSource 是一個 AbstractMessageSource 實現,它基於所提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 和 MongoDB 查詢(或表示式),根據 expectSingleResult 選項呼叫 find()findOne() 操作,並使用預期的 entityClass 型別來轉換查詢結果。查詢執行和結果評估是按需執行的,即當生成的訊息的有效載荷中的 PublisherFluxMono,根據 expectSingleResult 選項)被訂閱時。當在下游使用 splitter 和 FluxMessageChannel 時,框架可以自動訂閱此類有效載荷(本質上是 flatMap)。否則,目標應用程式有責任在下游端點中訂閱輪詢的 Publisher

使用 Java DSL,這樣的通道介面卡可以配置為

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

從版本 5.5 開始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有與阻塞式 MongoDbMessageSource 相同的功能。參見 MongoDB 入站通道介面卡AbstractMongoDbMessageSourceSpec JavaDocs 瞭解更多資訊。