測試應用程式

spring-kafka-test jar 包含一些有用的工具來幫助測試您的應用程式。

嵌入式 Kafka Broker

由於 Kafka 4.0 已完全過渡到 KRaft 模式,現在只有 EmbeddedKafkaKraftBroker 實現可用。

  • EmbeddedKafkaKraftBroker - 在組合的控制器和代理模式下使用 Kraft

有幾種配置代理的技術,如下所述。

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils 提供了許多靜態輔助方法來消費記錄、檢索各種記錄偏移量等。請參閱其 Javadocs 獲取完整詳情。

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 提供了一些靜態方法來設定生產者和消費者屬性。以下清單顯示了這些方法簽名

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

從 2.5 版本開始,consumerProps 方法將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 設定為 earliest。這是因為在大多數情況下,您希望消費者消費測試用例中傳送的任何訊息。ConsumerConfig 預設值為 latest,這意味著測試在消費者啟動之前傳送的訊息將不會收到這些記錄。要恢復到以前的行為,請在呼叫方法後將屬性設定為 latest

使用嵌入式代理時,通常最佳實踐是為每個測試使用不同的主題,以防止串擾。如果由於某種原因無法做到這一點,請注意 consumeFromEmbeddedTopics 方法的預設行為是在分配後將分配的分割槽查詢至開頭。由於它無法訪問消費者屬性,因此您必須使用帶有 seekToEnd 布林引數的過載方法,以便查詢至末尾而不是開頭。

Spring for Apache Kafka 不再支援 JUnit 4。建議遷移到 JUnit Jupiter。

EmbeddedKafkaBroker 類有一個實用方法,可以讓您消費它建立的所有主題。以下示例顯示瞭如何使用它

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些實用方法可以從消費者那裡獲取結果。以下清單顯示了這些方法簽名

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例顯示瞭如何使用 KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

EmbeddedKafkaBroker 啟動嵌入式 Kafka 代理時,一個名為 spring.embedded.kafka.brokers 的系統屬性被設定為 Kafka 代理的地址。為該屬性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)。

除了預設的 spring.embedded.kafka.brokers 系統屬性之外,Kafka 代理的地址還可以暴露給任何任意且方便的屬性。為此,可以在啟動嵌入式 Kafka 之前設定 spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) 系統屬性。例如,對於 Spring Boot,spring.kafka.bootstrap-servers 配置屬性預計會為 Kafka 客戶端的自動配置而設定。因此,在隨機埠上執行嵌入式 Kafka 的測試之前,我們可以將 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 設定為系統屬性 - EmbeddedKafkaBroker 將使用它來暴露其代理地址。現在這是此屬性的預設值(從 3.0.10 版本開始)。

透過 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以為 Kafka 伺服器提供額外的屬性。有關可能的代理屬性的更多資訊,請參閱 Kafka 配置

配置主題

以下示例配置建立了名為 cathat 的主題,有五個分割槽;名為 thing1 的主題,有 10 個分割槽;以及名為 thing2 的主題,有 15 個分割槽。

@SpringJUnitConfig
@EmbeddedKafka(
        partitions = 5,
        topics = {"cat", "hat"}
)
public class MyTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void test() {
        broker.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

預設情況下,當出現問題(例如新增已存在的主題)時,addTopics 將丟擲異常。2.6 版本添加了該方法的新版本,它返回一個 Map<String, Exception>;鍵是主題名稱,值為 null 表示成功,或 Exception 表示失敗。

為多個測試類使用相同的 Broker

您可以使用類似以下內容的方法為多個測試類使用相同的 broker

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static volatile boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            synchronized (EmbeddedKafkaBroker.class) {
		    	try {
	        		embeddedKafka.afterPropertiesSet();
		    	}
	        	catch (Exception e) {
	        		throw new KafkaException("Embedded broker failed to start", e);
		    	}
	        	started = true;
			}
        }
        return embeddedKafka;
    }
}

這假設一個 Spring Boot 環境,並且嵌入式 broker 替換了 bootstrap servers 屬性。

然後,在每個測試類中,您可以使用類似以下內容的方法

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您沒有使用 Spring Boot,您可以使用 broker.getBrokersAsString() 獲取引導伺服器。

前面的示例沒有提供在所有測試完成後關閉代理的機制。如果在 Gradle daemon 中執行測試,這可能會成為一個問題。在這種情況下不應使用此技術,或者在測試完成後應使用某種方法呼叫 EmbeddedKafkaBroker 上的 destroy()

從 3.0 版本開始,框架為 JUnit Platform 暴露了一個 GlobalEmbeddedKafkaTestExecutionListener;它預設停用。這需要 JUnit Platform 1.8 或更高版本。此監聽器的目的是在整個測試計劃中啟動一個全域性 EmbeddedKafkaBroker,並在計劃結束時停止它。要啟用此監聽器,從而為專案中的所有測試提供一個單一的全域性嵌入式 Kafka 叢集,必須透過系統屬性或 JUnit Platform 配置將 spring.kafka.global.embedded.enabled 屬性設定為 true。此外,還可以提供以下屬性

  • spring.kafka.embedded.count - 要管理的 Kafka 代理數量;

  • spring.kafka.embedded.ports - 每個要啟動的 Kafka 代理的埠(逗號分隔值),如果首選隨機埠則為 0;值的數量必須等於上述 count

  • spring.kafka.embedded.topics - 在啟動的 Kafka 叢集中要建立的主題(逗號分隔值);

  • spring.kafka.embedded.partitions - 為建立的主題分配的分割槽數量;

  • spring.kafka.embedded.broker.properties.location - 額外 Kafka 代理配置屬性檔案的位置;此屬性的值必須遵循 Spring 資源抽象模式。

本質上,這些屬性模仿了 @EmbeddedKafka 的一些屬性。

有關配置屬性以及如何提供它們的更多資訊,請參閱 JUnit Jupiter 使用者指南。例如,可以將 spring.embedded.kafka.brokers.property=my.bootstrap-servers 條目新增到測試類路徑中的 junit-platform.properties 檔案中。從 3.0.10 版本開始,broker 預設將此設定為 spring.kafka.bootstrap-servers,用於使用 Spring Boot 應用程式進行測試。

建議不要在單個測試套件中結合使用全域性嵌入式 Kafka 和每個測試類。兩者共享相同的系統屬性,因此很可能導致意外行為。
spring-kafka-testjunit-jupiter-apijunit-platform-launcher(後者用於支援全域性嵌入式代理)具有傳遞依賴。如果您希望使用嵌入式代理且不使用 JUnit,則可能需要排除這些依賴。

@EmbeddedKafka 註解

我們通常建議您使用單個代理例項,以避免在測試之間啟動和停止代理(併為每個測試使用不同的主題)。從 2.0 版本開始,如果您使用 Spring 的測試應用程式上下文快取,您還可以宣告一個 EmbeddedKafkaBroker bean,這樣單個代理就可以跨多個測試類使用。為了方便起見,我們提供了一個名為 @EmbeddedKafka 的測試類級別註解來註冊 EmbeddedKafkaBroker bean。以下示例顯示瞭如何使用它

@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class TestKafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

從 2.2.4 版本開始,您還可以使用 @EmbeddedKafka 註解來指定 Kafka 埠屬性。

從 4.0 版本開始,所有與 ZooKeeper 相關的屬性都已從 @EmbeddedKafka 註解中刪除,因為 Kafka 4.0 僅使用 KRaft。

以下示例設定了 @EmbeddedKafkatopicsbrokerPropertiesbrokerPropertiesLocation 屬性,支援屬性佔位符解析

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,屬性佔位符 ${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port} 從 Spring Environment 解析。此外,代理屬性從 brokerPropertiesLocation 指定的 broker.properties 類路徑資源載入。屬性佔位符會為 brokerPropertiesLocation URL 和資源中找到的任何屬性佔位符解析。由 brokerProperties 定義的屬性會覆蓋 brokerPropertiesLocation 中找到的屬性。

您可以將 @EmbeddedKafka 註解與 JUnit Jupiter 一起使用。

@EmbeddedKafka 註解與 JUnit Jupiter

從 2.3 版本開始,有兩種方法可以將 @EmbeddedKafka 註解與 JUnit Jupiter 結合使用。當與 @SpringJunitConfig 註解一起使用時,嵌入式代理被新增到測試應用程式上下文中。您可以將代理自動注入到您的測試中,在類級別或方法級別,以獲取代理地址列表。

使用 spring 測試上下文時,EmbdeddedKafkaCondition 會建立一個代理;該條件包含一個引數解析器,因此您可以在測試方法中訪問該代理。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

除非帶有 @EmbeddedKafka 註解的類也帶有 ExtendWith(SpringExtension.class) 註解(或元註解),否則將建立一個獨立代理(在 Spring 的 TestContext 之外)。@SpringJunitConfig@SpringBootTest 都是元註解的,當這些註解中的任何一個也存在時,將使用基於上下文的代理。

當存在 Spring 測試應用程式上下文時,主題和代理屬性可以包含屬性佔位符,只要屬性在某處定義,這些佔位符就會被解析。如果 Spring 上下文不可用,這些佔位符將不會被解析。

@SpringBootTest 註解中的嵌入式 Broker

Spring Initializr 現在會自動將 spring-kafka-test 依賴項以測試範圍新增到專案配置中。

如果您的應用程式在 spring-cloud-stream 中使用 Kafka 繫結器,並且您想在測試中使用嵌入式 broker,則必須刪除 spring-cloud-stream-test-support 依賴項,因為它用測試繫結器替換了真正的繫結器。如果您希望某些測試使用測試繫結器而某些測試使用嵌入式 broker,則使用真實繫結器的測試需要透過在測試類中排除繫結器自動配置來停用測試繫結器。以下示例顯示瞭如何操作

@SpringJUnitConfig
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

在 Spring Boot 應用程式測試中使用嵌入式代理有幾種方法。

它們包括

@EmbeddedKafka@SpringJunitConfig

當將 @EmbeddedKafka@SpringJUnitConfig 結合使用時,建議在測試類上使用 @DirtiesContext。這是為了防止在測試套件中執行多個測試後,JVM 關閉期間發生潛在的競態條件。例如,如果不使用 @DirtiesContextEmbeddedKafkaBroker 可能會提前關閉,而應用程式上下文仍然需要它的資源。由於每個 EmbeddedKafka 測試執行都會建立自己的臨時目錄,當發生這種競態條件時,它會產生錯誤日誌訊息,指示它嘗試刪除或清理的檔案不再可用。新增 @DirtiesContext 將確保在每個測試後清理應用程式上下文並且不被快取,從而使其不易受到此類潛在資源競態條件的影響。

@EmbeddedKafka 註解或 EmbeddedKafkaBroker Bean

以下示例顯示瞭如何使用 @EmbeddedKafka 註解建立嵌入式代理

@SpringJUnitConfig
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    void test() {
        ...
    }

}
從 3.0.10 版本開始,bootstrapServersProperty 預設自動設定為 spring.kafka.bootstrap-servers

Hamcrest 匹配器

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ 條件

您可以使用以下 AssertJ 條件

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

示例

以下示例彙集了本章涵蓋的大部分主題

@EmbeddedKafka(topics = KafkaTemplateTests.TEMPLATE_TOPIC)
public class KafkaTemplateTests {

    public static final String TEMPLATE_TOPIC = "templateTopic";
    public static EmbeddedKafkaBroker embeddedKafka;

    @BeforeAll
	public static void setUp() {
		embeddedKafka = EmbeddedKafkaCondition.getBroker();
	}

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka);
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }
}

前面的示例使用了 Hamcrest 匹配器。使用 AssertJ,最後一部分看起來像以下程式碼

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock 消費者和生產者

kafka-clients 庫提供了 MockConsumerMockProducer 類用於測試目的。

如果您希望在某些測試中將這些類分別與監聽器容器或 KafkaTemplate 一起使用,從 3.0.7 版本開始,框架現在提供了 MockConsumerFactoryMockProducerFactory 實現。

這些工廠可以在監聽器容器和模板中使用,而不是預設的工廠,後者需要一個執行中的(或嵌入式)代理。

以下是一個返回單個消費者的簡單實現示例

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望進行併發測試,工廠建構函式中的 Supplier lambda 需要每次建立一個新例項。

對於 MockProducerFactory,有兩個建構函式;一個用於建立簡單工廠,另一個用於建立支援事務的工廠。

以下是示例

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

請注意,在第二種情況下,lambda 是一個 BiFunction<Boolean, String>,其中第一個引數為 true,如果呼叫方需要事務性生產者;可選的第二個引數包含事務 ID。這可以是預設值(如建構函式中提供的),也可以由 KafkaTransactionManager(或用於本地事務的 KafkaTemplate)覆蓋,如果已配置。提供事務 ID 以防您希望基於此值使用不同的 MockProducer

如果您在多執行緒環境中使用生產者,BiFunction 應該返回多個生產者(可能使用 ThreadLocal 繫結到執行緒)。

事務性 MockProducer 必須透過呼叫 initTransaction() 進行事務初始化。

使用 MockProducer 時,如果您不想在每次傳送後關閉生產者,那麼您可以提供一個自定義的 MockProducer 實現,它覆蓋了 close 方法,並且不呼叫父類的 close 方法。這對於測試來說很方便,可以在不關閉生產者的情況下驗證對同一個生產者的多次釋出。

這是一個例子

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}
© . This site is unofficial and not affiliated with VMware.