在 Spring 中管理的生產者攔截器

從版本 3.0.0 開始,對於生產者攔截器,你可以讓 Spring 直接將其作為 Bean 管理,而不是在 Apache Kafka 生產者配置中提供攔截器的類名。如果採用這種方法,則需要將此生產者攔截器設定到 KafkaTemplate 上。下面是使用與上面相同的 MyProducerInterceptor 的示例,但更改為不使用內部配置屬性。

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在記錄傳送之前,會呼叫生產者攔截器的 onSend 方法。一旦伺服器傳送了資料釋出的確認,就會呼叫 onAcknowledgement 方法。onAcknowledgement 在生產者呼叫任何使用者回撥之前被呼叫。

如果你有多個這樣的生產者攔截器透過 Spring 管理,並且需要應用到 KafkaTemplate 上,則需要使用 CompositeProducerInterceptor 代替。CompositeProducerInterceptor 允許按順序新增單個生產者攔截器。底層 ProducerInterceptor 實現中的方法會按照新增到 CompositeProducerInterceptor 的順序被呼叫。