Apache Kafka 支援

透過提供 spring-kafka 專案的自動配置來支援 Apache Kafka

Kafka 配置由 spring.kafka.* 中的外部配置屬性控制。例如,你可以在 application.properties 中宣告以下部分

  • 屬性

  • YAML

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在啟動時建立主題,請新增型別為 NewTopic 的 bean。如果主題已存在,則忽略該 bean。

有關更多支援的選項,請參閱 KafkaProperties

傳送訊息

Spring 的 KafkaTemplate 已自動配置,你可以直接在自己的 bean 中自動裝配它,如以下示例所示

  • Java

  • Kotlin

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
如果定義了屬性 spring.kafka.producer.transaction-id-prefix,則會自動配置一個 KafkaTransactionManager。此外,如果定義了 RecordMessageConverter bean,它會自動與自動配置的 KafkaTemplate 關聯。

接收訊息

當存在 Apache Kafka 基礎結構時,任何 bean 都可以使用 @KafkaListener 進行註釋,以建立偵聽器端點。如果未定義 KafkaListenerContainerFactory,則會使用 spring.kafka.listener.* 中定義的鍵自動配置一個預設的。

以下元件在 someTopic 主題上建立偵聽器端點

  • Java

  • Kotlin

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

如果定義了 KafkaTransactionManager bean,它會自動與容器工廠關聯。同樣,如果定義了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener bean,它會自動與預設工廠關聯。

根據偵聽器型別,RecordMessageConverterBatchMessageConverter bean 與預設工廠關聯。如果批處理偵聽器只存在 RecordMessageConverter bean,它將包裝在 BatchMessageConverter 中。

自定義的 ChainedKafkaTransactionManager 必須標記為 @Primary,因為它通常引用自動配置的 KafkaTransactionManager bean。

Kafka Streams

Spring for Apache Kafka 提供了一個工廠 bean 來建立 StreamsBuilder 物件並管理其流的生命週期。只要 kafka-streams 位於類路徑上並且 Kafka Streams 透過 @EnableKafkaStreams 註解啟用,Spring Boot 就會自動配置所需的 KafkaStreamsConfiguration bean。

啟用 Kafka Streams 意味著必須設定應用程式 ID 和引導伺服器。前者可以使用 spring.kafka.streams.application-id 進行配置,如果未設定,則預設為 spring.application.name。後者可以全域性設定或專門為流覆蓋。

可以使用專用屬性設定幾個附加屬性;其他任意 Kafka 屬性可以使用 spring.kafka.streams.properties 名稱空間設定。有關更多資訊,另請參閱 附加 Kafka 屬性

要使用工廠 bean,請將 StreamsBuilder 裝配到你的 @Bean 中,如以下示例所示

  • Java

  • Kotlin

import java.util.Locale;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue)
			.to("ks1Out",
					Produced.with(Serdes.Integer(), new org.springframework.kafka.support.serializer.JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(),
			org.springframework.kafka.support.serializer.JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int, String> {
		return KeyValue(key, value.uppercase())
	}

}

預設情況下,由 StreamsBuilder 物件管理的流會自動啟動。你可以使用 spring.kafka.streams.auto-startup 屬性自定義此行為。

你還可以註冊任意數量的實現 StreamsBuilderFactoryBeanConfigurer 的 bean,以進行更高階的自定義。

附加 Kafka 屬性

自動配置支援的屬性在附錄的 整合屬性 部分中顯示。請注意,在大多數情況下,這些屬性(帶連字元或駝峰式命名)直接對映到 Apache Kafka 的點分屬性。有關詳細資訊,請參閱 Apache Kafka 文件。

名稱中不包含客戶端型別(producerconsumeradminstreams)的屬性被視為通用屬性,並適用於所有客戶端。如果需要,這些通用屬性中的大多數可以被一個或多個客戶端型別覆蓋。

Apache Kafka 將屬性指定為 HIGH、MEDIUM 或 LOW 重要性。Spring Boot 自動配置支援所有 HIGH 重要性屬性、一些選定的 MEDIUM 和 LOW 屬性,以及任何沒有預設值的屬性。

Kafka 支援的屬性中只有一部分可以透過 KafkaProperties 類直接使用。如果你希望使用不直接支援的附加屬性來配置各個客戶端型別,請使用以下屬性

  • 屬性

  • YAML

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

這會將公共 prop.one Kafka 屬性設定為 first(適用於生產者、消費者、管理員和流),將 prop.two 管理員屬性設定為 second,將 prop.three 消費者屬性設定為 third,將 prop.four 生產者屬性設定為 fourth,將 prop.five 流屬性設定為 fifth

你還可以按如下方式配置 Spring Kafka JsonDeserializer

  • 屬性

  • YAML

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同樣,你可以停用 JsonSerializer 在訊息頭中傳送型別資訊的預設行為

  • 屬性

  • YAML

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以這種方式設定的屬性將覆蓋 Spring Boot 明確支援的任何配置項。

使用嵌入式 Kafka 進行測試

Spring for Apache Kafka 提供了一種方便的方法,可以使用嵌入式 Apache Kafka 代理測試專案。要使用此功能,請使用來自 spring-kafka-test 模組的 @EmbeddedKafka 註解測試類。有關更多資訊,請參閱 Spring for Apache Kafka 參考手冊

為了使 Spring Boot 自動配置與上述嵌入式 Apache Kafka 代理一起工作,你需要將嵌入式代理地址(由 EmbeddedKafkaBroker 填充)的系統屬性重新對映到 Apache Kafka 的 Spring Boot 配置屬性中。有幾種方法可以做到這一點

  • 在測試類中提供一個系統屬性,將嵌入式代理地址對映到 spring.kafka.bootstrap-servers

  • Java

  • Kotlin

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
  • Java

  • Kotlin

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
  • 在配置屬性中使用佔位符

  • 屬性

  • YAML

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"
© . This site is unofficial and not affiliated with VMware.