配置主題
如果您在應用上下文中定義一個 `KafkaAdmin` bean,它可以自動向 broker 新增主題。為此,您可以在應用上下文中為每個主題新增一個 `NewTopic` `@Bean`。版本 2.3 引入了一個新的類 `TopicBuilder`,使建立此類 bean 更加方便。以下示例展示瞭如何做到這一點
-
Java
-
Kotlin
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
從版本 2.6 開始,您可以省略 `partitions()` 和/或 `replicas()`,broker 的預設設定將應用於這些屬性。broker 版本必須至少為 2.4.0 才能支援此功能 - 請參閱 KIP-464。
-
Java
-
Kotlin
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
從版本 2.7 開始,您可以在單個 `KafkaAdmin.NewTopics` bean 定義中宣告多個 `NewTopic`。
-
Java
-
Kotlin
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 時,`KafkaAdmin` bean 會自動註冊,因此您只需要 `NewTopic`(和/或 `NewTopics`)`@Bean`。 |
預設情況下,如果 broker 不可用,會記錄一條訊息,但上下文會繼續載入。您可以程式設計式呼叫 admin 的 `initialize()` 方法稍後重試。如果您希望此條件被視為致命錯誤,請將 admin 的 `fatalIfBrokerNotAvailable` 屬性設定為 `true`。此時上下文將無法初始化。
如果 broker 支援(1.0.0 或更高版本),admin 會在發現現有主題的分割槽少於 `NewTopic.numPartitions` 時增加分割槽數量。 |
從版本 2.7 開始,`KafkaAdmin` 提供了在執行時建立和檢查主題的方法。
-
createOrModifyTopics
-
describeTopics
對於更高階的功能,您可以直接使用 `AdminClient`。以下示例展示瞭如何做到這一點
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
從版本 2.9.10、3.0.9 開始,您可以提供一個 `Predicate<NewTopic>`,用於確定是否應考慮建立或修改特定的 `NewTopic` bean。例如,如果您有指向不同叢集的多個 `KafkaAdmin` 例項,並且希望選擇應由每個 admin 建立或修改的主題,這將非常有用。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));