Hazelcast 支援
Spring Integration 提供了通道介面卡及其他實用元件,用於與記憶體資料網格 Hazelcast 進行互動。
您需要將此依賴新增到您的專案
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.4.4"
Hazelcast 元件的 XML 名稱空間和 schemaLocation 定義為
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast 事件驅動入站通道介面卡
Hazelcast 提供了分散式資料結構,例如
-
com.hazelcast.map.IMap
-
com.hazelcast.multimap.MultiMap
-
com.hazelcast.collection.IList
-
com.hazelcast.collection.ISet
-
com.hazelcast.collection.IQueue
-
com.hazelcast.topic.ITopic
-
com.hazelcast.replicatedmap.ReplicatedMap
它還提供了事件監聽器,用於監聽對這些資料結構的修改。
-
com.hazelcast.core.EntryListener<K, V>
-
com.hazelcast.collection.ItemListener
-
com.hazelcast.topic.MessageListener
Hazelcast 事件驅動入站通道介面卡監聽相關的快取事件,並將事件訊息傳送到指定的通道。它支援 XML 和 JavaConfig 驅動的配置方式。
XML 配置
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
Hazelcast 事件驅動入站通道介面卡需要以下屬性
-
channel
: 指定傳送訊息的通道; -
cache
: 指定監聽的分散式物件引用。這是必填屬性; -
cache-events
: 指定監聽的快取事件。這是可選屬性,預設值為ADDED
。支援的值如下 -
IMap
和MultiMap
支援的快取事件型別:ADDED
,REMOVED
,UPDATED
,EVICTED
,EVICT_ALL
和CLEAR_ALL
; -
ReplicatedMap
支援的快取事件型別:ADDED
,REMOVED
,UPDATED
,EVICTED
; -
IList
,ISet
和IQueue
支援的快取事件型別:ADDED
,REMOVED
。ITopic
沒有快取事件型別。 -
cache-listening-policy
: 指定快取監聽策略為SINGLE
或ALL
。這是可選屬性,預設值為SINGLE
。每個監聽相同快取物件和相同cache-events
屬性的 Hazelcast 入站通道介面卡,可以接收單個事件訊息或所有事件訊息。如果設定為ALL
,則監聽相同快取物件和相同cache-events
屬性的所有 Hazelcast 入站通道介面卡都將接收所有事件訊息。如果設定為SINGLE
,它們將接收唯一的事件訊息。
一些配置示例
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java 配置示例
以下示例展示了 DistributedMap
配置。相同的配置可用於其他分散式資料結構(IMap
, MultiMap
, ReplicatedMap
, IList
, ISet
, IQueue
和 ITopic
)
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
Hazelcast 連續查詢入站通道介面卡
Hazelcast 連續查詢允許監聽對特定 Map 條目執行的修改。Hazelcast 連續查詢入站通道介面卡是一種事件驅動的通道介面卡,它根據定義的謂詞監聽相關的分散式 Map 事件。
-
Java
-
XML
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支援以下六個屬性
-
channel
: 指定傳送訊息的通道; -
cache
: 指定監聽的分散式 Map 引用。必填; -
cache-events
: 指定監聽的快取事件。可選屬性,預設值為ADDED
。支援的值包括ADDED
,REMOVED
,UPDATED
,EVICTED
,EVICT_ALL
和CLEAR_ALL
; -
predicate
: 指定用於監聽特定 Map 條目修改的謂詞。必填; -
include-value
: 指定在連續查詢結果中包含 value 和 oldValue。可選,預設值為true
; -
cache-listening-policy
: 指定快取監聽策略為SINGLE
或ALL
。可選,預設值為SINGLE
。每個監聽相同快取物件和相同cache-events
屬性的 Hazelcast CQ 入站通道介面卡,可以接收單個事件訊息或所有事件訊息。如果設定為ALL
,則監聽相同快取物件和相同cache-events
屬性的所有 Hazelcast CQ 入站通道介面卡都將接收所有事件訊息。如果設定為SINGLE
,它們將接收唯一的事件訊息。
Hazelcast 叢集監控入站通道介面卡
Hazelcast 叢集監控支援監聽對叢集執行的修改。Hazelcast 叢集監控入站通道介面卡是一種事件驅動的通道介面卡,監聽相關的 Membership, Distributed Object, Migration, Lifecycle 和 Client 事件
-
Java
-
XML
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支援以下三個屬性
-
channel
: 指定傳送訊息的通道; -
hazelcast-instance
: 指定用於監聽叢集事件的 Hazelcast Instance 引用。這是必填屬性; -
monitor-types
: 指定監聽的監控型別。這是可選屬性,預設值為MEMBERSHIP
。支援的值包括MEMBERSHIP
,DISTRIBUTED_OBJECT
,MIGRATION
,LIFECYCLE
,CLIENT
。
Hazelcast 分散式 SQL 入站通道介面卡
Hazelcast 允許在分散式 Map 上執行分散式查詢。Hazelcast 分散式 SQL 入站通道介面卡是一種輪詢入站通道介面卡。它執行定義的 distributed-sql 命令,並根據迭代型別返回結果。
-
Java
-
XML
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它需要一個輪詢器,支援以下四個屬性
-
channel
: 指定傳送訊息的通道。這是必填屬性; -
cache
: 指定查詢的分散式IMap
引用。這是必填屬性; -
iteration-type
: 指定結果型別。分散式 SQL 可以在EntrySet
,KeySet
,LocalKeySet
或Values
上執行。這是可選屬性,預設值為VALUE
。支援的值包括ENTRY, `KEY
,LOCAL_KEY
和VALUE
; -
distributed-sql
: 指定 SQL 語句的 where 子句。這是必填屬性。
Hazelcast 出站通道介面卡
Hazelcast 出站通道介面卡監聽其定義的通道,並將傳入訊息寫入相關的分散式快取。它需要 cache
, cache-expression
或 HazelcastHeaders.CACHE_NAME
中的一個來定義分散式物件。支援的分散式物件包括:IMap
, MultiMap
, ReplicatedMap
, IList
, ISet
, IQueue
和 ITopic
。
-
Java
-
XML
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
它需要以下屬性
-
channel
: 指定傳送訊息的通道; -
cache
: 指定分散式物件引用。可選; -
cache-expression
: 透過 Spring Expression Language (SpEL) 指定分散式物件。可選; -
key-expression
: 透過 Spring Expression Language (SpEL) 指定鍵值對的鍵。可選,僅對IMap
,MultiMap
和ReplicatedMap
分散式資料結構需要。 -
extract-payload
: 指定是傳送整個訊息還是僅傳送載荷。可選屬性,預設值為true
。如果為 true,則只將載荷寫入分散式物件。否則,透過轉換訊息頭和載荷來寫入整個訊息。
透過在訊息頭中設定分散式物件名稱,可以透過同一個通道將訊息寫入不同的分散式物件。如果未定義 cache
或 cache-expression
屬性,則必須在請求 Message
中設定 HazelcastHeaders.CACHE_NAME
訊息頭。
Hazelcast 領導者選舉
如果需要領導者選舉(例如對於高可用訊息消費者,只有一個節點應該接收訊息),可以使用基於 Hazelcast 的 LeaderInitiator
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
當一個節點被選舉為領導者時,它會向所有應用程式監聽器傳送一個 OnGrantedEvent
。
Hazelcast 訊息儲存
對於分散式訊息狀態管理,例如持久化 QueueChannel
或跟蹤 Aggregator
訊息組,提供了 HazelcastMessageStore
實現
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
預設情況下,使用 SPRING_INTEGRATION_MESSAGE_STORE
IMap
作為鍵/值儲存訊息和組。可以將任何自定義 IMap
提供給 HazelcastMessageStore
。
Hazelcast 元資料儲存
提供了一個 ListenableMetadataStore
的實現,它使用 Hazelcast IMap
作為後端。預設的 Map 名稱為 SPRING_INTEGRATION_METADATA_STORE
,該名稱可以自定義。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
HazelcastMetadataStore
實現了 ListenableMetadataStore
介面,允許您註冊自己的 MetadataStoreListener
型別的監聽器,透過 addListener(MetadataStoreListener callback)
監聽事件。
Hazelcast 鎖註冊中心
提供了一個 LockRegistry
的實現,它使用 Hazelcast 分散式 ILock
支援作為後端
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
當與共享的 MessageGroupStore
(例如 Aggregator
儲存管理) 一起使用時,可以使用 HazelcastLockRegistry
在多個應用程式例項之間提供此功能,確保在同一時間只有一個例項能夠操作該組。
對於所有分散式操作,必須在 HazelcastInstance 上啟用 CP 子系統。 |
使用 Hazelcast 的訊息通道
Hazelcast 的 IQueue
和 ITopic
分散式物件本質上是訊息傳遞原語,可以在 Spring Integration 核心元件中使用,無需 Hazelcast 模組中的額外實現。
可以透過任何 java.util.Queue
提供 QueueChannel
的支援,包括提到的 Hazelcast 分散式 IQueue
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}
將此配置放置在應用程式的 Hazelcast 叢集中的多個節點上,將使 QueueChannel
成為分散式,並且只有一個節點能夠從該 IQueue
輪詢單個 Message
。這類似於 PollableJmsChannel
、PollableKafkaChannel
或 PollableAmqpChannel
的工作方式。
如果生產者側不是 Spring Integration 應用程式,則無法配置 QueueChannel
,因此使用原生的 Hazelcast IQueue
API 生成資料。在這種情況下,消費者側使用 QueueChannel
方法是錯誤的:應改為使用 入站通道介面卡 解決方案
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
Hazelcast 中的 ITopic
抽象在語義上類似於 JMS 中的 Topic
:所有訂閱者都會收到釋出的訊息。透過一對簡單的 MessageChannel
bean,該機制作為開箱即用的功能得到支援
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
FixedSubscriberChannel
是 DirectChannel
的最佳化變體,它在初始化時需要一個 MessageHandler
。由於 MessageHandler
是一個函式式介面,因此可以為 handleMessage
方法提供一個簡單的 lambda 表示式。當訊息傳送到 publishToHazelcastTopicChannel
時,它只會釋出到 Hazelcast 的 ITopic
上。com.hazelcast.topic.MessageListener
也是一個函式式介面,因此可以向 ITopic#addMessageListener
提供一個 lambda 表示式。因此,fromHazelcastTopicChannel
的訂閱者將消費所有傳送到上述 ITopic
的訊息。
ExecutorChannel
可以透過 IExecutorService
提供支援。例如,透過相應的配置可以實現叢集範圍的單例。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}