事務
本節描述了 Spring for Apache Kafka 如何支援事務。
概覽
0.11.0.0 客戶端庫增加了對事務的支援。Spring for Apache Kafka 透過以下方式添加了支援
-
`KafkaTransactionManager`:與常規 Spring 事務支援(`@Transactional`、`TransactionTemplate` 等)一起使用
-
事務型 `KafkaMessageListenerContainer`
-
使用 `KafkaTemplate` 的本地事務
-
與其他事務管理器進行事務同步
透過為 `DefaultKafkaProducerFactory` 提供 `transactionIdPrefix` 來啟用事務。在這種情況下,工廠不再管理一個共享的 `Producer`,而是維護一個事務型生產者快取。當用戶對生產者呼叫 `close()` 時,它會返回到快取中以便重用,而不是實際關閉。每個生產者的 `transactional.id` 屬性是 `transactionIdPrefix` + `n`,其中 `n` 從 `0` 開始,併為每個新生產者遞增。在之前版本的 Spring for Apache Kafka 中,對於由基於記錄的監聽器容器啟動的事務,`transactional.id` 的生成方式不同,以支援隔離殭屍,但從 3.0 開始,`EOSMode.V2` 是唯一的選項,因此不再需要這樣做。對於執行多個例項的應用,每個例項的 `transactionIdPrefix` 必須是唯一的。
另請參閱精確一次語義。
對於 Spring Boot,只需設定 `spring.kafka.producer.transaction-id-prefix` 屬性 - Spring Boot 將自動配置一個 `KafkaTransactionManager` bean 並將其注入到監聽器容器中。
從 2.5.8 版本開始,您現在可以在生產者工廠上配置 `maxAge` 屬性。這在使用事務型生產者時很有用,因為這些生產者可能會在 broker 的 `transactional.id.expiration.ms` 期間處於空閒狀態。對於當前的 `kafka-clients`,這可能導致 `ProducerFencedException` 而不發生 rebalance。透過將 `maxAge` 設定為小於 `transactional.id.expiration.ms`,如果生產者超過其最大年齡,工廠將重新整理生產者。 |
使用 `KafkaTransactionManager`
`KafkaTransactionManager` 是 Spring Framework 的 `PlatformTransactionManager` 的一個實現。它在其建構函式中提供了對生產者工廠的引用。如果您提供自定義的生產者工廠,它必須支援事務。請參閱 `ProducerFactory.transactionCapable()`。
您可以將 `KafkaTransactionManager` 與常規 Spring 事務支援(`@Transactional`、`TransactionTemplate` 等)一起使用。如果存在活動事務,在事務範圍內執行的任何 `KafkaTemplate` 操作都將使用該事務的 `Producer`。事務管理器會根據成功或失敗來提交或回滾事務。您必須配置 `KafkaTemplate` 使用與事務管理器相同的 `ProducerFactory`。
事務同步
本節指生產者專用事務(非監聽器容器啟動的事務);有關容器啟動事務時鏈式事務的資訊,請參閱使用消費者啟動的事務。
如果您想將記錄傳送到 Kafka 並執行一些資料庫更新,您可以使用常規的 Spring 事務管理,例如使用 `DataSourceTransactionManager`。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
`@Transactional` 註解的攔截器會啟動事務,並且 `KafkaTemplate` 將與該事務管理器同步事務;每個傳送操作都將參與該事務。當方法退出時,資料庫事務將提交,隨後是 Kafka 事務。如果您希望以相反的順序(Kafka 優先)執行提交,請使用巢狀的 `@Transactional` 方法,其中外部方法配置為使用 `DataSourceTransactionManager`,內部方法配置為使用 `KafkaTransactionManager`。
有關在 Kafka 優先或資料庫優先配置中同步 JDBC 和 Kafka 事務的應用示例,請參閱使用其他事務管理器進行 Kafka 事務的示例。
從 2.5.17、2.6.12、2.7.9 和 2.8.0 版本開始,如果同步事務(在主事務提交後)提交失敗,異常將被拋給呼叫者。在此之前,這種情況會被靜默忽略(在 debug 級別記錄日誌)。如果需要,應用程式應採取補救措施,以彌補已提交的主事務。 |
使用消費者啟動的事務
`ChainedKafkaTransactionManager` 從 2.7 版本開始已被棄用;有關更多資訊,請參閱其超類 `ChainedTransactionManager` 的 JavaDocs。相反,請在容器中使用 `KafkaTransactionManager` 來啟動 Kafka 事務,並用 `@Transactional` 註解監聽器方法來啟動其他事務。
有關鏈式處理 JDBC 和 Kafka 事務的應用示例,請參閱使用其他事務管理器進行 Kafka 事務的示例。
`KafkaTemplate` 本地事務
您可以使用 `KafkaTemplate` 在本地事務中執行一系列操作。以下示例展示瞭如何實現這一點
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回撥中的引數是模板本身 (`this`)。如果回撥正常退出,事務將提交。如果丟擲異常,事務將回滾。
如果存在 `KafkaTransactionManager`(或同步的)事務正在處理中,則不會使用它。相反,將使用一個新的“巢狀”事務。 |
`TransactionIdPrefix`
對於 `EOSMode.V2`(也稱為 `BETA`),這是唯一支援的模式,不再需要使用相同的 `transactional.id`,即使對於消費者啟動的事務也是如此;實際上,它必須在每個例項上唯一,與生產者啟動的事務相同。此屬性在每個應用程式例項上必須具有不同的值。
`TransactionIdSuffix 固定`
從 3.2 版本開始,引入了一個新的 `TransactionIdSuffixStrategy` 介面來管理 `transactional.id` 字尾。預設實現是 `DefaultTransactionIdSuffixStrategy`,當設定的 `maxCache` 大於零時,可以在特定範圍內重用 `transactional.id`,否則字尾將透過遞增計數器動態生成。當請求事務生產者而所有 `transactional.id` 都被使用時,將丟擲 `NoProducerAvailableException`。使用者可以使用配置好的 `RetryTemplate` 來重試該異常,並配置適當的退避策略。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
當將 `maxCache` 設定為 5 時,`transactional.id` 為 `my.txid.`+`{0-4}`。
當使用 `KafkaTransactionManager` 和 `ConcurrentMessageListenerContainer` 並啟用 `maxCache` 時,需要將 `maxCache` 設定為一個大於或等於 `concurrency` 的值。如果 `MessageListenerContainer` 無法獲取 `transactional.id` 字尾,將丟擲 `NoProducerAvailableException`。在使用 `ConcurrentMessageListenerContainer` 中的巢狀事務時,需要調整 `maxCache` 設定以處理增加的巢狀事務數量。 |
`KafkaTemplate` 事務型和非事務型釋出
通常,當 `KafkaTemplate` 是事務型的(配置了支援事務的生產者工廠)時,需要事務。事務可以由 `TransactionTemplate`、`@Transactional` 方法、呼叫 `executeInTransaction` 或由配置了 `KafkaTransactionManager` 的監聽器容器啟動。任何在事務範圍之外使用模板的嘗試都會導致模板丟擲 `IllegalStateException`。從 2.4.3 版本開始,您可以將模板的 `allowNonTransactional` 屬性設定為 `true`。在這種情況下,模板將允許操作在沒有事務的情況下執行,透過呼叫 `ProducerFactory` 的 `createNonTransactionalProducer()` 方法;生產者將被快取或繫結到執行緒,以便像往常一樣重用。請參閱使用 DefaultKafkaProducerFactory
。
批處理監聽器與事務
在使用事務時,當監聽器失敗時,`AfterRollbackProcessor` 會在回滾發生後被呼叫以執行某些操作。當對記錄監聽器使用預設的 `AfterRollbackProcessor` 時,會執行 seek 操作,以便重新投遞失敗的記錄。但是,對於批處理監聽器,整個批次將被重新投遞,因為框架不知道批次中的哪個記錄失敗了。有關更多資訊,請參閱回滾後處理器。
在使用批處理監聽器時,2.4.2 版本引入了一種處理批處理失敗的替代機制:`BatchToRecordAdapter`。當將 `batchListener` 設定為 true 的容器工廠配置了 `BatchToRecordAdapter` 時,監聽器將一次呼叫一個記錄。這可以在批處理中進行錯誤處理,同時仍然可以根據異常型別停止處理整個批次。提供了一個預設的 `BatchToRecordAdapter`,可以配置一個標準的 `ConsumerRecordRecoverer`,例如 `DeadLetterPublishingRecoverer`。以下測試用例配置片段說明了如何使用此功能
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}