事務繫結器

透過將 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 設定為非空值(例如 tx-)來啟用事務。在處理器應用程式中使用時,消費者啟動事務;消費者執行緒上傳送的任何記錄都參與同一事務。當監聽器正常退出時,監聽器容器將把偏移量傳送到事務並提交。一個通用的生產者工廠用於使用 spring.cloud.stream.kafka.binder.transaction.producer.* 屬性配置的所有生產者繫結;單個繫結 Kafka 生產者屬性將被忽略。

由於重試將在原始事務中執行,而該事務可能會回滾,並且任何已釋出的記錄也將回滾,因此事務不支援正常的繫結器重試(和死信)。當啟用重試時(通用屬性 maxAttempts 大於零),重試屬性用於配置 DefaultAfterRollbackProcessor 以啟用容器級別的重試。同樣,此功能不是在事務內釋出死信記錄,而是移到監聽器容器,同樣透過在主事務回滾後執行的 DefaultAfterRollbackProcessor 實現。

如果您希望在源應用程式中使用事務,或者從某個任意執行緒進行僅生產者事務(例如 @Scheduled 方法),您必須獲取事務性生產者工廠的引用,並使用它定義一個 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

請注意,我們使用 BinderFactory 獲取繫結器的引用;如果只配置了一個繫結器,則在第一個引數中使用 null。如果配置了多個繫結器,請使用繫結器名稱來獲取引用。一旦我們獲得了繫結器的引用,我們就可以獲取 ProducerFactory 的引用並建立事務管理器。

然後,您將使用正常的 Spring 事務支援,例如 TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望將僅生產者事務與來自其他事務管理器的事務同步,請使用 ChainedTransactionManager

如果您部署應用程式的多個例項,每個例項都需要唯一的 transactionIdPrefix

Kafka 事務中的異常重試行為

配置事務回滾重試行為

在 Kafka 事務中處理訊息時,您可以使用 defaultRetryable 屬性和 retryableExceptions 對映來配置在事務回滾後應重試哪些異常。

預設重試行為

DefaultAfterRollbackProcessor 確定哪些異常會在事務回滾後觸發重試。預設情況下,所有異常都將重試,但您可以修改此行為

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # Change default to NOT retry exceptions

defaultRetryable 設定為 false 時,DefaultAfterRollbackProcessor 將配置為 defaultFalse(true),這意味著除非明確配置為可重試,否則異常將不會重試。

異常特定配置

為了進行精細控制,您可以為單個異常型別指定重試行為

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # Always retry this exception
               java.lang.IllegalArgumentException: false  # Never retry this exception

DefaultAfterRollbackProcessor 將對標記為 true 的異常使用 addRetryableExceptions(),對標記為 false 的異常使用 addNotRetryableExceptions()。這些異常特定配置優先於預設行為。

實現細節

  • 使用事務時,只有異常型別(Exception 的子類)可以在 retryableExceptions 中配置

  • 如果指定了非異常型別,將丟擲 IllegalArgumentException

  • DefaultAfterRollbackProcessor 僅在啟用事務並停用批處理模式時才配置

  • 此配置確保事務重試行為與非事務重試處理保持一致

© . This site is unofficial and not affiliated with VMware.