事務

本節描述了 Spring for Apache Kafka 如何支援事務。

概覽

0.11.0.0 客戶端庫增加了對事務的支援。Spring for Apache Kafka 透過以下方式添加了支援

  • `KafkaTransactionManager`:與常規 Spring 事務支援(`@Transactional`、`TransactionTemplate` 等)一起使用

  • 事務型 `KafkaMessageListenerContainer`

  • 使用 `KafkaTemplate` 的本地事務

  • 與其他事務管理器進行事務同步

透過為 `DefaultKafkaProducerFactory` 提供 `transactionIdPrefix` 來啟用事務。在這種情況下,工廠不再管理一個共享的 `Producer`,而是維護一個事務型生產者快取。當用戶對生產者呼叫 `close()` 時,它會返回到快取中以便重用,而不是實際關閉。每個生產者的 `transactional.id` 屬性是 `transactionIdPrefix` + `n`,其中 `n` 從 `0` 開始,併為每個新生產者遞增。在之前版本的 Spring for Apache Kafka 中,對於由基於記錄的監聽器容器啟動的事務,`transactional.id` 的生成方式不同,以支援隔離殭屍,但從 3.0 開始,`EOSMode.V2` 是唯一的選項,因此不再需要這樣做。對於執行多個例項的應用,每個例項的 `transactionIdPrefix` 必須是唯一的。

另請參閱精確一次語義

另請參閱`transactionIdPrefix`

對於 Spring Boot,只需設定 `spring.kafka.producer.transaction-id-prefix` 屬性 - Spring Boot 將自動配置一個 `KafkaTransactionManager` bean 並將其注入到監聽器容器中。

從 2.5.8 版本開始,您現在可以在生產者工廠上配置 `maxAge` 屬性。這在使用事務型生產者時很有用,因為這些生產者可能會在 broker 的 `transactional.id.expiration.ms` 期間處於空閒狀態。對於當前的 `kafka-clients`,這可能導致 `ProducerFencedException` 而不發生 rebalance。透過將 `maxAge` 設定為小於 `transactional.id.expiration.ms`,如果生產者超過其最大年齡,工廠將重新整理生產者。

使用 `KafkaTransactionManager`

`KafkaTransactionManager` 是 Spring Framework 的 `PlatformTransactionManager` 的一個實現。它在其建構函式中提供了對生產者工廠的引用。如果您提供自定義的生產者工廠,它必須支援事務。請參閱 `ProducerFactory.transactionCapable()`。

您可以將 `KafkaTransactionManager` 與常規 Spring 事務支援(`@Transactional`、`TransactionTemplate` 等)一起使用。如果存在活動事務,在事務範圍內執行的任何 `KafkaTemplate` 操作都將使用該事務的 `Producer`。事務管理器會根據成功或失敗來提交或回滾事務。您必須配置 `KafkaTemplate` 使用與事務管理器相同的 `ProducerFactory`。

事務同步

本節指生產者專用事務(非監聽器容器啟動的事務);有關容器啟動事務時鏈式事務的資訊,請參閱使用消費者啟動的事務

如果您想將記錄傳送到 Kafka 並執行一些資料庫更新,您可以使用常規的 Spring 事務管理,例如使用 `DataSourceTransactionManager`。

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

`@Transactional` 註解的攔截器會啟動事務,並且 `KafkaTemplate` 將與該事務管理器同步事務;每個傳送操作都將參與該事務。當方法退出時,資料庫事務將提交,隨後是 Kafka 事務。如果您希望以相反的順序(Kafka 優先)執行提交,請使用巢狀的 `@Transactional` 方法,其中外部方法配置為使用 `DataSourceTransactionManager`,內部方法配置為使用 `KafkaTransactionManager`。

有關在 Kafka 優先或資料庫優先配置中同步 JDBC 和 Kafka 事務的應用示例,請參閱使用其他事務管理器進行 Kafka 事務的示例

從 2.5.17、2.6.12、2.7.9 和 2.8.0 版本開始,如果同步事務(在主事務提交後)提交失敗,異常將被拋給呼叫者。在此之前,這種情況會被靜默忽略(在 debug 級別記錄日誌)。如果需要,應用程式應採取補救措施,以彌補已提交的主事務。

使用消費者啟動的事務

`ChainedKafkaTransactionManager` 從 2.7 版本開始已被棄用;有關更多資訊,請參閱其超類 `ChainedTransactionManager` 的 JavaDocs。相反,請在容器中使用 `KafkaTransactionManager` 來啟動 Kafka 事務,並用 `@Transactional` 註解監聽器方法來啟動其他事務。

有關鏈式處理 JDBC 和 Kafka 事務的應用示例,請參閱使用其他事務管理器進行 Kafka 事務的示例

非阻塞重試不能與容器事務結合使用。當監聽器程式碼丟擲異常時,容器事務提交成功,並將記錄傳送到可重試的 Topic。

`KafkaTemplate` 本地事務

您可以使用 `KafkaTemplate` 在本地事務中執行一系列操作。以下示例展示瞭如何實現這一點

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回撥中的引數是模板本身 (`this`)。如果回撥正常退出,事務將提交。如果丟擲異常,事務將回滾。

如果存在 `KafkaTransactionManager`(或同步的)事務正在處理中,則不會使用它。相反,將使用一個新的“巢狀”事務。

`TransactionIdPrefix`

對於 `EOSMode.V2`(也稱為 `BETA`),這是唯一支援的模式,不再需要使用相同的 `transactional.id`,即使對於消費者啟動的事務也是如此;實際上,它必須在每個例項上唯一,與生產者啟動的事務相同。此屬性在每個應用程式例項上必須具有不同的值。

`TransactionIdSuffix 固定`

從 3.2 版本開始,引入了一個新的 `TransactionIdSuffixStrategy` 介面來管理 `transactional.id` 字尾。預設實現是 `DefaultTransactionIdSuffixStrategy`,當設定的 `maxCache` 大於零時,可以在特定範圍內重用 `transactional.id`,否則字尾將透過遞增計數器動態生成。當請求事務生產者而所有 `transactional.id` 都被使用時,將丟擲 `NoProducerAvailableException`。使用者可以使用配置好的 `RetryTemplate` 來重試該異常,並配置適當的退避策略。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

當將 `maxCache` 設定為 5 時,`transactional.id` 為 `my.txid.`+`{0-4}`。

當使用 `KafkaTransactionManager` 和 `ConcurrentMessageListenerContainer` 並啟用 `maxCache` 時,需要將 `maxCache` 設定為一個大於或等於 `concurrency` 的值。如果 `MessageListenerContainer` 無法獲取 `transactional.id` 字尾,將丟擲 `NoProducerAvailableException`。在使用 `ConcurrentMessageListenerContainer` 中的巢狀事務時,需要調整 `maxCache` 設定以處理增加的巢狀事務數量。

`KafkaTemplate` 事務型和非事務型釋出

通常,當 `KafkaTemplate` 是事務型的(配置了支援事務的生產者工廠)時,需要事務。事務可以由 `TransactionTemplate`、`@Transactional` 方法、呼叫 `executeInTransaction` 或由配置了 `KafkaTransactionManager` 的監聽器容器啟動。任何在事務範圍之外使用模板的嘗試都會導致模板丟擲 `IllegalStateException`。從 2.4.3 版本開始,您可以將模板的 `allowNonTransactional` 屬性設定為 `true`。在這種情況下,模板將允許操作在沒有事務的情況下執行,透過呼叫 `ProducerFactory` 的 `createNonTransactionalProducer()` 方法;生產者將被快取或繫結到執行緒,以便像往常一樣重用。請參閱使用 DefaultKafkaProducerFactory

批處理監聽器與事務

在使用事務時,當監聽器失敗時,`AfterRollbackProcessor` 會在回滾發生後被呼叫以執行某些操作。當對記錄監聽器使用預設的 `AfterRollbackProcessor` 時,會執行 seek 操作,以便重新投遞失敗的記錄。但是,對於批處理監聽器,整個批次將被重新投遞,因為框架不知道批次中的哪個記錄失敗了。有關更多資訊,請參閱回滾後處理器

在使用批處理監聽器時,2.4.2 版本引入了一種處理批處理失敗的替代機制:`BatchToRecordAdapter`。當將 `batchListener` 設定為 true 的容器工廠配置了 `BatchToRecordAdapter` 時,監聽器將一次呼叫一個記錄。這可以在批處理中進行錯誤處理,同時仍然可以根據異常型別停止處理整個批次。提供了一個預設的 `BatchToRecordAdapter`,可以配置一個標準的 `ConsumerRecordRecoverer`,例如 `DeadLetterPublishingRecoverer`。以下測試用例配置片段說明了如何使用此功能

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}