定製 Kafka 繫結器健康指示器

覆蓋預設 Kafka Binder 健康指示器

當 Spring Boot Actuator 在類路徑上時,Kafka binder 會啟用一個預設的健康指示器。該健康指示器會檢查 binder 的健康狀況以及與 Kafka broker 的任何通訊問題。如果應用程式想要停用此預設健康檢查實現幷包含自定義實現,則可以為 KafkaBinderHealth 介面提供一個實現。KafkaBinderHealth 是一個從 HealthIndicator 擴充套件而來的標記介面。在自定義實現中,它必須提供 health() 方法的實現。自定義實現必須作為 bean 存在於應用程式配置中。當 binder 發現自定義實現時,它將使用自定義實現而不是預設實現。以下是應用程式中此類自定義實現 bean 的示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

自定義 Kafka Binder 健康指示器示例

以下是編寫自定義 Kafka binder HealthIndicator 的虛擬碼。在此示例中,我們嘗試透過首先檢查叢集連線性,然後檢查與主題相關的問題來覆蓋 binder 提供的 Kafka HealthIndicator。

首先,我們需要建立 KafkaBinderHealth 介面的自定義實現。

public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
    @Value("${spring.cloud.bus.destination}")
    private String topic;
    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
		// More about configuring Kafka
		// https://docs.springframework.tw/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        if (!checkBrokersConnection()) {
            logger.error("Error when connect brokers");
			return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
		if (!checkTopicConnection()) {
			logger.error("Error when trying to connect with specific topic");
			return Health.down().withDetail("TopicError", "Error message with topic name").build();
		}
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation
    }

    public boolean checkTopicConnection() {
		// Your implementation
    }
}

然後我們需要為自定義實現建立一個 bean。

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
	@Bean
	public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
		return new KafkaBinderHealthImplementation(admin);
	}
}
© . This site is unofficial and not affiliated with VMware.