事務性繫結器
透過將 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
設定為非空值(例如 tx-
)來啟用事務。在處理器應用中使用時,消費者會啟動事務;消費者執行緒上傳送的任何記錄都參與同一事務。當監聽器正常退出時,監聽器容器會將偏移量傳送到事務並提交。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.*
屬性配置的生產者繫結都使用一個通用的生產者工廠;單獨的繫結 Kafka 生產者屬性將被忽略。
正常的繫結器重試(和死信處理)不支援事務,因為重試將在原始事務中執行,該事務可能會回滾,並且任何已釋出的記錄也會回滾。當啟用重試時(公共屬性 maxAttempts 大於零),重試屬性用於配置 DefaultAfterRollbackProcessor ,以在容器級別啟用重試。類似地,死信記錄釋出功能不再在事務內執行,而是移至監聽器容器,同樣透過在主事務回滾後執行的 DefaultAfterRollbackProcessor 實現。 |
如果您希望在源應用中使用事務,或者從任意執行緒執行僅生產者事務(例如 @Scheduled
方法),您必須獲取事務性生產者工廠的引用,並使用它定義一個 KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
請注意,我們使用 BinderFactory
獲取繫結器的引用;當只配置一個繫結器時,第一個引數使用 null
。如果配置了多個繫結器,請使用繫結器名稱獲取引用。獲得繫結器的引用後,我們可以獲取 ProducerFactory
的引用並建立一個事務管理器。
然後您可以使用正常的 Spring 事務支援,例如 TransactionTemplate
或 @Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望將僅生產者事務與其他事務管理器中的事務同步,請使用 ChainedTransactionManager
。
如果您部署應用的多個例項,每個例項需要一個唯一的 transactionIdPrefix 。 |
Kafka 事務中的異常重試行為
預設重試行為
DefaultAfterRollbackProcessor
決定哪些異常會在事務回滾後觸發重試。預設情況下,所有異常都會重試,但您可以修改此行為
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
defaultRetryable: false # Change default to NOT retry exceptions
當 defaultRetryable
設定為 false
時,DefaultAfterRollbackProcessor
將配置為 defaultFalse(true)
,這意味著除非顯式配置為可重試,否則異常將不會重試。
異常特定配置
為了進行精細控制,您可以為單個異常型別指定重試行為
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
retryableExceptions:
java.lang.IllegalStateException: true # Always retry this exception
java.lang.IllegalArgumentException: false # Never retry this exception
DefaultAfterRollbackProcessor
將對標記為 true
的異常使用 addRetryableExceptions()
,對標記為 false
的異常使用 addNotRetryableExceptions()
。這些異常特定配置優先於預設行為。