Hazelcast 支援

Spring Integration 提供通道介面卡和其他實用元件,用於與記憶體資料網格 Hazelcast 互動。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:7.0.0"

Hazelcast 元件的 XML 名稱空間和 schemaLocation 定義是

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驅動入站通道介面卡

Hazelcast 提供分散式資料結構,例如

  • com.hazelcast.map.IMap

  • com.hazelcast.multimap.MultiMap

  • com.hazelcast.collection.IList

  • com.hazelcast.collection.ISet

  • com.hazelcast.collection.IQueue

  • com.hazelcast.topic.ITopic

  • com.hazelcast.replicatedmap.ReplicatedMap

它還提供事件監聽器,用於監聽對這些資料結構的修改。

  • com.hazelcast.core.EntryListener<K, V>

  • com.hazelcast.collection.ItemListener

  • com.hazelcast.topic.MessageListener

Hazelcast 事件驅動入站通道介面卡監聽相關的快取事件,並將事件訊息傳送到定義的通道。它支援 XML 和 JavaConfig 驅動的配置。

XML 配置

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />

Hazelcast 事件驅動入站通道介面卡需要以下屬性

  • channel:指定訊息傳送到的通道;

  • cache:指定被監聽的分散式物件引用。這是一個強制屬性;

  • cache-events:指定被監聽的快取事件。這是一個可選屬性,其預設值為 ADDED。其支援的值如下

  • IMapMultiMap 支援的快取事件型別:ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • ReplicatedMap 支援的快取事件型別:ADDEDREMOVEDUPDATEDEVICTED

  • IListISetIQueue 支援的快取事件型別:ADDEDREMOVEDITopic 沒有快取事件型別。

  • cache-listening-policy:指定快取監聽策略為 SINGLEALL。這是一個可選屬性,其預設值為 SINGLE。每個監聽相同快取物件且具有相同 cache-events 屬性的 Hazelcast 入站通道介面卡可以接收單個事件訊息或所有事件訊息。如果設定為 ALL,所有監聽相同快取物件且具有相同 cache-events 屬性的 Hazelcast 入站通道介面卡將接收所有事件訊息。如果設定為 SINGLE,它們將接收唯一的事件訊息。

一些配置示例

分散式 Map
<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
分散式 MultiMap
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
分散式 List
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
分散式 Set
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
分散式 Queue
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
分散式 Topic
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
複製 Map
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>

Java 配置示例

以下示例展示了 DistributedMap 配置。相同的配置可用於其他分散式資料結構(IMapMultiMapReplicatedMapIListISetIQueueITopic

@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

    return producer;
}

Hazelcast 連續查詢入站通道介面卡

Hazelcast 連續查詢允許監聽對特定 Map 條目執行的修改。Hazelcast 連續查詢入站通道介面卡是一個事件驅動的通道介面卡,它根據定義的謂詞監聽相關的分散式 Map 事件。

  • Java

  • XML

@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);

    return producer;
}
<int:channel id="cqMapChannel"/>

<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>

<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支援以下六個屬性

  • channel:指定訊息傳送到的通道;

  • cache:指定被監聽的分散式 Map 引用。強制;

  • cache-events:指定被監聽的快取事件。可選屬性,預設值為 ADDED。支援的值有 ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • predicate:指定一個謂詞以監聽對特定 Map 條目執行的修改。強制;

  • include-value:指定在連續查詢結果中包含值和舊值。可選,預設值為 true

  • cache-listening-policy:指定快取監聽策略為 SINGLEALL。可選,預設值為 SINGLE。每個監聽相同快取物件且具有相同 cache-events 屬性的 Hazelcast CQ 入站通道介面卡可以接收單個事件訊息或所有事件訊息。如果設定為 ALL,所有監聽相同快取物件且具有相同 cache-events 屬性的 Hazelcast CQ 入站通道介面卡將接收所有事件訊息。如果設定為 SINGLE,它們將接收唯一的事件訊息。

Hazelcast 叢集監控入站通道介面卡

Hazelcast 叢集監控器支援監聽對叢集執行的修改。Hazelcast 叢集監控入站通道介面卡是一個事件驅動的通道介面卡,它監聽相關的成員資格、分散式物件、遷移、生命週期和客戶端事件

  • Java

  • XML

@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

    return producer;
}
<int:channel id="monitorChannel"/>

<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支援以下三個屬性

  • channel:指定訊息傳送到的通道;

  • hazelcast-instance:指定用於監聽叢集事件的 Hazelcast 例項引用。這是一個強制屬性;

  • monitor-types:指定要監聽的監控型別。這是一個可選屬性,預設值為 MEMBERSHIP。支援的值有 MEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT

Hazelcast 分散式 SQL 入站通道介面卡

Hazelcast 允許在分散式 Map 上執行分散式查詢。Hazelcast 分散式 SQL 入站通道介面卡是一個輪詢入站通道介面卡。它執行定義的分散式 SQL 命令,並根據迭代型別返回結果。

  • Java

  • XML

@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

    return messageSource;
}
<int:channel id="dsMapChannel"/>

<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它需要一個輪詢器,並支援四個屬性

  • channel:指定訊息傳送到的通道。這是一個強制屬性;

  • cache:指定要查詢的分散式 IMap 引用。這是一個強制屬性;

  • iteration-type:指定結果型別。分散式 SQL 可以在 EntrySetKeySetLocalKeySetValues 上執行。這是一個可選屬性,預設值為 VALUE。支援的值有 ENTRYKEYLOCAL_KEYVALUE

  • distributed-sql:指定 SQL 語句的 where 子句。這是一個強制屬性。

Hazelcast 出站通道介面卡

Hazelcast 出站通道介面卡監聽其定義的通道,並將傳入訊息寫入相關的分散式快取。它需要 cachecache-expressionHazelcastHeaders.CACHE_NAME 中的一個用於分散式物件定義。支援的分散式物件有:IMapMultiMapReplicatedMapIListISetIQueueITopic

  • Java

  • XML

@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>

它需要以下屬性

  • channel:指定訊息傳送到的通道;

  • cache:指定分散式物件引用。可選;

  • cache-expression:透過 Spring 表示式語言 (SpEL) 指定分散式物件。可選;

  • key-expression:透過 Spring 表示式語言 (SpEL) 指定鍵值對的鍵。可選,僅對 IMapMultiMapReplicatedMap 分散式資料結構是必需的。

  • extract-payload:指定是傳送整個訊息還是僅傳送有效負載。可選屬性,預設值為 true。如果為 true,則只有有效負載會被寫入分散式物件。否則,透過轉換訊息頭和有效負載,整個訊息將被寫入。

透過在訊息頭中設定分散式物件名稱,訊息可以透過同一通道寫入不同的分散式物件。如果未定義 cachecache-expression 屬性,則必須在請求 Message 中設定 HazelcastHeaders.CACHE_NAME 訊息頭。

Hazelcast 訊息儲存

對於分散式訊息狀態管理,例如持久化 QueueChannel 或跟蹤 Aggregator 訊息組,提供了 HazelcastMessageStore 實現

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}

預設情況下,使用 SPRING_INTEGRATION_MESSAGE_STORE IMap 作為鍵/值儲存訊息和組。可以向 HazelcastMessageStore 提供任何自定義的 IMap

Hazelcast 元資料儲存

可以使用支援的 Hazelcast IMap 來實現 ListenableMetadataStore。預設的 Map 名稱為 SPRING_INTEGRATION_METADATA_STORE,該名稱可以自定義。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}

HazelcastMetadataStore 實現了 ListenableMetadataStore,允許您註冊自己的 MetadataStoreListener 型別的監聽器,透過 addListener(MetadataStoreListener callback) 監聽事件。

帶有 Hazelcast 的訊息通道

Hazelcast 的 IQueueITopic 分散式物件本質上是訊息傳遞原語,可以在此 Hazelcast 模組中無需額外實現即可與 Spring Integration 核心元件一起使用。

QueueChannel 可以由任何 java.util.Queue 提供,包括上述 Hazelcast 分散式 IQueue

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}

將此配置放置在應用程式的 Hazelcast 叢集的多個節點上將使 QueueChannel 成為分散式,並且只有一個節點能夠從該 IQueue 輪詢單個 Message。這類似於 PollableJmsChannelPollableKafkaChannelPollableAmqpChannel 的工作方式。

如果生產者端不是 Spring Integration 應用程式,則無法配置 QueueChannel,因此使用純 Hazelcast IQueue API 來生成資料。在這種情況下,QueueChannel 方法在消費者端是錯誤的:必須使用入站通道介面卡解決方案來代替

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}

Hazelcast 中的 ITopic 抽象與 JMS 中的 Topic 具有相似的語義:所有訂閱者都接收已釋出的訊息。透過一對簡單的 MessageChannel bean,此機制作為開箱即用的功能得到支援

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {

    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}

FixedSubscriberChannelDirectChannel 的最佳化變體,它在初始化時需要一個 MessageHandler。由於 MessageHandler 是一個函式式介面,因此可以為 handleMessage 方法提供一個簡單的 lambda 表示式。當訊息傳送到 publishToHazelcastTopicChannel 時,它只是釋出到 Hazelcast ITopiccom.hazelcast.topic.MessageListener 也是一個函式式介面,因此可以為 ITopic#addMessageListener 提供一個 lambda 表示式。這樣,fromHazelcastTopicChannel 的訂閱者將消費所有傳送到上述 ITopic 的訊息。

ExecutorChannel 可以與 IExecutorService 一起提供。例如,透過相應的配置可以實現叢集範圍的單例

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}
© . This site is unofficial and not affiliated with VMware.