配置主題
如果您在應用程式上下文中定義了一個 KafkaAdmin bean,它可以自動將主題新增到代理。為此,您可以為每個主題嚮應用程式上下文新增一個 NewTopic @Bean。版本 2.3 引入了一個新類 TopicBuilder,以使此類 bean 的建立更加方便。以下示例展示瞭如何實現:
-
Java
-
Kotlin
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
從版本 2.6 開始,您可以省略 partitions() 和/或 replicas(),代理預設值將應用於這些屬性。代理版本必須至少為 2.4.0 才能支援此功能 - 請參閱 KIP-464。
-
Java
-
Kotlin
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
從版本 2.7 開始,您可以在單個 KafkaAdmin.NewTopics bean 定義中宣告多個 NewTopic。
-
Java
-
Kotlin
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 時,KafkaAdmin bean 會自動註冊,因此您只需 NewTopic(和/或 NewTopics)@Bean。 |
預設情況下,如果代理不可用,會記錄一條訊息,但上下文會繼續載入。您可以程式設計式地呼叫 admin 的 initialize() 方法,稍後再試。如果您希望此條件被視為致命,請將 admin 的 fatalIfBrokerNotAvailable 屬性設定為 true。然後上下文將無法初始化。
如果代理支援(1.0.0 或更高版本),如果發現現有主題的分割槽少於 NewTopic.numPartitions,admin 會增加分割槽數量。 |
從版本 2.7 開始,KafkaAdmin 提供了在執行時建立和檢查主題的方法。從版本 4.0 開始,它還提供了刪除主題的方法。
-
createOrModifyTopics -
describeTopics -
deleteTopics(自 4.0 起)
對於更高階的管理功能,您可以直接使用 AdminClient。以下示例展示瞭如何實現:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
從版本 2.9.10、3.0.9 開始,您可以提供一個 Predicate<NewTopic>,用於確定是否應考慮建立或修改特定的 NewTopic bean。這很有用,例如,如果您有多個 KafkaAdmin 例項指向不同的叢集,並且您希望選擇應由每個 admin 建立或修改的主題。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));