MongoDB 支援
2.1 版本引入了對 MongoDB 的支援:一個“高效能、開源、面向文件的資料庫”。
您需要在專案中包含此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.4"
要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件。
連線到 MongoDB
阻塞還是響應式?
從 5.3 版本開始,Spring Integration 提供了對響應式 MongoDB 驅動程式的支援,以便在訪問 MongoDB 時啟用非阻塞 I/O。要啟用響應式支援,請將 MongoDB 響應式流驅動程式新增到您的依賴項中
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
對於常規同步客戶端,您需要將相應的驅動程式新增到依賴項中
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
在框架中,兩者都是可選的 (optional
),以更好地支援終端使用者的選擇。
要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 構建在另一個 Spring 專案 Spring Data MongoDB 提供的支援之上。它提供了名為 MongoDatabaseFactory
和 ReactiveMongoDatabaseFactory
的工廠類,它們簡化了與 MongoDB 客戶端 API 的整合。
Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過引入上述依賴項來選擇使用響應式方式。 |
使用 MongoDatabaseFactory
要連線到 MongoDB,您可以使用 MongoDatabaseFactory
介面的一個實現。
以下示例展示瞭如何使用 SimpleMongoClientDatabaseFactory
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory
接受兩個引數:一個 MongoClient
例項和一個指定資料庫名稱的 String
。如果您需要配置 host
、port
等屬性,可以使用底層 MongoClients
類提供的建構函式之一進行傳遞。有關如何配置 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考文件。
使用 ReactiveMongoDatabaseFactory
要使用響應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory
介面的一個實現。
以下示例展示瞭如何使用 SimpleReactiveMongoDatabaseFactory
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 訊息儲存
正如《企業整合模式》(EIP) 一書中所述,訊息儲存 允許您持久化訊息。如果可靠性是關注點,這在處理具有訊息緩衝能力的元件(如 QueueChannel
、aggregator
、resequencer
等)時非常有用。在 Spring Integration 中,MessageStore
策略也為 EIP 中描述的 存根(claim check) 模式提供了基礎。
Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore
,它是 MessageStore
策略(主要用於存根模式)和 MessageGroupStore
策略(主要用於聚合器和重排序器模式)的實現。
以下示例配置了一個 MongoDbMessageStore
以使用 QueueChannel
和一個 aggregator
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
上述示例是一個簡單的 Bean 配置,它期望一個 MongoDbFactory
作為建構函式引數。
MongoDbMessageStore
使用 Spring Data Mongo 對映機制將 Message
作為 Mongo 文件展開,包含所有巢狀屬性。當您需要訪問儲存訊息的 payload
或 headers
進行審計或分析時,這非常有用。
MongoDbMessageStore 使用自定義的 MappingMongoConverter 實現將 Message 例項儲存為 MongoDB 文件,並且對 Message 的屬性(payload 和 header 值)有一些限制。 |
從 5.1.6 版本開始,MongoDbMessageStore
可以配置自定義轉換器,這些轉換器會傳播到內部的 MappingMongoConverter
實現中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object… customConverters)
的 JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore
。它實現了 MessageStore
和 MessageGroupStore
介面。此類可以接受一個 MongoTemplate
作為建構函式引數,您可以使用它來配置自定義的 WriteConcern
等。另一個建構函式需要一個 MappingMongoConverter
和一個 MongoDbFactory
,這允許您為 Message
例項及其屬性提供一些自定義轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore
使用標準的 Java 序列化來向 MongoDB 寫入和讀取 Message
例項(參見 MongoDbMessageBytesConverter
),並依賴 MongoTemplate
的其他屬性的預設值。它從提供的 MongoDbFactory
和 MappingMongoConverter
構建一個 MongoTemplate
。ConfigurableMongoDbMessageStore
儲存的集合的預設名稱是 configurableStoreMessages
。我們建議在訊息包含複雜資料型別時使用此實現來建立健壯且靈活的解決方案。
從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore
提供了一個 setCreateIndexes(boolean)
選項(預設為 true
),可用於停用自動索引建立。以下示例展示瞭如何宣告一個 Bean 並停用自動索引建立
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB 通道訊息儲存
4.0 版本引入了新的 MongoDbChannelMessageStore
。它是一個最佳化的 MessageGroupStore
,用於 QueueChannel
例項。透過設定 priorityEnabled = true
,您可以在 <int:priority-queue>
例項中使用它,以實現持久化訊息的優先順序順序輪詢。MongoDB 文件中的 priority 欄位填充自 IntegrationMessageHeaderAccessor.PRIORITY
(priority
) 訊息頭。
此外,所有 MongoDB MessageStore
例項現在都有一個用於 MessageGroup
文件的 sequence
欄位。sequence
值是同一集合中一個簡單 sequence
文件的 $inc
操作結果,該文件按需建立。sequence
欄位用於 poll
操作,以在訊息儲存在同一毫秒內時提供先進先出(FIFO)的訊息順序(如果在配置中指定了優先順序,則在優先順序內排序)。
我們不建議對優先順序和非優先順序使用同一個 MongoDbChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存。然而,同一個 collection 可以用於兩種 MongoDbChannelMessageStore 型別,因為從儲存中輪詢訊息是排序並使用索引的。要配置這種情況,您可以從另一個訊息儲存 Bean 擴充套件一個,如下面的示例所示 |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
使用 AbstractConfigurableMongoDbMessageStore 並停用自動索引建立
從 6.0.8 版本開始,AbstractConfigurableMongoDbMessageStore
實現了一個 setCreateIndex(boolean)
方法,可用於停用或啟用(預設為啟用)自動索引建立。以下示例展示瞭如何宣告一個 Bean 並停用自動索引建立
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 元資料儲存
Spring Integration 4.2 引入了一個新的基於 MongoDB 的 MetadataStore
(參見 元資料儲存)實現。您可以使用 MongoDbMetadataStore
在應用程式重啟後維護元資料狀態。您可以將這個新的 MetadataStore
實現與以下介面卡一起使用,例如
要指示這些介面卡使用新的 MongoDbMetadataStore
,請宣告一個 Bean 名稱為 metadataStore
的 Spring Bean。feed 入站通道介面卡會自動拾取並使用宣告的 MongoDbMetadataStore
。以下示例展示瞭如何宣告一個 Bean 名稱為 metadataStore
的 Bean
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore
還實現了 ConcurrentMetadataStore
,使其能夠可靠地在多個應用程式例項之間共享,其中只有一個例項被允許儲存或修改鍵的值。由於 MongoDB 的保證,所有這些操作都是原子性的。
MongoDB 入站通道介面卡
MongoDB 入站通道介面卡是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message
的 payload 傳送。以下示例展示瞭如何配置 MongoDB 入站通道介面卡
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前述配置所示,您可以透過使用 inbound-channel-adapter
元素併為各種屬性提供值來配置 MongoDB 入站通道介面卡,例如
-
query
: 一個 JSON 查詢(參見 MongoDB 查詢) -
query-expression
: 一個 SpEL 表示式,評估結果為一個 JSON 查詢字串(如上面query
屬性所示)或o.s.data.mongodb.core.query.Query
的一個例項。與query
屬性互斥。 -
entity-class
: Payload 物件的型別。如果未提供,則返回一個com.mongodb.DBObject
。 -
collection-name
或collection-name-expression
: 指定要使用的 MongoDB 集合的名稱。 -
mongodb-factory
: 對o.s.data.mongodb.MongoDbFactory
例項的引用 -
mongo-template
: 對o.s.data.mongodb.core.MongoTemplate
例項的引用 -
其他適用於所有入站介面卡的通用屬性(如 'channel')。
您不能同時設定 mongo-template 和 mongodb-factory 。 |
上述示例相對簡單和靜態,因為它對 query
使用字面值,並對 collection
使用預設名稱。有時,您可能需要在執行時根據某些條件更改這些值。為此,可以使用它們的 -expression
等效項(query-expression
和 collection-name-expression
),其中提供的表示式可以是任何有效的 SpEL 表示式。
此外,您可能希望對從 MongoDB 讀取併成功處理的資料進行一些後處理。例如,您可能希望在文件處理後移動或刪除它。您可以透過使用 Spring Integration 2.2 新增的事務同步功能來實現這一點,如下面的示例所示
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例展示了前述示例中引用的 DocumentCleaner
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以使用 transactional
元素將輪詢器宣告為事務性的。此元素可以引用一個真正的事務管理器(例如,如果您的流的其他部分呼叫 JDBC)。如果您沒有“真正的”事務,您可以使用 o.s.i.transaction.PseudoTransactionManager
的例項,它是 Spring PlatformTransactionManager
的一個實現,並在沒有實際事務時啟用 Mongo 介面卡的事務同步功能。
這樣做並不會使 MongoDB 本身具有事務性。它允許在成功(提交)之前或之後,或在失敗(回滾)之後進行操作同步。 |
一旦您的輪詢器是事務性的,您可以在 transactional
元素上設定一個 o.s.i.transaction.TransactionSynchronizationFactory
的例項。TransactionSynchronizationFactory
建立一個 TransactionSynchronization
的例項。為了方便起見,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory
,它允許您配置 SpEL 表示式,其執行與事務協調(同步)。支援 before-commit、after-commit 和 after-rollback 事件的表示式,併為每個事件提供一個通道,評估結果(如果有)將傳送到該通道。對於每個子元素,您可以指定 expression
和 channel
屬性。如果只存在 channel
屬性,則接收到的訊息將作為特定同步場景的一部分發送到那裡。如果只存在 expression
屬性且表示式結果為非空值,則會生成一個以結果為 payload 的訊息併發送到預設通道(NullChannel
),並出現在日誌中(DEBUG
級別)。如果您希望評估結果傳送到特定通道,請新增一個 channel
屬性。如果表示式結果為 null 或 void,則不會生成訊息。
有關事務同步的更多資訊,請參閱 事務同步。
從 5.5 版本開始,MongoDbMessageSource
可以配置一個 updateExpression
,它必須評估為一個具有 MongoDB update
語法的 String
或 org.springframework.data.mongodb.core.query.Update
的一個例項。它可以用作上述後處理過程的替代方案,它修改從集合中取出的實體,以便在下一個輪詢週期中不會再次從集合中取出它們(假設更新改變了查詢中使用的某個值)。當同一個集合的多個 MongoDbMessageSource
例項在叢集中使用時,仍然建議使用事務來實現執行隔離和資料一致性。
MongoDB 變更流入站通道介面卡
從 5.3 版本開始,spring-integration-mongodb
模組引入了 MongoDbChangeStreamMessageProducer
- 這是 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
API 的一個響應式 MessageProducerSupport
實現。該元件預設生成一個訊息的 Flux
流,其中 payload 為 ChangeStreamEvent
的 body
,幷包含一些變更流相關的 headers(參見 MongoHeaders
)。建議將此 MongoDbChangeStreamMessageProducer
與一個 FluxMessageChannel
結合作為 outputChannel
,用於按需訂閱和下游事件消費。
此通道介面卡的 Java DSL 配置可能如下所示
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
當 MongoDbChangeStreamMessageProducer
停止,或下游取消訂閱,或 MongoDB 變更流生成 OperationType.INVALIDATE
時,Publisher
會完成。通道介面卡可以再次啟動,並建立一個新的源資料 Publisher
,並在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)
中自動訂閱。如果在啟動之間需要從其他位置消費變更流事件,可以重新配置此通道介面卡的新選項。
有關 Spring Data MongoDB 中變更流支援的更多資訊,請參閱文件。
MongoDB 出站通道介面卡
MongoDB 出站通道介面卡允許您將訊息 payload 寫入 MongoDB 文件儲存,如下面的示例所示
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前述配置所示,您可以透過使用 outbound-channel-adapter
元素併為各種屬性提供值來配置 MongoDB 出站通道介面卡,例如
-
collection-name
或collection-name-expression
: 指定要使用的 MongoDB 集合的名稱。 -
mongo-converter
: 對o.s.data.mongodb.core.convert.MongoConverter
例項的引用,用於協助將原始 Java 物件轉換為 JSON 文件表示形式。 -
mongodb-factory
: 對o.s.data.mongodb.MongoDbFactory
例項的引用。 -
mongo-template
: 對o.s.data.mongodb.core.MongoTemplate
例項的引用。注意:您不能同時設定 mongo-template 和 mongodb-factory。 -
其他適用於所有入站介面卡的通用屬性(如 'channel')。
上述示例相對簡單和靜態,因為它對 collection-name
使用字面值。有時,您可能需要在執行時根據某些條件更改此值。為此,可以使用 collection-name-expression
,其中提供的表示式可以是任何有效的 SpEL 表示式。
MongoDB 出站閘道器
5.0 版本引入了 MongoDB 出站閘道器。它允許您透過向其請求通道傳送訊息來查詢資料庫。然後閘道器將響應傳送到回覆通道。您可以使用訊息 payload 和 headers 來指定查詢和集合名稱,如下面的示例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以與 MongoDB 出站閘道器一起使用以下屬性
-
collection-name
或collection-name-expression
: 指定要使用的 MongoDB 集合的名稱。 -
mongo-converter
: 對o.s.data.mongodb.core.convert.MongoConverter
例項的引用,用於協助將原始 Java 物件轉換為 JSON 文件表示形式。 -
mongodb-factory
: 對o.s.data.mongodb.MongoDbFactory
例項的引用。 -
mongo-template
: 對o.s.data.mongodb.core.MongoTemplate
例項的引用。注意:您不能同時設定mongo-template
和mongodb-factory
。 -
entity-class
: 要傳遞給 MongoTemplate 中find(..)
和findOne(..)
方法的實體類的完全限定名。如果未提供此屬性,預設值為org.bson.Document
。 -
query
或query-expression
: 指定 MongoDB 查詢。有關更多查詢示例,請參閱 MongoDB 文件。 -
collection-callback
: 對org.springframework.data.mongodb.core.CollectionCallback
例項的引用。自 5.0.11 版本起,推薦使用帶有請求訊息上下文的o.s.i.mongodb.outbound.MessageCollectionCallback
例項。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有collection-callback
和任何查詢屬性。
作為 query
和 query-expression
屬性的替代方案,您可以使用 collectionCallback
屬性作為對 MessageCollectionCallback
函式式介面實現的引用來指定其他資料庫操作。以下示例指定了一個計數操作
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 響應式通道介面卡
從 5.3 版本開始,提供了 ReactiveMongoDbStoringMessageHandler
和 ReactiveMongoDbMessageSource
實現。它們基於 Spring Data 的 ReactiveMongoOperations
,並需要 org.mongodb:mongodb-driver-reactivestreams
依賴項。
ReactiveMongoDbStoringMessageHandler
是 ReactiveMessageHandler
的實現,當整合流定義涉及響應式流組合時,框架原生支援它。有關更多資訊,請參閱 ReactiveMessageHandler。
從配置角度來看,它與許多其他標準通道介面卡沒有區別。例如,使用 Java DSL,此類通道介面卡可以這樣使用
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在此示例中,我們將透過提供的 ReactiveMongoDatabaseFactory
連線到 MongoDB,並將請求訊息中的資料儲存到預設名稱為 data
的集合中。實際操作將根據內部建立的 ReactiveStreamsConsumer
中響應式流組合的需要執行。
ReactiveMongoDbMessageSource
是一個 AbstractMessageSource
實現,它基於所提供的 ReactiveMongoDatabaseFactory
或 ReactiveMongoOperations
和 MongoDB 查詢(或表示式),根據 expectSingleResult
選項呼叫 find()
或 findOne()
操作,並使用預期的 entityClass
型別來轉換查詢結果。查詢執行和結果評估是按需執行的,即當生成的訊息的有效載荷中的 Publisher
(Flux
或 Mono
,根據 expectSingleResult
選項)被訂閱時。當在下游使用 splitter 和 FluxMessageChannel
時,框架可以自動訂閱此類有效載荷(本質上是 flatMap
)。否則,目標應用程式有責任在下游端點中訂閱輪詢的 Publisher
。
使用 Java DSL,這樣的通道介面卡可以配置為
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
從版本 5.5 開始,ReactiveMongoDbMessageSource
可以配置 updateExpression
。它具有與阻塞式 MongoDbMessageSource
相同的功能。參見 MongoDB 入站通道介面卡 和 AbstractMongoDbMessageSourceSpec
JavaDocs 瞭解更多資訊。