MongoDb 支援

版本 2.1 引入了對 MongoDB 的支援:一個“高效能、開源、面向文件的資料庫”。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:7.0.0"

要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件

連線到 MongoDb

阻塞還是響應式?

從版本 5.3 開始,Spring Integration 提供了對響應式 MongoDB 驅動程式的支援,以在訪問 MongoDB 時啟用非阻塞 I/O。要啟用響應式支援,請將 MongoDB 響應式流驅動程式新增到您的依賴項中。

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

對於常規同步客戶端,您需要將其相應的驅動程式新增到依賴項中。

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

兩者在框架中都是 optional(可選)的,以更好地支援終端使用者的選擇。

要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 基於另一個 Spring 專案 Spring Data MongoDB 提供的支援。它提供了名為 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工廠類,它們簡化了與 MongoDB 客戶端 API 的整合。

Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過包含上述依賴項來選擇使用響應式模式。

使用 MongoDatabaseFactory

要連線到 MongoDB,您可以使用 MongoDatabaseFactory 介面的實現。

以下示例展示瞭如何使用 SimpleMongoClientDatabaseFactory

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接收兩個引數:一個 MongoClient 例項和一個指定資料庫名稱的 String。如果您需要配置 hostport 等屬性,您可以使用底層 MongoClients 類提供的建構函式之一來傳遞這些屬性。有關如何配置 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考。

使用 ReactiveMongoDatabaseFactory

要使用響應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory 介面的實現。

以下示例展示瞭如何使用 SimpleReactiveMongoDatabaseFactory

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 訊息儲存

如《企業整合模式》(EIP)一書所述,訊息儲存允許您持久化訊息。當處理具有緩衝訊息能力的元件(QueueChannelaggregatorresequencer 等)時,如果可靠性是一個問題,這會很有用。在 Spring Integration 中,MessageStore 策略也為 索賠檢查 模式奠定了基礎,該模式也在 EIP 中有所描述。

Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用於索賠檢查模式)和 MessageGroupStore 策略(主要用於聚合器和重新排序器模式)的實現。

以下示例配置了一個 MongoDbMessageStore,以使用 QueueChannelaggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述示例是一個簡單的 bean 配置,它需要一個 MongoDbFactory 作為建構函式引數。

MongoDbMessageStore 使用 Spring Data Mongo 對映機制將 Message 擴充套件為包含所有巢狀屬性的 Mongo 文件。當您需要訪問 payloadheaders 以進行審計或分析(例如,針對儲存的訊息)時,這很有用。

MongoDbMessageStore 使用自定義的 MappingMongoConverter 實現將 Message 例項儲存為 MongoDB 文件,並且對 Message 的屬性(payloadheader 值)有一些限制。

從版本 5.1.6 開始,MongoDbMessageStore 可以配置自定義轉換器,這些轉換器會傳播到內部的 MappingMongoConverter 實現中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它實現了 MessageStoreMessageGroupStore 介面。這個類可以接收 MongoTemplate 作為建構函式引數,例如,您可以使用它配置自定義的 WriteConcern。另一個建構函式需要一個 MappingMongoConverter 和一個 MongoDbFactory,這允許您為 Message 例項及其屬性提供一些自定義轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore 使用標準 Java 序列化來寫入和讀取 MongoDB 中的 Message 例項(參見 MongoDbMessageBytesConverter),並依賴 MongoTemplate 中其他屬性的預設值。它從提供的 MongoDbFactoryMappingMongoConverter 構建一個 MongoTemplateConfigurableMongoDbMessageStore 儲存的集合的預設名稱是 configurableStoreMessages。當訊息包含複雜資料型別時,我們建議使用此實現來建立健壯且靈活的解決方案。

從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore 提供了 setCreateIndexes(boolean) 選項(預設為 true),可用於停用自動索引建立。以下示例展示瞭如何宣告一個 bean 並停用自動索引建立。

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道訊息儲存

版本 4.0 引入了新的 MongoDbChannelMessageStore。它是一個最佳化的 MessageGroupStore,用於 QueueChannel 例項。透過 priorityEnabled = true,您可以在 <int:priority-queue> 例項中使用它,以實現持久化訊息的優先順序順序輪詢。優先順序 MongoDB 文件欄位由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 訊息頭填充。

此外,所有 MongoDB MessageStore 例項現在都有一個用於 MessageGroup 文件的 sequence 欄位。sequence 值是同一集合中簡單 sequence 文件的 $inc 操作的結果,該文件按需建立。在 poll 操作中,sequence 欄位用於在訊息儲存在同一毫秒內時提供先進先出(FIFO)的訊息順序(如果配置了優先順序,則在優先順序內)。

我們不建議將同一個 MongoDbChannelMessageStore bean 用於優先順序和非優先順序,因為 priorityEnabled 選項適用於整個儲存。但是,同一個 collection 可以用於兩種 MongoDbChannelMessageStore 型別,因為從儲存中輪詢訊息是排序的並使用索引。要配置此場景,您可以將一個訊息儲存 bean 擴充套件到另一個,如下例所示
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用停用自動索引建立的 AbstractConfigurableMongoDbMessageStore

從版本 6.0.8 開始,AbstractConfigurableMongoDbMessageStore 實現了 setCreateIndex(boolean),可用於停用或啟用(預設)自動索引建立。以下示例展示瞭如何宣告一個 bean 並停用自動索引建立

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元資料儲存

Spring Integration 4.2 引入了新的基於 MongoDB 的 MetadataStore(參見 Metadata Store)實現。您可以使用 MongoDbMetadataStore 在應用程式重啟後維護元資料狀態。您可以將此新的 MetadataStore 實現與以下介面卡一起使用

要指示這些介面卡使用新的 MongoDbMetadataStore,請宣告一個 bean 名稱為 metadataStore 的 Spring bean。Feed 入站通道介面卡會自動識別並使用宣告的 MongoDbMetadataStore。以下示例展示瞭如何宣告一個名為 metadataStore 的 bean

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 也實現了 ConcurrentMetadataStore,這使得它可以在多個應用程式例項之間可靠地共享,其中只有一個例項被允許儲存或修改鍵的值。由於 MongoDB 的保證,所有這些操作都是原子性的。

MongoDB 入站通道介面卡

MongoDB 入站通道介面卡是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message payload 傳送。以下示例展示瞭如何配置 MongoDB 入站通道介面卡

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如上述配置所示,您可以使用 inbound-channel-adapter 元素配置 MongoDB 入站通道介面卡,併為各種屬性提供值,例如

  • query: 一個 JSON 查詢(參見 MongoDB 查詢

  • query-expression: 一個 SpEL 表示式,其計算結果為 JSON 查詢字串(如上面的 query 屬性)或 o.s.data.mongodb.core.query.Query 的例項。與 query 屬性互斥。

  • entity-class: payload 物件的型別。如果未提供,則返回 com.mongodb.DBObject

  • collection-namecollection-name-expression: 標識要使用的 MongoDB 集合的名稱。

  • mongodb-factory: 引用 o.s.data.mongodb.MongoDbFactory 的例項

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的例項

  • 所有其他入站介面卡共有的其他屬性(例如 'channel')。

您不能同時設定 mongo-templatemongodb-factory

前面的示例相對簡單和靜態,因為它有一個用於 query 的字面值,並使用 collection 的預設名稱。有時,您可能需要根據某些條件在執行時更改這些值。為此,請使用它們的 -expression 等效項(query-expressioncollection-name-expression),其中提供的表示式可以是任何有效的 SpEL 表示式。

此外,您可能希望對從 MongoDB 讀取的已成功處理的資料進行一些後處理。例如,您可能希望在文件處理後移動或刪除它。您可以透過使用 Spring Integration 2.2 新增的事務同步功能來實現這一點,如下例所示

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例顯示了前面示例中引用的 DocumentCleaner

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以使用 transactional 元素將輪詢器宣告為事務性的。此元素可以引用一個真正的事務管理器,例如,如果您的流的某些其他部分呼叫了 JDBC。如果您沒有“真正的”事務,您可以使用 o.s.i.transaction.PseudoTransactionManager 例項,它是 Spring 的 PlatformTransactionManager 的實現,並在沒有實際事務時啟用 Mongo 介面卡的事務同步功能。

這樣做並不會使 MongoDB 本身具有事務性。它允許在成功(提交)之前或之後或失敗(回滾)之後進行動作的同步。

一旦您的輪詢器是事務性的,您就可以在 transactional 元素上設定 o.s.i.transaction.TransactionSynchronizationFactory 的例項。TransactionSynchronizationFactory 建立 TransactionSynchronization 的例項。為了您的方便,我們公開了一個基於 SpEL 的預設 TransactionSynchronizationFactory,它允許您配置 SpEL 表示式,它們的執行與事務協調(同步)。支援提交前、提交後和回滾後事件的表示式,以及每個事件的通道,其中傳送評估結果(如果有)。對於每個子元素,您可以指定 expressionchannel 屬性。如果只有 channel 屬性存在,則接收到的訊息作為特定同步場景的一部分發送到那裡。如果只有 expression 屬性存在且表示式的結果是非空值,則生成一個以結果作為有效載荷的訊息併發送到預設通道(NullChannel)並出現在日誌中(在 DEBUG 級別)。如果您希望評估結果轉到特定通道,請新增 channel 屬性。如果表示式的結果為 null 或 void,則不生成訊息。

有關事務同步的更多資訊,請參閱 事務同步

從 5.5 版本開始,MongoDbMessageSource 可以配置一個 updateExpression,它必須評估為具有 MongoDb update 語法的 Stringorg.springframework.data.mongodb.core.query.Update 例項。它可以作為描述上述後處理過程的替代方法,並且它會修改從集合中獲取的實體,這樣它們就不會在下一個輪詢週期中再次從集合中拉取(假設更新更改了查詢中使用的一些值)。當叢集中使用多個針對同一集合的 MongoDbMessageSource 例項時,仍然建議使用事務來實現執行隔離和資料一致性。

MongoDB 變更流入站通道介面卡

從 5.3 版本開始,spring-integration-mongodb 模組引入了 MongoDbChangeStreamMessageProducer —— 一個響應式 MessageProducerSupport 實現,用於 Spring Data 的 ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API。預設情況下,該元件生成一個訊息 Flux,其 bodyChangeStreamEvent 作為有效載荷,以及一些與變更流相關的頭部資訊(參見 MongoHeaders)。建議將此 MongoDbChangeStreamMessageProducerFluxMessageChannel 結合作為 outputChannel,以實現按需訂閱和下游事件消費。

此通道介面卡的 Java DSL 配置可能如下所示

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止,或下游訂閱被取消,或 MongoDb 變更流產生 OperationType.INVALIDATE 時,Publisher 完成。通道介面卡可以再次啟動,並建立一個新的源資料 Publisher,它會自動訂閱到 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)。如果在啟動之間需要從其他位置消費變更流事件,則可以為新的選項重新配置此通道介面卡。

有關變更流支援的更多資訊,請參閱 Spring Data MongoDB 文件

MongoDB 出站通道介面卡

MongoDB 出站通道介面卡允許您將訊息有效載荷寫入 MongoDB 文件儲存,如下例所示

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如上述配置所示,您可以透過使用 outbound-channel-adapter 元素併為各種屬性提供值來配置 MongoDB 出站通道介面卡,例如

  • collection-namecollection-name-expression: 標識要使用的 MongoDb 集合的名稱。

  • mongo-converter: 引用 o.s.data.mongodb.core.convert.MongoConverter 例項,該例項協助將原始 Java 物件轉換為 JSON 文件表示形式。

  • mongodb-factory: 引用 o.s.data.mongodb.MongoDbFactory 的例項。

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的例項。注意:您不能同時設定 mongo-templatemongodb-factory

  • 所有入站介面卡共有的其他屬性(例如 'channel')。

前面的示例相對簡單和靜態,因為它有一個用於 collection-name 的字面值。有時,您可能需要根據某些條件在執行時更改此值。為此,請使用 collection-name-expression,其中提供的表示式可以是任何有效的 SpEL 表示式。

MongoDB 出站閘道器

版本 5.0 引入了 MongoDB 出站閘道器。它允許您透過向其請求通道傳送訊息來查詢資料庫。然後閘道器將響應傳送到回覆通道。您可以使用訊息有效負載和標頭來指定查詢和集合名稱,如下例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以將以下屬性與 MongoDB 出站閘道器一起使用

  • collection-namecollection-name-expression: 標識要使用的 MongoDB 集合的名稱。

  • mongo-converter: 引用 o.s.data.mongodb.core.convert.MongoConverter 例項,該例項協助將原始 Java 物件轉換為 JSON 文件表示形式。

  • mongodb-factory: 引用 o.s.data.mongodb.MongoDbFactory 的例項。

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的例項。注意:您不能同時設定 mongo-templatemongodb-factory

  • entity-class: 要傳遞給 MongoTemplate 中 find(..)findOne(..) 方法的實體類的完全限定名。如果未提供此屬性,則預設值為 org.bson.Document

  • queryquery-expression: 指定 MongoDB 查詢。有關更多查詢示例,請參見 MongoDB 文件

  • collection-callback: 引用 org.springframework.data.mongodb.core.CollectionCallback 的例項。自 5.0.11 起,首選 o.s.i.mongodb.outbound.MessageCollectionCallback 的例項,帶有請求訊息上下文。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有 collection-callback 和任何查詢屬性。

作為 queryquery-expression 屬性的替代方案,您可以使用 collectionCallback 屬性作為 MessageCollectionCallback 函式介面實現的引用來指定其他資料庫操作。以下示例指定了一個計數操作

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 響應式通道介面卡

從版本 5.3 開始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 實現。它們基於 Spring Data 的 ReactiveMongoOperations,並需要 org.mongodb:mongodb-driver-reactivestreams 依賴。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的實現,當整合流定義中涉及響應式流組合時,該實現在框架中得到原生支援。有關更多資訊,請參見 ReactiveMessageHandler

從配置的角度來看,與許多其他標準通道介面卡沒有區別。例如,使用 Java DSL,這樣的通道介面卡可以像這樣使用

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在這個示例中,我們將透過提供的 ReactiveMongoDatabaseFactory 連線到 MongoDb,並將請求訊息中的資料儲存到名為 data 的預設集合中。實際操作將根據內部建立的 ReactiveStreamsConsumer 中的響應式流組合按需執行。

ReactiveMongoDbMessageSource 是一個 AbstractMessageSource 實現,它基於提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 和 MongoDb 查詢(或表示式),根據 expectSingleResult 選項呼叫 find()findOne() 操作,並使用預期的 entityClass 型別來轉換查詢結果。當生成的訊息有效載荷中的 Publisher(根據 expectSingleResult 選項,為 FluxMono)被訂閱時,才按需執行查詢並評估結果。當在下游使用分發器和 FluxMessageChannel 時,框架可以自動訂閱此類有效載荷(本質上是 flatMap)。否則,下游端點中的輪詢釋出者由目標應用程式負責訂閱。

使用 Java DSL,這種通道介面卡可以這樣配置

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

從版本 5.5 開始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有與阻塞 MongoDbMessageSource 相同的功能。有關更多資訊,請參閱 MongoDB 入站通道介面卡AbstractMongoDbMessageSourceSpec JavaDocs。

© . This site is unofficial and not affiliated with VMware.