測試應用

spring-kafka-test jar 包含一些實用的工具,可幫助您測試應用程式。

嵌入式 Kafka Broker

提供了兩種實現

  • EmbeddedKafkaZKBroker - 遺留實現,啟動一個嵌入式 Zookeeper 例項(使用 EmbeddedKafka 時這仍然是預設設定)。

  • EmbeddedKafkaKraftBroker - 在組合控制器和 Broker 模式下使用 Kraft 而非 Zookeeper(自 3.1 版本起)。

以下部分討論了幾種配置 Broker 的技術。

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils 提供了一些靜態輔助方法,用於消費記錄、檢索各種記錄偏移量等。有關完整詳細資訊,請參閱其 Javadocs

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 還提供了一些靜態方法來設定 Producer 和 Consumer 屬性。以下列出了這些方法簽名:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

從 2.5 版本開始,consumerProps 方法將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 設定為 earliest。這是因為在大多數情況下,您希望 Consumer 能夠消費測試用例中傳送的任何訊息。ConsumerConfig 的預設值是 latest,這意味著測試在 Consumer 啟動之前傳送的訊息將不會被 Consumer 接收。要恢復之前的行為,請在呼叫方法後將該屬性設定為 latest

使用嵌入式 Broker 時,通常最佳實踐是為每個測試使用不同的 Topic,以防止相互干擾。如果由於某些原因無法做到這一點,請注意 consumeFromEmbeddedTopics 方法的預設行為是在分配分割槽後將 Consumer 定位到分割槽起始位置。由於該方法無法訪問 Consumer 屬性,因此您必須使用接受 seekToEnd 布林引數的過載方法來定位到分割槽末尾而非起始位置。

提供了 EmbeddedKafkaZKBroker 的 JUnit 4 @Rule 包裝器,用於建立嵌入式 Kafka 和嵌入式 Zookeeper 伺服器。(有關在 JUnit 5 中使用 @EmbeddedKafka 的資訊,請參閱@EmbeddedKafka 註解)。以下列出了這些方法簽名:

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支援 EmbeddedKafkaKraftBroker

EmbeddedKafkaBroker 類有一個實用方法,可以消費它建立的所有 Topic 中的訊息。以下示例展示瞭如何使用它:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些實用方法可以從 Consumer 中獲取結果。以下列出了這些方法簽名:

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例展示瞭如何使用 KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

EmbeddedKafkaBroker 啟動嵌入式 Kafka 和嵌入式 Zookeeper 伺服器時,系統屬性 spring.embedded.kafka.brokers 將被設定為 Kafka Broker 的地址,系統屬性 spring.embedded.zookeeper.connect 將被設定為 Zookeeper 的地址。為此屬性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

除了預設的 spring.embedded.kafka.brokers 系統屬性外,Kafka Broker 的地址可以暴露給任何任意且方便的屬性。為此,可以在啟動嵌入式 Kafka 之前設定一個 spring.embedded.kafka.brokers.propertyEmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系統屬性。例如,對於 Spring Boot,期望設定 spring.kafka.bootstrap-servers 配置屬性以自動配置 Kafka 客戶端。因此,在使用隨機埠執行嵌入式 Kafka 進行測試之前,我們可以將 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 設定為一個系統屬性 - EmbeddedKafkaBroker 將使用它來暴露其 Broker 地址。現在這是此屬性的預設值(自 3.0.10 版本起)。

透過 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以為 Kafka 伺服器提供額外的屬性。有關可能的 Broker 屬性的更多資訊,請參閱 Kafka 配置

配置 Topic

以下示例配置建立了名為 cathat 的 Topic(各包含五個分割槽)、一個名為 thing1 的 Topic(包含 10 個分割槽)以及一個名為 thing2 的 Topic(包含 15 個分割槽)。

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

預設情況下,當出現問題時(例如新增已存在的 Topic),addTopics 會丟擲異常。2.6 版本添加了該方法的新版本,返回一個 Map<String, Exception>;Key 是 Topic 名稱,成功時值為 null,失敗時值為 Exception

在多個測試類中使用相同的 Broker

您可以使用類似於以下方式在多個測試類中使用同一個 Broker:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

這假設了一個 Spring Boot 環境,並且嵌入式 Broker 替換了 bootstrap servers 屬性。

然後,在每個測試類中,您可以使用類似於以下方式:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您不使用 Spring Boot,可以使用 broker.getBrokersAsString() 獲取 bootstrap servers。

前面的示例沒有提供在所有測試完成後關閉 Broker 的機制。例如,如果您在 Gradle daemon 中執行測試,這可能會成為一個問題。在這種情況下,您不應使用此技術,或者您應該使用某種方式在測試完成後呼叫 EmbeddedKafkaBroker 上的 destroy() 方法。

從 3.0 版本開始,框架為 JUnit Platform 暴露了一個 GlobalEmbeddedKafkaTestExecutionListener;預設情況下它是停用的。這需要 JUnit Platform 1.8 或更高版本。此監聽器的目的是為整個測試計劃啟動一個全域性 EmbeddedKafkaBroker 並在計劃結束時將其停止。要啟用此監聽器,從而在專案中的所有測試中擁有一個全域性嵌入式 Kafka 叢集,必須透過系統屬性或 JUnit Platform 配置將 spring.kafka.global.embedded.enabled 屬性設定為 true。此外,還可以提供以下屬性:

  • spring.kafka.embedded.count - 要管理的 Kafka Broker 數量;

  • spring.kafka.embedded.ports - 要啟動的每個 Kafka Broker 的埠(逗號分隔值),如果首選隨機埠則為 0;值的數量必須等於上面提到的 count

  • spring.kafka.embedded.topics - 在啟動的 Kafka 叢集中要建立的 Topic(逗號分隔值);

  • spring.kafka.embedded.partitions - 為建立的 Topic 配置的分割槽數量;

  • spring.kafka.embedded.broker.properties.location - 包含額外 Kafka Broker 配置屬性的檔案位置;此屬性的值必須遵循 Spring 資源抽象模式;

  • spring.kafka.embedded.kraft - 預設為 false,當設定為 true 時,使用 EmbeddedKafkaKraftBroker 而非 EmbeddedKafkaZKBroker

這些屬性本質上模仿了 @EmbeddedKafka 的一些屬性。

有關配置屬性以及如何提供它們的更多資訊,請參閱 JUnit 5 使用者指南。例如,可以將 spring.embedded.kafka.brokers.property=my.bootstrap-servers 條目新增到測試 classpath 中的 junit-platform.properties 檔案中。從 3.0.10 版本開始,Broker 預設情況下會自動將其設定為 spring.kafka.bootstrap-servers,用於 Spring Boot 應用程式的測試。

建議不要在同一個測試套件中同時使用全域性嵌入式 Kafka 和每個測試類的嵌入式 Kafka。它們共享相同的系統屬性,因此很可能導致意外行為。
spring-kafka-test 傳遞性依賴於 junit-jupiter-apijunit-platform-launcher(後者用於支援全域性嵌入式 Broker)。如果您希望使用嵌入式 Broker 但不使用 JUnit,則可能需要排除這些依賴。

@EmbeddedKafka 註解

我們通常建議您將規則用作 @ClassRule,以避免在測試之間啟動和停止 Broker(併為每個測試使用不同的 Topic)。從 2.0 版本開始,如果您使用 Spring 的測試應用上下文快取,您也可以宣告一個 EmbeddedKafkaBroker bean,這樣就可以在多個測試類中共享一個 Broker。為了方便起見,我們提供了一個測試類級別的註解 @EmbeddedKafka 來註冊 EmbeddedKafkaBroker bean。以下示例展示瞭如何使用它:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class TestKafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

從 2.2.4 版本開始,您還可以使用 @EmbeddedKafka 註解來指定 Kafka 埠屬性。

從 3.2 版本開始,將 kraft 屬性設定為 true 可使用 EmbeddedKafkaKraftBroker 而非 EmbeddedKafkaZKBroker

以下示例設定了 @EmbeddedKafka 支援屬性佔位符解析的 topicsbrokerPropertiesbrokerPropertiesLocation 屬性:

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,屬性佔位符 `${kafka.topics.another-topic}`、`${kafka.broker.logs-dir}` 和 `${kafka.broker.port}` 是從 Spring Environment 中解析的。此外,Broker 屬性從由 brokerPropertiesLocation 指定的 broker.properties classpath 資源中載入。屬性佔位符會為 brokerPropertiesLocation URL 以及資源中找到的任何屬性佔位符進行解析。brokerProperties 定義的屬性會覆蓋 brokerPropertiesLocation 中找到的屬性。

您可以在 JUnit 4 或 JUnit 5 中使用 @EmbeddedKafka 註解。

在 JUnit5 中使用 @EmbeddedKafka 註解

從 2.3 版本開始,有兩種方法可以在 JUnit5 中使用 @EmbeddedKafka 註解。當與 @SpringJunitConfig 註解一起使用時,嵌入式 Broker 會新增到測試應用上下文中。您可以在類或方法級別自動注入 Broker 到您的測試中,以獲取 Broker 地址列表。

使用 Spring 測試上下文時,EmbdeddedKafkaCondition 會建立一個 Broker;此條件包含一個引數解析器,因此您可以在測試方法中訪問 Broker。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

將建立一個獨立的 Broker(在 Spring 的 TestContext 之外),除非用 @EmbeddedKafka 註解的類也被 ExtendWith(SpringExtension.class) 註解(或元註解)。@SpringJunitConfig@SpringBootTest 就是這樣元註解的,當它們中的任何一個也存在時,將使用基於上下文的 Broker。

當存在 Spring 測試應用上下文時,Topic 和 Broker 屬性可以包含屬性佔位符,只要屬性在某個地方定義了,它們就會被解析。如果不存在 Spring 上下文,這些佔位符將不會被解析。

@SpringBootTest 註解中使用嵌入式 Broker

Spring Initializr 現在會自動將 spring-kafka-test 依賴新增到專案配置的測試範圍中。

如果您的應用程式在 spring-cloud-stream 中使用了 Kafka Binder,並且希望在測試中使用嵌入式 Broker,則必須移除 spring-cloud-stream-test-support 依賴,因為它會用一個測試 Binder 替換實際的 Binder。如果您希望某些測試使用測試 Binder,而另一些使用嵌入式 Broker,則使用實際 Binder 的測試需要透過在測試類中排除 Binder 自動配置來停用測試 Binder。以下示例展示瞭如何操作:

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

在 Spring Boot 應用測試中有幾種使用嵌入式 Broker 的方法。

包括:

JUnit4 類規則

以下示例展示瞭如何使用 JUnit4 類規則建立嵌入式 Broker:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

請注意,由於這是一個 Spring Boot 應用程式,我們覆蓋了 Broker 列表屬性以設定 Spring Boot 的屬性。

@EmbeddedKafka@SpringJunitConfig

在使用 @EmbeddedKafka@SpringJUnitConfig 時,建議在測試類上使用 @DirtiesContext。這是為了防止在測試套件中執行多個測試後,JVM 關閉期間可能出現的競態條件。例如,如果不使用 @DirtiesContextEmbeddedKafkaBroker 可能會提前關閉,而應用上下文仍然需要其中的資源。由於每次 @EmbeddedKafka 測試執行都會建立自己的臨時目錄,當發生這種競態條件時,會產生錯誤日誌訊息,指示其嘗試刪除或清理的檔案不再可用。新增 @DirtiesContext 將確保在每次測試後清理應用上下文並且不進行快取,從而降低此類潛在資源競態條件的風險。

@EmbeddedKafka 註解或 EmbeddedKafkaBroker Bean

以下示例展示瞭如何使用 @EmbeddedKafka 註解建立嵌入式 Broker:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
預設情況下,bootstrapServersProperty 會自動設定為 spring.kafka.bootstrap-servers,從 3.0.10 版本開始。

Hamcrest 匹配器

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供了以下匹配器:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ 條件

您可以使用以下 AssertJ 條件:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

示例

以下示例彙集了本章涵蓋的大部分主題:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的示例使用了 Hamcrest 匹配器。使用 AssertJ,最後一部分程式碼如下所示:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock Consumer 和 Producer

kafka-clients 庫提供了 MockConsumerMockProducer 類用於測試目的。

如果您希望在某些使用監聽器容器或 KafkaTemplate 的測試中使用這些類,從 3.0.7 版本開始,框架現在提供了 MockConsumerFactoryMockProducerFactory 實現。

這些工廠可以在監聽器容器和模板中使用,而非需要執行(或嵌入式)Broker 的預設工廠。

這是一個返回單個 Consumer 的簡單實現示例:

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望進行併發測試,工廠建構函式中的 Supplier lambda 需要每次都建立一個新例項。

對於 MockProducerFactory,有兩個建構函式;一個用於建立簡單工廠,一個用於建立支援事務的工廠。

示例:

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

注意在第二種情況下,lambda 是一個 BiFunction<Boolean, String>,其中第一個引數為 true 表示呼叫者需要一個事務性 Producer;可選的第二個引數包含事務 ID。這可以是預設值(由建構函式提供),或者在配置了 KafkaTransactionManager(或用於本地事務的 KafkaTemplate)的情況下,可以被覆蓋。提供事務 ID 是為了您可能希望根據此值使用不同的 MockProducer

如果您在多執行緒環境中使用 Producer,BiFunction 應該返回多個 Producer(可能使用 ThreadLocal 進行執行緒繫結)。

事務性 MockProducer 必須透過呼叫 initTransaction() 進行事務初始化。

使用 MockProducer 時,如果您不想在每次傳送後關閉 Producer,則可以提供一個自定義 MockProducer 實現,它覆蓋 close 方法,但不呼叫父類的 close 方法。這對於測試很方便,可以在不關閉同一個 Producer 的情況下驗證多次釋出。

示例:

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}