程式設計模型

使用 Kafka Streams 繫結器提供的程式設計模型時,可以使用高階的 Streams DSL,也可以混合使用高階和低階 Processor-API。當混合使用高階和低階 API 時,通常透過在 KStream 上呼叫 transformprocess API 方法來實現。

函式式風格

從 Spring Cloud Stream 3.0.0 開始,Kafka Streams 繫結器允許應用程式使用 Java 8 中提供的函數語言程式設計風格進行設計和開發。這意味著應用程式可以簡潔地表示為 java.util.function.Functionjava.util.function.Consumer 型別的 lambda 表示式。

讓我們看一個非常基本的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

儘管簡單,但這是一個完整的獨立 Spring Boot 應用程式,它利用 Kafka Streams 進行流處理。這是一個消費者應用程式,沒有出站繫結,只有一個入站繫結。該應用程式消費資料,並簡單地將 KStream 的鍵和值資訊記錄到標準輸出。該應用程式包含 SpringBootApplication 註解和一個標記為 Bean 的方法。該 Bean 方法的型別是 java.util.function.Consumer,並使用 KStream 進行引數化。然後在實現中,我們返回一個 Consumer 物件,它本質上是一個 lambda 表示式。在 lambda 表示式內部,提供了處理資料的程式碼。

在此應用程式中,有一個型別為 KStream 的單一輸入繫結。繫結器為應用程式建立此繫結,名稱為 process-in-0,即函式 Bean 名稱後跟一個短劃線(-),然後是文字 in,再跟一個短劃線,最後是引數的序數位置。您可以使用此繫結名稱設定其他屬性,例如目標。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果未在繫結上設定目標屬性,則會建立一個與繫結同名的主題(如果應用程式具有足夠的許可權)或該主題預計已可用。

構建為 uber-jar(例如,kstream-consumer-app.jar)後,您可以按以下方式執行上述示例。

如果應用程式選擇使用 Spring 的 Component 註解定義函式式 Bean,繫結器也支援該模型。上述函式式 Bean 可以重寫如下。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

這是另一個示例,它是一個具有輸入和輸出繫結的完整處理器。這是經典的詞頻統計示例,應用程式從一個主題接收資料,然後在一個翻轉時間視窗中計算每個單詞的出現次數。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

這裡又是一個完整的 Spring Boot 應用程式。與第一個應用程式的區別在於,這裡的 Bean 方法型別為 java.util.function.FunctionFunction 的第一個引數化型別用於輸入 KStream,第二個用於輸出。在方法體中,提供了一個型別為 Function 的 lambda 表示式,並作為實現,給出了實際的業務邏輯。與前面討論的基於 Consumer 的應用程式類似,這裡的輸入繫結預設命名為 process-in-0。對於輸出,繫結名稱也自動設定為 process-out-0

構建為 uber-jar(例如,wordcount-processor.jar)後,您可以按以下方式執行上述示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此應用程式將從 Kafka 主題 words 消費訊息,並將計算結果釋出到輸出主題 counts

Spring Cloud Stream 將確保來自入站和出站主題的訊息自動繫結為 KStream 物件。作為開發人員,您可以專注於程式碼的業務方面,即編寫處理器中所需的邏輯。Kafka Streams 基礎設施所需的 Kafka Streams 特定配置將由框架自動處理。

我們上面看到的兩個示例都有一個單一的 KStream 輸入繫結。在這兩種情況下,繫結都從一個主題接收記錄。如果您想將多個主題複用到一個 KStream 繫結中,您可以在下面提供逗號分隔的 Kafka 主題作為目標。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您想根據正則表示式匹配主題,還可以將主題模式作為目標。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多個輸入繫結

許多非平凡的 Kafka Streams 應用程式通常透過多個繫結從多個主題消費資料。例如,一個主題作為 Kstream 消費,另一個作為 KTableGlobalKTable 消費。應用程式可能希望以表型別接收資料的原因有很多。考慮一個用例,其中底層主題透過資料庫的變更資料捕獲 (CDC) 機制填充,或者應用程式只關心最新更新以進行下游處理。如果應用程式指定需要將資料繫結為 KTableGlobalKTable,那麼 Kafka Streams 繫結器將正確地將目標繫結到 KTableGlobalKTable,並使其可供應用程式操作。我們將看看 Kafka Streams 繫結器中如何處理多個輸入繫結的幾種不同場景。

Kafka Streams 繫結器中的 BiFunction

這是一個我們有兩個輸入和一個輸出的例子。在這種情況下,應用程式可以利用 java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

這裡,基本主題與前面的示例相同,但這裡我們有兩個輸入。Java 的 BiFunction 支援用於將輸入繫結到所需的目標。繫結器為輸入生成的預設繫結名稱分別是 process-in-0process-in-1。預設輸出繫結是 process-out-0。在此示例中,BiFunction 的第一個引數繫結為第一個輸入的 KStream,第二個引數繫結為第二個輸入的 KTable

Kafka Streams 繫結器中的 BiConsumer

如果有兩個輸入,但沒有輸出,在這種情況下,我們可以使用 java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

超過兩個輸入

如果您有兩個以上的輸入怎麼辦?在某些情況下,您需要兩個以上的輸入。在這種情況下,繫結器允許您連結部分函式。在函數語言程式設計術語中,這種技術通常稱為柯里化(currying)。隨著 Java 8 中新增的函數語言程式設計支援,Java 現在使您能夠編寫柯里化函式。Spring Cloud Stream Kafka Streams 繫結器可以利用此功能啟用多個輸入繫結。

讓我們看一個例子。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

讓我們看看上面介紹的繫結模型的細節。在這個模型中,我們有 3 個應用於入站的部分函式。我們稱它們為 f(x)f(y)f(z)。如果我們將這些函式按真實數學函式的意義展開,它將看起來像這樣:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。變數 x 代表 KStream<Long, Order>,變數 y 代表 GlobalKTable<Long, Customer>,變數 z 代表 GlobalKTable<Long, Product>。第一個函式 f(x) 具有應用程式的第一個輸入繫結(KStream<Long, Order>),其輸出是函式 f(y)。函式 f(y) 具有應用程式的第二個輸入繫結(GlobalKTable<Long, Customer>),其輸出是另一個函式 f(z)。函式 f(z) 的輸入是應用程式的第三個輸入(GlobalKTable<Long, Product>),其輸出是 KStream<Long, EnrichedOrder>,這是應用程式的最終輸出繫結。來自三個部分函式的輸入,分別是 KStreamGlobalKTableGlobalKTable,在方法體中可供您用於實現作為 lambda 表示式一部分的業務邏輯。

輸入繫結分別命名為 enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。輸出繫結命名為 enrichOrder-out-0

使用柯里化函式,您可以幾乎擁有任意數量的輸入。但是,請記住,如果輸入的數量超過少數幾個,並且像 Java 中那樣部分應用函式,可能會導致程式碼難以閱讀。因此,如果您的 Kafka Streams 應用程式需要超過合理數量的輸入繫結,並且您想使用這種函式式模型,那麼您可能需要重新考慮您的設計並適當分解應用程式。

輸出繫結

Kafka Streams 繫結器允許 KStreamKTable 型別的作為輸出繫結。在幕後,繫結器使用 KStream 上的 to 方法將結果記錄傳送到輸出主題。如果應用程式在函式中提供了 KTable 作為輸出,繫結器仍然透過委託給 KStreamto 方法來使用此技術。

例如,以下兩個函式都將起作用

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

多重輸出繫結

Kafka Streams 允許將出站資料寫入多個主題。此功能在 Kafka Streams 中稱為分支。使用多個輸出繫結時,您需要提供一個 KStream 陣列(KStream[])作為出站返回型別。

這是一個例子

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

程式設計模型保持不變,但是出站引數化型別是 KStream[]。對於上面的函式,預設的輸出繫結名稱分別是 process-out-0process-out-1process-out-2。繫結器生成三個輸出繫結的原因是它檢測到返回的 KStream 陣列的長度為三。請注意,在此示例中,我們提供了 noDefaultBranch();如果我們使用 defaultBranch(),那將需要一個額外的輸出繫結,本質上返回一個長度為四的 KStream 陣列。

Kafka Streams 基於函式的程式設計風格總結

總而言之,下表顯示了可以在函式式正規化中使用的各種選項。

輸入數量 輸出數量 要使用的元件

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

使用柯里化函式

  • 在此表中,如果輸出多於一個,則型別簡單地變為 KStream[]

Kafka Streams 繫結器中的函式組合

Kafka Streams 繫結器支援線性拓撲的最小形式函式組合。使用 Java 函式式 API 支援,您可以編寫多個函式,然後使用 andThen 方法自行組合它們。例如,假設您有以下兩個函式。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使繫結器中沒有函式組合支援,您也可以如下組合這兩個函式。

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然後您可以提供形式為 spring.cloud.function.definition=foo;bar;composed 的定義。有了繫結器中的函式組合支援,您無需編寫進行顯式函式組合的第三個函式。

您可以簡單地這樣做

spring.cloud.function.definition=foo|bar

你甚至可以這樣做

spring.cloud.function.definition=foo|bar;foo;bar

此示例中組合函式的預設繫結名稱變為 foobar-in-0foobar-out-0

Kafka Streams 繫結器中函式組合的侷限性

當您有一個 java.util.function.Function bean 時,它可以與另一個函式或多個函式組合。同一個函式 bean 也可以與 java.util.function.Consumer 組合。在這種情況下,消費者是最後一個組合的元件。一個函式可以與多個函式組合,然後以 java.util.function.Consumer bean 結尾。

組合 java.util.function.BiFunction 型別的 Bean 時,BiFunction 必須是定義中的第一個函式。組合的實體必須是 java.util.function.Functionjava.util.function.Consumer 型別。換句話說,您不能接受一個 BiFunction Bean,然後與另一個 BiFunction 組合。

您不能與 BiConsumer 型別或 Consumer 作為第一個元件的定義進行組合。您也不能與輸出為陣列(用於分支的 KStream[])的函式進行組合,除非這是定義中的最後一個元件。

函式定義中的第一個 FunctionBiFunction 也可以使用柯里化形式。例如,以下是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函式定義可以是 curriedFoo|bar。在幕後,繫結器將為柯里化函式建立兩個輸入繫結,並根據定義中的最終函式建立一個輸出繫結。在這種情況下,預設的輸入繫結將是 curriedFoobar-in-0curriedFoobar-in-1。此示例的預設輸出繫結將是 curriedFoobar-out-0

關於在函式組合中使用 KTable 作為輸出的特別說明

假設您有以下兩個函式。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以將它們組合為 foo|bar,但請記住,第二個函式(本例中的 bar)必須將 KTable 作為輸入,因為第一個函式(foo)將 KTable 作為輸出。

© . This site is unofficial and not affiliated with VMware.