會話與事務
從版本 3.6 開始,MongoDB 支援會話的概念。使用會話可以啟用 MongoDB 的因果一致性模型,該模型保證操作按照尊重其因果關係順序執行。這些會話分為 ServerSession
例項和 ClientSession
例項。在本節中,當我們提到會話時,我們指的是 ClientSession
。
客戶端會話內的操作與會話外的操作並非隔離。 |
MongoOperations
和 ReactiveMongoOperations
都提供了將 ClientSession
繫結到操作的閘道器方法。MongoCollection
和 MongoDatabase
使用實現 MongoDB 集合和資料庫介面的會話代理物件,因此您無需在每次呼叫時都新增會話。這意味著對 MongoCollection#find()
的潛在呼叫會被委託給 MongoCollection#find(ClientSession)
。
諸如 (Reactive)MongoOperations#getCollection 之類的方法會返回原生 MongoDB Java Driver 閘道器物件(例如 MongoCollection ),這些物件本身提供了用於 ClientSession 的專用方法。這些方法**不**進行會話代理。當直接與 MongoCollection 或 MongoDatabase 互動而不是透過 MongoOperations 上的 #execute 回撥時,您應在需要的地方提供 ClientSession 。 |
ClientSession 支援
以下示例展示了會話的使用方法
-
命令式
-
響應式
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | 從伺服器獲取一個新會話。 |
2 | 像以前一樣使用 MongoOperation 方法。ClientSession 會自動應用。 |
3 | 務必關閉 ClientSession 。 |
4 | 關閉會話。 |
處理 DBRef 例項時,特別是延遲載入的例項,至關重要的是在所有資料載入之前**不要**關閉 ClientSession 。否則,延遲載入會失敗。 |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 獲取用於檢索新會話的 Publisher 。 |
2 | 像以前一樣使用 ReactiveMongoOperation 方法。ClientSession 會自動獲取和應用。 |
3 | 務必關閉 ClientSession 。 |
4 | 在您訂閱之前不會發生任何事情。有關詳細資訊,請參閱Project Reactor 參考指南。 |
透過使用提供實際會話的 Publisher
,您可以將會話獲取延遲到實際訂閱時。此外,您需要在完成後關閉會話,以免在伺服器上留下陳舊的會話。在 execute
上使用 doFinally
鉤子,在不再需要會話時呼叫 ClientSession#close()
。如果您希望對會話本身有更多控制,可以透過驅動程式獲取 ClientSession
並透過 Supplier
提供它。
ClientSession 的響應式用法僅限於 Template API 的使用。目前,響應式 Repository 沒有會話整合。 |
MongoDB 事務
除非您在應用程式上下文中指定了 MongoTransactionManager ,否則事務支援將**停用**。您可以使用 setSessionSynchronization(ALWAYS) 來參與正在進行的非原生 MongoDB 事務。 |
為了完全透過程式設計控制事務,您可能希望使用 MongoOperations
上的會話回撥。
以下示例展示了程式設計方式的事務控制
-
命令式
-
響應式
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 獲取一個新的 ClientSession 。 |
2 | 開始事務。 |
3 | 如果一切按預期進行,提交更改。 |
4 | 發生錯誤,因此回滾所有內容。 |
5 | 完成後不要忘記關閉會話。 |
上面的示例讓您可以完全控制事務行為,同時在回撥中使用會話範圍的 MongoOperations
例項,以確保將會話傳遞到每個伺服器呼叫。為了避免這種方法帶來的一些開銷,您可以使用 TransactionTemplate
來減少手動事務流程的繁瑣。
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | 首先,我們顯然需要初始化會話。 |
2 | 一旦有了 ClientSession ,就開始事務。 |
3 | 透過將 ClientSession 傳遞給操作,在事務內進行操作。 |
4 | 如果操作異常完成,我們需要停止事務並保留錯誤。 |
5 | 或者,當然,在成功的情況下提交更改。同時保留操作結果。 |
6 | 最後,我們需要確保關閉會話。 |
上述操作的問題在於保留主流程的 DeleteResult
,而不是透過 commitTransaction()
或 abortTransaction()
釋出事務結果,這導致設定相當複雜。
除非您在應用程式上下文中指定了 ReactiveMongoTransactionManager ,否則事務支援將**停用**。您可以使用 setSessionSynchronization(ALWAYS) 來參與正在進行的非原生 MongoDB 事務。 |
使用 TransactionTemplate / TransactionalOperator 進行事務
Spring Data MongoDB 事務支援 TransactionTemplate
和 TransactionalOperator
。
TransactionTemplate
/ TransactionalOperator
進行事務-
命令式
-
響應式
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在 Template API 配置期間啟用事務同步。 |
2 | 使用提供的 PlatformTransactionManager 建立 TransactionTemplate 。 |
3 | 在回撥中,ClientSession 和事務已被註冊。 |
在執行時更改 MongoTemplate 的狀態(如您可能認為在前面的列表項 1 中那樣)可能會導致執行緒和可見性問題。 |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | 啟用事務同步以便參與事務。 |
2 | 使用提供的 ReactiveTransactionManager 建立 TransactionalOperator 。 |
3 | TransactionalOperator.transactional(…) 為所有上游操作提供事務管理。 |
使用 MongoTransactionManager 與 ReactiveMongoTransactionManager 進行事務
MongoTransactionManager
/ ReactiveMongoTransactionManager
是通往眾所周知的 Spring 事務支援的閘道器。它允許應用程式使用 Spring 的託管事務特性。MongoTransactionManager
將 ClientSession
繫結到執行緒,而 ReactiveMongoTransactionManager
為此使用 ReactorContext
。MongoTemplate
檢測到會話並根據與事務相關的資源進行操作。MongoTemplate
還可以參與其他正在進行的事務。以下示例展示瞭如何使用 MongoTransactionManager
建立和使用事務
MongoTransactionManager
/ ReactiveMongoTransactionManager
進行事務-
命令式
-
響應式
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 在應用程式上下文中註冊 MongoTransactionManager 。 |
2 | 將方法標記為事務性的。 |
@Transactional(readOnly = true) 建議 MongoTransactionManager 也啟動一個事務,將 ClientSession 新增到傳出請求中。 |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 在應用程式上下文中註冊 ReactiveMongoTransactionManager 。 |
2 | 將方法標記為事務性的。 |
@Transactional(readOnly = true) 建議 ReactiveMongoTransactionManager 也啟動一個事務,將 ClientSession 新增到傳出請求中。 |
控制 MongoDB 特定的事務選項
事務性服務方法可能需要特定的事務選項來執行事務。Spring Data MongoDB 的事務管理器支援評估事務標籤,例如 @Transactional(label = { "mongo:readConcern=available" })
。
預設情況下,使用 mongo:
字首的標籤名稱空間由預設配置的 MongoTransactionOptionsResolver
進行評估。事務標籤由 TransactionAttribute
提供,並可透過 TransactionTemplate
和 TransactionalOperator
進行程式設計方式的事務控制。由於其宣告性本質,@Transactional(label = …)
提供了一個很好的起點,也可以作為文件。
當前支援以下選項
- 最大提交時間
-
控制伺服器上 commitTransaction 操作的最大執行時間。值格式與
Duration.parse(…)
使用的 ISO-8601 持續時間格式相對應。用法:
mongo:maxCommitTime=PT1S
- 讀取關注
-
設定事務的讀取關注。
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 讀取偏好
-
設定事務的讀取偏好。
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 寫入關注
-
設定事務的寫入關注。
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
加入外部事務的巢狀事務不會影響初始事務選項,因為事務已經開始。事務選項僅在啟動新事務時應用。 |
事務中的特殊行為
在事務內部,MongoDB 伺服器的行為略有不同。
連線設定
MongoDB 驅動程式提供了一個專用的副本集名稱配置選項,將驅動程式轉為自動檢測模式。此選項有助於在事務期間識別主副本集節點和命令路由。
確保將 replicaSet 新增到 MongoDB URI 中。請參閱連線字串選項瞭解更多詳細資訊。 |
集合操作
MongoDB 不支援在事務中執行集合操作,例如建立集合。這也會影響首次使用時動態建立集合的操作。因此,請確保所有必需的結構都已到位。
瞬時錯誤
MongoDB 可以在事務操作期間引發的錯誤中新增特殊標籤。這些可能表明僅透過重試操作就會消失的瞬時故障。我們強烈推薦使用Spring Retry來處理這些情況。不過,可以重寫 MongoTransactionManager#doCommit(MongoTransactionObject)
以實現 MongoDB 參考手冊中概述的重試提交操作行為。
計數
MongoDB 的 count
操作基於集合統計資訊,這可能無法反映事務內的實際情況。在多文件事務中發出 count
命令時,伺服器會響應 錯誤 50851。一旦 MongoTemplate
檢測到活躍事務,所有公開的 count()
方法都會被轉換並委託給聚合框架,使用 $match
和 $count
運算子,同時保留 Query
設定,例如 collation
。
在聚合計數助手中使用地理命令時會有限制。以下運算子不能使用,必須替換為不同的運算子
-
$where
→$expr
-
$near
→$geoWithin
結合$center
-
$nearSphere
→$geoWithin
結合$centerSphere
使用 Criteria.near(…)
和 Criteria.nearSphere(…)
的查詢必須分別重寫為 Criteria.within(…)
和 Criteria.withinSphere(…)
。Repository 查詢方法中的 near
查詢關鍵詞也適用,必須更改為 within
。另請參閱 MongoDB JIRA 議題DRIVERS-518瞭解更多參考資訊。
以下程式碼片段展示了在會話繫結閉包內使用 count
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上述程式碼片段會生成以下命令
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是
db.collection.find( { state: "active" } ).count()