MongoDb 支援
版本 2.1 引入了對 MongoDB 的支援:一個“高效能、開源、面向文件的資料庫”。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:7.0.0"
要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件。
連線到 MongoDb
阻塞還是響應式?
從版本 5.3 開始,Spring Integration 提供了對響應式 MongoDB 驅動程式的支援,以在訪問 MongoDB 時啟用非阻塞 I/O。要啟用響應式支援,請將 MongoDB 響應式流驅動程式新增到您的依賴項中。
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
對於常規同步客戶端,您需要將其相應的驅動程式新增到依賴項中。
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
兩者在框架中都是 optional(可選)的,以更好地支援終端使用者的選擇。
要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 基於另一個 Spring 專案 Spring Data MongoDB 提供的支援。它提供了名為 MongoDatabaseFactory 和 ReactiveMongoDatabaseFactory 的工廠類,它們簡化了與 MongoDB 客戶端 API 的整合。
| Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過包含上述依賴項來選擇使用響應式模式。 |
使用 MongoDatabaseFactory
要連線到 MongoDB,您可以使用 MongoDatabaseFactory 介面的實現。
以下示例展示瞭如何使用 SimpleMongoClientDatabaseFactory
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory 接收兩個引數:一個 MongoClient 例項和一個指定資料庫名稱的 String。如果您需要配置 host、port 等屬性,您可以使用底層 MongoClients 類提供的建構函式之一來傳遞這些屬性。有關如何配置 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考。
使用 ReactiveMongoDatabaseFactory
要使用響應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory 介面的實現。
以下示例展示瞭如何使用 SimpleReactiveMongoDatabaseFactory
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 訊息儲存
如《企業整合模式》(EIP)一書所述,訊息儲存允許您持久化訊息。當處理具有緩衝訊息能力的元件(QueueChannel、aggregator、resequencer 等)時,如果可靠性是一個問題,這會很有用。在 Spring Integration 中,MessageStore 策略也為 索賠檢查 模式奠定了基礎,該模式也在 EIP 中有所描述。
Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用於索賠檢查模式)和 MessageGroupStore 策略(主要用於聚合器和重新排序器模式)的實現。
以下示例配置了一個 MongoDbMessageStore,以使用 QueueChannel 和 aggregator
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
上述示例是一個簡單的 bean 配置,它需要一個 MongoDbFactory 作為建構函式引數。
MongoDbMessageStore 使用 Spring Data Mongo 對映機制將 Message 擴充套件為包含所有巢狀屬性的 Mongo 文件。當您需要訪問 payload 或 headers 以進行審計或分析(例如,針對儲存的訊息)時,這很有用。
MongoDbMessageStore 使用自定義的 MappingMongoConverter 實現將 Message 例項儲存為 MongoDB 文件,並且對 Message 的屬性(payload 和 header 值)有一些限制。 |
從版本 5.1.6 開始,MongoDbMessageStore 可以配置自定義轉換器,這些轉換器會傳播到內部的 MappingMongoConverter 實現中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object… customConverters) JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它實現了 MessageStore 和 MessageGroupStore 介面。這個類可以接收 MongoTemplate 作為建構函式引數,例如,您可以使用它配置自定義的 WriteConcern。另一個建構函式需要一個 MappingMongoConverter 和一個 MongoDbFactory,這允許您為 Message 例項及其屬性提供一些自定義轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore 使用標準 Java 序列化來寫入和讀取 MongoDB 中的 Message 例項(參見 MongoDbMessageBytesConverter),並依賴 MongoTemplate 中其他屬性的預設值。它從提供的 MongoDbFactory 和 MappingMongoConverter 構建一個 MongoTemplate。ConfigurableMongoDbMessageStore 儲存的集合的預設名稱是 configurableStoreMessages。當訊息包含複雜資料型別時,我們建議使用此實現來建立健壯且靈活的解決方案。
從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore 提供了 setCreateIndexes(boolean) 選項(預設為 true),可用於停用自動索引建立。以下示例展示瞭如何宣告一個 bean 並停用自動索引建立。
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB 通道訊息儲存
版本 4.0 引入了新的 MongoDbChannelMessageStore。它是一個最佳化的 MessageGroupStore,用於 QueueChannel 例項。透過 priorityEnabled = true,您可以在 <int:priority-queue> 例項中使用它,以實現持久化訊息的優先順序順序輪詢。優先順序 MongoDB 文件欄位由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 訊息頭填充。
此外,所有 MongoDB MessageStore 例項現在都有一個用於 MessageGroup 文件的 sequence 欄位。sequence 值是同一集合中簡單 sequence 文件的 $inc 操作的結果,該文件按需建立。在 poll 操作中,sequence 欄位用於在訊息儲存在同一毫秒內時提供先進先出(FIFO)的訊息順序(如果配置了優先順序,則在優先順序內)。
我們不建議將同一個 MongoDbChannelMessageStore bean 用於優先順序和非優先順序,因為 priorityEnabled 選項適用於整個儲存。但是,同一個 collection 可以用於兩種 MongoDbChannelMessageStore 型別,因為從儲存中輪詢訊息是排序的並使用索引。要配置此場景,您可以將一個訊息儲存 bean 擴充套件到另一個,如下例所示 |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
使用停用自動索引建立的 AbstractConfigurableMongoDbMessageStore
從版本 6.0.8 開始,AbstractConfigurableMongoDbMessageStore 實現了 setCreateIndex(boolean),可用於停用或啟用(預設)自動索引建立。以下示例展示瞭如何宣告一個 bean 並停用自動索引建立
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 元資料儲存
Spring Integration 4.2 引入了新的基於 MongoDB 的 MetadataStore(參見 Metadata Store)實現。您可以使用 MongoDbMetadataStore 在應用程式重啟後維護元資料狀態。您可以將此新的 MetadataStore 實現與以下介面卡一起使用
要指示這些介面卡使用新的 MongoDbMetadataStore,請宣告一個 bean 名稱為 metadataStore 的 Spring bean。Feed 入站通道介面卡會自動識別並使用宣告的 MongoDbMetadataStore。以下示例展示瞭如何宣告一個名為 metadataStore 的 bean
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore 也實現了 ConcurrentMetadataStore,這使得它可以在多個應用程式例項之間可靠地共享,其中只有一個例項被允許儲存或修改鍵的值。由於 MongoDB 的保證,所有這些操作都是原子性的。
MongoDB 入站通道介面卡
MongoDB 入站通道介面卡是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message payload 傳送。以下示例展示瞭如何配置 MongoDB 入站通道介面卡
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如上述配置所示,您可以使用 inbound-channel-adapter 元素配置 MongoDB 入站通道介面卡,併為各種屬性提供值,例如
-
query: 一個 JSON 查詢(參見 MongoDB 查詢) -
query-expression: 一個 SpEL 表示式,其計算結果為 JSON 查詢字串(如上面的query屬性)或o.s.data.mongodb.core.query.Query的例項。與query屬性互斥。 -
entity-class: payload 物件的型別。如果未提供,則返回com.mongodb.DBObject。 -
collection-name或collection-name-expression: 標識要使用的 MongoDB 集合的名稱。 -
mongodb-factory: 引用o.s.data.mongodb.MongoDbFactory的例項 -
mongo-template: 引用o.s.data.mongodb.core.MongoTemplate的例項 -
所有其他入站介面卡共有的其他屬性(例如 'channel')。
您不能同時設定 mongo-template 和 mongodb-factory。 |
前面的示例相對簡單和靜態,因為它有一個用於 query 的字面值,並使用 collection 的預設名稱。有時,您可能需要根據某些條件在執行時更改這些值。為此,請使用它們的 -expression 等效項(query-expression 和 collection-name-expression),其中提供的表示式可以是任何有效的 SpEL 表示式。
此外,您可能希望對從 MongoDB 讀取的已成功處理的資料進行一些後處理。例如,您可能希望在文件處理後移動或刪除它。您可以透過使用 Spring Integration 2.2 新增的事務同步功能來實現這一點,如下例所示
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例顯示了前面示例中引用的 DocumentCleaner
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以使用 transactional 元素將輪詢器宣告為事務性的。此元素可以引用一個真正的事務管理器,例如,如果您的流的某些其他部分呼叫了 JDBC。如果您沒有“真正的”事務,您可以使用 o.s.i.transaction.PseudoTransactionManager 例項,它是 Spring 的 PlatformTransactionManager 的實現,並在沒有實際事務時啟用 Mongo 介面卡的事務同步功能。
| 這樣做並不會使 MongoDB 本身具有事務性。它允許在成功(提交)之前或之後或失敗(回滾)之後進行動作的同步。 |
一旦您的輪詢器是事務性的,您就可以在 transactional 元素上設定 o.s.i.transaction.TransactionSynchronizationFactory 的例項。TransactionSynchronizationFactory 建立 TransactionSynchronization 的例項。為了您的方便,我們公開了一個基於 SpEL 的預設 TransactionSynchronizationFactory,它允許您配置 SpEL 表示式,它們的執行與事務協調(同步)。支援提交前、提交後和回滾後事件的表示式,以及每個事件的通道,其中傳送評估結果(如果有)。對於每個子元素,您可以指定 expression 和 channel 屬性。如果只有 channel 屬性存在,則接收到的訊息作為特定同步場景的一部分發送到那裡。如果只有 expression 屬性存在且表示式的結果是非空值,則生成一個以結果作為有效載荷的訊息併發送到預設通道(NullChannel)並出現在日誌中(在 DEBUG 級別)。如果您希望評估結果轉到特定通道,請新增 channel 屬性。如果表示式的結果為 null 或 void,則不生成訊息。
有關事務同步的更多資訊,請參閱 事務同步。
從 5.5 版本開始,MongoDbMessageSource 可以配置一個 updateExpression,它必須評估為具有 MongoDb update 語法的 String 或 org.springframework.data.mongodb.core.query.Update 例項。它可以作為描述上述後處理過程的替代方法,並且它會修改從集合中獲取的實體,這樣它們就不會在下一個輪詢週期中再次從集合中拉取(假設更新更改了查詢中使用的一些值)。當叢集中使用多個針對同一集合的 MongoDbMessageSource 例項時,仍然建議使用事務來實現執行隔離和資料一致性。
MongoDB 變更流入站通道介面卡
從 5.3 版本開始,spring-integration-mongodb 模組引入了 MongoDbChangeStreamMessageProducer —— 一個響應式 MessageProducerSupport 實現,用於 Spring Data 的 ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API。預設情況下,該元件生成一個訊息 Flux,其 body 為 ChangeStreamEvent 作為有效載荷,以及一些與變更流相關的頭部資訊(參見 MongoHeaders)。建議將此 MongoDbChangeStreamMessageProducer 與 FluxMessageChannel 結合作為 outputChannel,以實現按需訂閱和下游事件消費。
此通道介面卡的 Java DSL 配置可能如下所示
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
當 MongoDbChangeStreamMessageProducer 停止,或下游訂閱被取消,或 MongoDb 變更流產生 OperationType.INVALIDATE 時,Publisher 完成。通道介面卡可以再次啟動,並建立一個新的源資料 Publisher,它會自動訂閱到 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)。如果在啟動之間需要從其他位置消費變更流事件,則可以為新的選項重新配置此通道介面卡。
有關變更流支援的更多資訊,請參閱 Spring Data MongoDB 文件。
MongoDB 出站通道介面卡
MongoDB 出站通道介面卡允許您將訊息有效載荷寫入 MongoDB 文件儲存,如下例所示
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如上述配置所示,您可以透過使用 outbound-channel-adapter 元素併為各種屬性提供值來配置 MongoDB 出站通道介面卡,例如
-
collection-name或collection-name-expression: 標識要使用的 MongoDb 集合的名稱。 -
mongo-converter: 引用o.s.data.mongodb.core.convert.MongoConverter例項,該例項協助將原始 Java 物件轉換為 JSON 文件表示形式。 -
mongodb-factory: 引用o.s.data.mongodb.MongoDbFactory的例項。 -
mongo-template: 引用o.s.data.mongodb.core.MongoTemplate的例項。注意:您不能同時設定mongo-template和mongodb-factory。 -
所有入站介面卡共有的其他屬性(例如 'channel')。
前面的示例相對簡單和靜態,因為它有一個用於 collection-name 的字面值。有時,您可能需要根據某些條件在執行時更改此值。為此,請使用 collection-name-expression,其中提供的表示式可以是任何有效的 SpEL 表示式。
MongoDB 出站閘道器
版本 5.0 引入了 MongoDB 出站閘道器。它允許您透過向其請求通道傳送訊息來查詢資料庫。然後閘道器將響應傳送到回覆通道。您可以使用訊息有效負載和標頭來指定查詢和集合名稱,如下例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以將以下屬性與 MongoDB 出站閘道器一起使用
-
collection-name或collection-name-expression: 標識要使用的 MongoDB 集合的名稱。 -
mongo-converter: 引用o.s.data.mongodb.core.convert.MongoConverter例項,該例項協助將原始 Java 物件轉換為 JSON 文件表示形式。 -
mongodb-factory: 引用o.s.data.mongodb.MongoDbFactory的例項。 -
mongo-template: 引用o.s.data.mongodb.core.MongoTemplate的例項。注意:您不能同時設定mongo-template和mongodb-factory。 -
entity-class: 要傳遞給 MongoTemplate 中find(..)和findOne(..)方法的實體類的完全限定名。如果未提供此屬性,則預設值為org.bson.Document。 -
query或query-expression: 指定 MongoDB 查詢。有關更多查詢示例,請參見 MongoDB 文件。 -
collection-callback: 引用org.springframework.data.mongodb.core.CollectionCallback的例項。自 5.0.11 起,首選o.s.i.mongodb.outbound.MessageCollectionCallback的例項,帶有請求訊息上下文。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有collection-callback和任何查詢屬性。
作為 query 和 query-expression 屬性的替代方案,您可以使用 collectionCallback 屬性作為 MessageCollectionCallback 函式介面實現的引用來指定其他資料庫操作。以下示例指定了一個計數操作
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 響應式通道介面卡
從版本 5.3 開始,提供了 ReactiveMongoDbStoringMessageHandler 和 ReactiveMongoDbMessageSource 實現。它們基於 Spring Data 的 ReactiveMongoOperations,並需要 org.mongodb:mongodb-driver-reactivestreams 依賴。
ReactiveMongoDbStoringMessageHandler 是 ReactiveMessageHandler 的實現,當整合流定義中涉及響應式流組合時,該實現在框架中得到原生支援。有關更多資訊,請參見 ReactiveMessageHandler。
從配置的角度來看,與許多其他標準通道介面卡沒有區別。例如,使用 Java DSL,這樣的通道介面卡可以像這樣使用
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在這個示例中,我們將透過提供的 ReactiveMongoDatabaseFactory 連線到 MongoDb,並將請求訊息中的資料儲存到名為 data 的預設集合中。實際操作將根據內部建立的 ReactiveStreamsConsumer 中的響應式流組合按需執行。
ReactiveMongoDbMessageSource 是一個 AbstractMessageSource 實現,它基於提供的 ReactiveMongoDatabaseFactory 或 ReactiveMongoOperations 和 MongoDb 查詢(或表示式),根據 expectSingleResult 選項呼叫 find() 或 findOne() 操作,並使用預期的 entityClass 型別來轉換查詢結果。當生成的訊息有效載荷中的 Publisher(根據 expectSingleResult 選項,為 Flux 或 Mono)被訂閱時,才按需執行查詢並評估結果。當在下游使用分發器和 FluxMessageChannel 時,框架可以自動訂閱此類有效載荷(本質上是 flatMap)。否則,下游端點中的輪詢釋出者由目標應用程式負責訂閱。
使用 Java DSL,這種通道介面卡可以這樣配置
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
從版本 5.5 開始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有與阻塞 MongoDbMessageSource 相同的功能。有關更多資訊,請參閱 MongoDB 入站通道介面卡 和 AbstractMongoDbMessageSourceSpec JavaDocs。