會話與事務

從版本 3.6 開始,MongoDB 支援會話的概念。使用會話可以啟用 MongoDB 的因果一致性模型,該模型保證操作按照尊重其因果關係順序執行。這些會話分為 ServerSession 例項和 ClientSession 例項。在本節中,當我們提到會話時,我們指的是 ClientSession

客戶端會話內的操作與會話外的操作並非隔離。

MongoOperationsReactiveMongoOperations 都提供了將 ClientSession 繫結到操作的閘道器方法。MongoCollectionMongoDatabase 使用實現 MongoDB 集合和資料庫介面的會話代理物件,因此您無需在每次呼叫時都新增會話。這意味著對 MongoCollection#find() 的潛在呼叫會被委託給 MongoCollection#find(ClientSession)

諸如 (Reactive)MongoOperations#getCollection 之類的方法會返回原生 MongoDB Java Driver 閘道器物件(例如 MongoCollection),這些物件本身提供了用於 ClientSession 的專用方法。這些方法**不**進行會話代理。當直接與 MongoCollectionMongoDatabase 互動而不是透過 MongoOperations 上的 #execute 回撥時,您應在需要的地方提供 ClientSession

ClientSession 支援

以下示例展示了會話的使用方法

  • 命令式

  • 響應式

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); (1)

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  (2)

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                (3)

        return azoth;
    });

session.close()                                              (4)
1 從伺服器獲取一個新會話。
2 像以前一樣使用 MongoOperation 方法。ClientSession 會自動應用。
3 務必關閉 ClientSession
4 關閉會話。
處理 DBRef 例項時,特別是延遲載入的例項,至關重要的是在所有資料載入之前**不要**關閉 ClientSession。否則,延遲載入會失敗。
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); (1)

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            (2)
            });
    }, ClientSession::close)                                            (3)
    .subscribe();                                                       (4)
1 獲取用於檢索新會話的 Publisher
2 像以前一樣使用 ReactiveMongoOperation 方法。ClientSession 會自動獲取和應用。
3 務必關閉 ClientSession
4 在您訂閱之前不會發生任何事情。有關詳細資訊,請參閱Project Reactor 參考指南

透過使用提供實際會話的 Publisher,您可以將會話獲取延遲到實際訂閱時。此外,您需要在完成後關閉會話,以免在伺服器上留下陳舊的會話。在 execute 上使用 doFinally 鉤子,在不再需要會話時呼叫 ClientSession#close()。如果您希望對會話本身有更多控制,可以透過驅動程式獲取 ClientSession 並透過 Supplier 提供它。

ClientSession 的響應式用法僅限於 Template API 的使用。目前,響應式 Repository 沒有會話整合。

MongoDB 事務

從版本 4 開始,MongoDB 支援事務。事務構建在會話之上,因此需要一個活躍的 ClientSession

除非您在應用程式上下文中指定了 MongoTransactionManager,否則事務支援將**停用**。您可以使用 setSessionSynchronization(ALWAYS) 來參與正在進行的非原生 MongoDB 事務。

為了完全透過程式設計控制事務,您可能希望使用 MongoOperations 上的會話回撥。

以下示例展示了程式設計方式的事務控制

程式設計方式的事務
  • 命令式

  • 響應式

ClientSession session = client.startSession(options);                   (1)

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     (2)

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                (3)

        } catch (RuntimeException e) {
            session.abortTransaction();                                 (4)
        }
    }, ClientSession::close)                                            (5)
1 獲取一個新的 ClientSession
2 開始事務。
3 如果一切按預期進行,提交更改。
4 發生錯誤,因此回滾所有內容。
5 完成後不要忘記關閉會話。

上面的示例讓您可以完全控制事務行為,同時在回撥中使用會話範圍的 MongoOperations 例項,以確保將會話傳遞到每個伺服器呼叫。為了避免這種方法帶來的一些開銷,您可以使用 TransactionTemplate 來減少手動事務流程的繁瑣。

Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             (1)

    .flatMap(session -> {
        session.startTransaction();                                                          (2)

        return Mono.from(collection.deleteMany(session, ...))                                (3)

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   (4)

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     (5)

            .doFinally(signal -> session.close());                                           (6)
      });
1 首先,我們顯然需要初始化會話。
2 一旦有了 ClientSession,就開始事務。
3 透過將 ClientSession 傳遞給操作,在事務內進行操作。
4 如果操作異常完成,我們需要停止事務並保留錯誤。
5 或者,當然,在成功的情況下提交更改。同時保留操作結果。
6 最後,我們需要確保關閉會話。

上述操作的問題在於保留主流程的 DeleteResult,而不是透過 commitTransaction()abortTransaction() 釋出事務結果,這導致設定相當複雜。

除非您在應用程式上下文中指定了 ReactiveMongoTransactionManager,否則事務支援將**停用**。您可以使用 setSessionSynchronization(ALWAYS) 來參與正在進行的非原生 MongoDB 事務。

使用 TransactionTemplate / TransactionalOperator 進行事務

Spring Data MongoDB 事務支援 TransactionTemplateTransactionalOperator

使用 TransactionTemplate / TransactionalOperator 進行事務
  • 命令式

  • 響應式

template.setSessionSynchronization(ALWAYS);                                     (1)

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         (2)

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     (3)

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 在 Template API 配置期間啟用事務同步。
2 使用提供的 PlatformTransactionManager 建立 TransactionTemplate
3 在回撥中,ClientSession 和事務已被註冊。
在執行時更改 MongoTemplate 的狀態(如您可能認為在前面的列表項 1 中那樣)可能會導致執行緒和可見性問題。
template.setSessionSynchronization(ALWAYS);                                          (1)

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              (2)


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         (3)
    .then();
1 啟用事務同步以便參與事務。
2 使用提供的 ReactiveTransactionManager 建立 TransactionalOperator
3 TransactionalOperator.transactional(…) 為所有上游操作提供事務管理。

使用 MongoTransactionManager 與 ReactiveMongoTransactionManager 進行事務

MongoTransactionManager / ReactiveMongoTransactionManager 是通往眾所周知的 Spring 事務支援的閘道器。它允許應用程式使用 Spring 的託管事務特性MongoTransactionManagerClientSession 繫結到執行緒,而 ReactiveMongoTransactionManager 為此使用 ReactorContextMongoTemplate 檢測到會話並根據與事務相關的資源進行操作。MongoTemplate 還可以參與其他正在進行的事務。以下示例展示瞭如何使用 MongoTransactionManager 建立和使用事務

使用 MongoTransactionManager / ReactiveMongoTransactionManager 進行事務
  • 命令式

  • 響應式

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  (1)
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        (2)

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 在應用程式上下文中註冊 MongoTransactionManager
2 將方法標記為事務性的。
@Transactional(readOnly = true) 建議 MongoTransactionManager 也啟動一個事務,將 ClientSession 新增到傳出請求中。
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  (1)
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  (2)

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 在應用程式上下文中註冊 ReactiveMongoTransactionManager
2 將方法標記為事務性的。
@Transactional(readOnly = true) 建議 ReactiveMongoTransactionManager 也啟動一個事務,將 ClientSession 新增到傳出請求中。

控制 MongoDB 特定的事務選項

事務性服務方法可能需要特定的事務選項來執行事務。Spring Data MongoDB 的事務管理器支援評估事務標籤,例如 @Transactional(label = { "mongo:readConcern=available" })

預設情況下,使用 mongo: 字首的標籤名稱空間由預設配置的 MongoTransactionOptionsResolver 進行評估。事務標籤由 TransactionAttribute 提供,並可透過 TransactionTemplateTransactionalOperator 進行程式設計方式的事務控制。由於其宣告性本質,@Transactional(label = …) 提供了一個很好的起點,也可以作為文件。

當前支援以下選項

最大提交時間

控制伺服器上 commitTransaction 操作的最大執行時間。值格式與 Duration.parse(…) 使用的 ISO-8601 持續時間格式相對應。

用法:mongo:maxCommitTime=PT1S

讀取關注

設定事務的讀取關注。

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

讀取偏好

設定事務的讀取偏好。

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

寫入關注

設定事務的寫入關注。

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

加入外部事務的巢狀事務不會影響初始事務選項,因為事務已經開始。事務選項僅在啟動新事務時應用。

事務中的特殊行為

在事務內部,MongoDB 伺服器的行為略有不同。

連線設定

MongoDB 驅動程式提供了一個專用的副本集名稱配置選項,將驅動程式轉為自動檢測模式。此選項有助於在事務期間識別主副本集節點和命令路由。

確保將 replicaSet 新增到 MongoDB URI 中。請參閱連線字串選項瞭解更多詳細資訊。

集合操作

MongoDB 支援在事務中執行集合操作,例如建立集合。這也會影響首次使用時動態建立集合的操作。因此,請確保所有必需的結構都已到位。

瞬時錯誤

MongoDB 可以在事務操作期間引發的錯誤中新增特殊標籤。這些可能表明僅透過重試操作就會消失的瞬時故障。我們強烈推薦使用Spring Retry來處理這些情況。不過,可以重寫 MongoTransactionManager#doCommit(MongoTransactionObject) 以實現 MongoDB 參考手冊中概述的重試提交操作行為。

計數

MongoDB 的 count 操作基於集合統計資訊,這可能無法反映事務內的實際情況。在多文件事務中發出 count 命令時,伺服器會響應 錯誤 50851。一旦 MongoTemplate 檢測到活躍事務,所有公開的 count() 方法都會被轉換並委託給聚合框架,使用 $match$count 運算子,同時保留 Query 設定,例如 collation

在聚合計數助手中使用地理命令時會有限制。以下運算子不能使用,必須替換為不同的運算子

  • $where$expr

  • $near$geoWithin 結合 $center

  • $nearSphere$geoWithin 結合 $centerSphere

使用 Criteria.near(…)Criteria.nearSphere(…) 的查詢必須分別重寫為 Criteria.within(…)Criteria.withinSphere(…)。Repository 查詢方法中的 near 查詢關鍵詞也適用,必須更改為 within。另請參閱 MongoDB JIRA 議題DRIVERS-518瞭解更多參考資訊。

以下程式碼片段展示了在會話繫結閉包內使用 count

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

上述程式碼片段會生成以下命令

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

而不是

db.collection.find( { state: "active" } ).count()