事務

本節介紹 Spring for Apache Kafka 如何支援事務。

概述

0.11.0.0 客戶端庫增加了對事務的支援。Spring for Apache Kafka 以以下方式增加支援:

  • KafkaTransactionManager:與正常的 Spring 事務支援(@TransactionalTransactionTemplate 等)一起使用

  • 事務性 KafkaMessageListenerContainer

  • 使用 KafkaTemplate 的本地事務

  • 與其他事務管理器進行事務同步

透過為 DefaultKafkaProducerFactory 提供一個 transactionIdPrefix 來啟用事務。在這種情況下,工廠不是管理一個共享的 Producer,而是維護一個事務性生產者快取。當用戶對生產者呼叫 close() 時,它被返回到快取中以供重用,而不是實際關閉。每個生產者的 transactional.id 屬性是 transactionIdPrefix + n,其中 n0 開始,併為每個新生產者遞增。在 Spring for Apache Kafka 的早期版本中,由記錄監聽器的監聽器容器啟動的事務的 transactional.id 的生成方式不同,以支援圍欄殭屍,但這在 3.0 版本後不再需要,因為 EOSMode.V2 是唯一的選項。對於執行多個例項的應用程式,每個例項的 transactionIdPrefix 必須是唯一的。

另請參閱 精確一次語義

另請參閱 transactionIdPrefix

使用 Spring Boot 時,只需設定 spring.kafka.producer.transaction-id-prefix 屬性即可——Spring Boot 將自動配置一個 KafkaTransactionManager bean 並將其連線到監聽器容器中。

從版本 2.5.8 開始,您現在可以在生產者工廠上配置 maxAge 屬性。當使用事務性生產者時,這非常有用,因為它們可能會在代理的 transactional.id.expiration.ms 期間處於空閒狀態。在當前的 kafka-clients 中,這可能會導致 ProducerFencedException 而不會發生重新平衡。透過將 maxAge 設定為小於 transactional.id.expiration.ms,如果生產者超過其最大年齡,工廠將重新整理生產者。

使用 KafkaTransactionManager

KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的一個實現。它在建構函式中被提供了一個生產者工廠的引用。如果您提供自定義生產者工廠,它必須支援事務。請參閱 ProducerFactory.transactionCapable()

您可以將 KafkaTransactionManager 與正常的 Spring 事務支援(@TransactionalTransactionTemplate 等)一起使用。如果事務處於活動狀態,則在事務範圍內執行的任何 KafkaTemplate 操作都將使用事務的 Producer。管理器根據成功或失敗提交或回滾事務。您必須將 KafkaTemplate 配置為與事務管理器使用相同的 ProducerFactory

事務同步

本節指的是僅生產者事務(非監聽器容器啟動的事務);有關容器啟動事務時鏈式事務的資訊,請參閱 使用消費者啟動的事務

如果您想向 Kafka 傳送記錄並執行一些資料庫更新,您可以使用正常的 Spring 事務管理,例如使用 DataSourceTransactionManager

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional 註解的攔截器啟動事務,KafkaTemplate 將與該事務管理器同步事務;每次傳送都將參與該事務。當方法退出時,資料庫事務將提交,隨後是 Kafka 事務。如果您希望以相反的順序執行提交(Kafka 優先),請使用巢狀的 @Transactional 方法,其中外部方法配置為使用 DataSourceTransactionManager,內部方法配置為使用 KafkaTransactionManager

有關在 Kafka 優先或 DB 優先配置中同步 JDBC 和 Kafka 事務的應用程式示例,請參閱 Kafka 事務與其他事務管理器的示例

從 2.5.17、2.6.12、2.7.9 和 2.8.0 版本開始,如果同步事務(在主事務提交後)的提交失敗,異常將拋給呼叫者。此前,此異常會被靜默忽略(在除錯級別記錄)。應用程式應在必要時採取補救措施,以彌補已提交的主事務。

使用消費者啟動的事務

ChainedKafkaTransactionManager 自 2.7 版本起已棄用;有關其父類 ChainedTransactionManager 的更多資訊,請參閱 JavaDocs。相反,請在容器中使用 KafkaTransactionManager 啟動 Kafka 事務,並使用 @Transactional 註解監聽器方法以啟動其他事務。

有關鏈式 JDBC 和 Kafka 事務的示例應用程式,請參閱 Kafka 事務與其他事務管理器的示例

非阻塞重試 不能與 容器事務 結合使用。當監聽器程式碼丟擲異常時,容器事務提交成功,並且記錄被髮送到可重試主題。

KafkaTemplate 本地事務

您可以使用 KafkaTemplate 在本地事務中執行一系列操作。以下示例展示瞭如何實現:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回撥中的引數是模板本身(this)。如果回撥正常退出,則事務被提交。如果丟擲異常,則事務被回滾。

如果存在正在進行的 KafkaTransactionManager(或同步)事務,則不使用它。相反,將使用一個新的“巢狀”事務。

TransactionIdPrefix

EOSMode.V2(又名 BETA)中,唯一的受支援模式,不再需要使用相同的 transactional.id,即使對於消費者啟動的事務也是如此;事實上,它必須在每個例項上唯一,就像生產者啟動的事務一樣。此屬性在每個應用程式例項上必須具有不同的值。

TransactionIdSuffix Fixed

自 3.2 版本起,引入了一個新的 TransactionIdSuffixStrategy 介面來管理 transactional.id 字尾。預設實現是 DefaultTransactionIdSuffixStrategy,當 maxCache 設定為大於零時,可以在特定範圍內重用 transactional.id,否則字尾將透過遞增計數器即時生成。當請求事務生產者且所有 transactional.id 都已在使用時,丟擲 NoProducerAvailableException。使用者可以使用配置為重試該異常的 RetryTemplate,並配置適當的退避策略。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

maxCache 設定為 5 時,transactional.idmy.txid.+`{0-4}`。

當使用 KafkaTransactionManagerConcurrentMessageListenerContainer 並啟用 maxCache 時,需要將 maxCache 設定為大於或等於 concurrency 的值。如果 MessageListenerContainer 無法獲取 transactional.id 字尾,它將丟擲 NoProducerAvailableException。當在 ConcurrentMessageListenerContainer 中使用巢狀事務時,需要調整 maxCache 設定以處理增加的巢狀事務數量。

KafkaTemplate 事務性和非事務性發布

通常,當 KafkaTemplate 是事務性時(使用支援事務的生產者工廠配置),事務是必需的。事務可以由 TransactionTemplate@Transactional 方法、呼叫 executeInTransaction 或由監聽器容器(配置了 KafkaTransactionManager)啟動。任何在事務範圍之外使用模板的嘗試都會導致模板丟擲 IllegalStateException。從版本 2.4.3 開始,您可以將模板的 allowNonTransactional 屬性設定為 true。在這種情況下,模板將允許操作在沒有事務的情況下執行,透過呼叫 ProducerFactorycreateNonTransactionalProducer() 方法;生產者將被快取或執行緒繫結,以便正常重用。請參閱 使用 DefaultKafkaProducerFactory

批次監聽器事務

當監聽器在使用事務時失敗時,會在回滾發生後呼叫 AfterRollbackProcessor 以執行一些操作。當使用帶有記錄監聽器的預設 AfterRollbackProcessor 時,會執行查詢操作,以便重新傳遞失敗的記錄。但是,對於批次監聽器,整個批次將重新傳遞,因為框架不知道批次中的哪個記錄失敗了。有關更多資訊,請參閱 回滾後處理器

在使用批次監聽器時,版本 2.4.2 引入了一種處理批次處理失敗的替代機制:BatchToRecordAdapter。當配置了 BatchToRecordAdapterbatchListener 設定為 true 的容器工廠時,監聽器將一次處理一條記錄。這使得在批處理中處理錯誤成為可能,同時仍然可以根據異常型別停止處理整個批處理。提供了預設的 BatchToRecordAdapter,可以配置標準的 ConsumerRecordRecoverer,例如 DeadLetterPublishingRecoverer。以下測試用例配置片段說明了如何使用此功能:

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}
© . This site is unofficial and not affiliated with VMware.