Debezium 支援

Debezium Engine,變更資料捕獲 (CDC) 入站通道介面卡。DebeziumMessageProducer 允許捕獲資料庫變更事件,將它們轉換為訊息並隨後流式傳輸到出站通道。

你需要將 spring integration Debezium 依賴新增到你的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.4.4"

你還需要為你的輸入資料庫包含一個 Debezium 聯結器 依賴。例如,要在 PostgreSQL 中使用 Debezium,你需要 postgres Debezium 聯結器

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

debezium-version 替換為你正在使用的 spring-integration-debezium 版本相容的版本。

Debezium 入站通道介面卡

Debezium 介面卡需要一個預先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 例項。

Debezium-supplier 提供了一個開箱即用的 DebeziumEngine.Builder Spring Boot 自動配置,以及方便的 DebeziumProperties 配置抽象。

Debezium Java DSL 可以從提供的 DebeziumEngine.Builder 以及純 Debezium 配置(例如 java.util.Properties)建立 DebeziumMessageProducer 例項。後者對於一些具有特定配置和序列化格式的常見用例非常方便。

此外,DebeziumMessageProducer 可以透過以下配置屬性進行調整

  • contentType - 允許處理 JSON(預設)、AVROPROTOBUF 訊息內容。此 contentType 必須 與提供的 DebeziumEngine.Builder 配置的 SerializationFormat 對齊。

  • enableBatch - 當設定為 false(預設)時,Debezium 介面卡會為從源資料庫接收到的每個 ChangeEvent 資料變更事件傳送新的 Message。如果設定為 true,則介面卡會為從 Debezium 引擎接收到的每批 ChangeEvent 傳送一個單獨的 Message。這種 payload 不可序列化,需要自定義的序列化/反序列化實現。

  • enableEmptyPayload - 啟用對墓碑訊息(即刪除訊息)的支援。在資料庫行刪除時,Debezium 可以傳送一個墓碑變更事件,該事件的 key 與被刪除行的 key 相同,值為 Optional.empty。預設為 false

  • headerMapper - 自定義 HeaderMapper 實現,用於選擇並轉換 ChangeEvent 頭到 Message 頭。預設的 DefaultDebeziumHeaderMapper 實現提供了 setHeaderNamesToMap 的 setter 方法。預設情況下,所有頭都會被對映。

  • taskExecutor - 為 Debezium 引擎設定自定義 TaskExecutor

以下程式碼片段演示了此通道介面卡的各種配置

使用 Java 配置

以下 Spring Boot 應用展示瞭如何使用 Java 配置配置入站介面卡

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 事件所針對的邏輯目標的名稱。通常,目標由 topic.prefix 配置選項、資料庫名稱和表名稱組成。例如:my-topic.inventory.orders
2 包含變更表的 key 的 schema 以及變更行的實際 key。key schema 及其對應的 key payload 都包含變更表中在聯結器建立事件時 PRIMARY KEY(或唯一約束)中每個列的欄位。
3 與 key 類似,payload 具有 schema 部分和 payload value 部分。schema 部分包含描述 payload value 部分的 Envelope 結構的 schema,包括其巢狀欄位。建立、更新或刪除資料的操作產生的變更事件都具有帶 Envelope 結構的 value payload。

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允許分別停用 key 或 payload 的訊息內 schema 內容。

類似地,我們可以配置 DebeziumMessageProducer 以批處理方式處理傳入的變更事件

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL 支援

spring-integration-debezium 透過 Debezium 工廠和 DebeziumMessageProducerSpec 實現提供了方便的 Java DSL 流式 API。

Debezium Java DSL 的入站通道介面卡是

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者從原生 Debezium 配置屬性建立一個 DebeziumMessageProducerSpec 例項,並預設使用 JSON 序列化格式。

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

以下 Spring Boot 應用提供了一個使用 Java DSL 配置入站介面卡的示例

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}