Redis 支援
Spring Integration 2.1 引入了對 Redis 的支援:“一個開源的高階鍵值儲存”。這種支援以基於 Redis 的 MessageStore 以及釋出-訂閱訊息介面卡的形式出現,這些介面卡透過 Redis 的 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令得到支援。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:7.0.0"
必須包含 Redis 客戶端依賴,例如 Lettuce。
要下載、安裝和執行 Redis,請參閱 Redis 文件。
連線到 Redis
要開始與 Redis 互動,首先必須獲得一個連線。Spring Integration 使用另一個 Spring 專案 Spring Data Redis 提供的支援,該專案提供了典型的 Spring 構造:ConnectionFactory 和 Template。這些抽象簡化了與多個 Redis 客戶端 Java API 的整合。目前,Spring Data Redis 支援 Jedis 和 Lettuce。
使用 RedisConnectionFactory
來自 Spring Data Redis 的 RedisConnectionFactory 是用於管理與 Redis 連線的高階抽象。以下列表顯示了介面定義
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示瞭如何在 Java 中建立 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例展示瞭如何在 Spring 的 XML 配置中建立 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory 的實現提供了一組屬性,例如埠和主機。一旦 RedisConnectionFactory 例項存在,就可以建立 RedisTemplate。
使用 RedisTemplate
與 Spring 中的其他模板類(如 JdbcTemplate 和 JmsTemplate)一樣,RedisTemplate 是一個幫助類,它簡化了 Redis 資料訪問程式碼。有關 RedisTemplate 及其變體(如 StringRedisTemplate)的更多資訊,請參閱 Spring Data Redis 文件。
以下示例展示瞭如何在 Java 中建立 RedisTemplate 例項
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示瞭如何在 Spring 的 XML 配置中建立 RedisTemplate 例項
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 進行訊息傳遞
如 簡介 中所述,Redis 透過其 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令提供釋出-訂閱訊息支援。與 JMS 和 AMQP 一樣,Spring Integration 提供訊息通道和介面卡,用於透過 Redis 傳送和接收訊息。
Redis 釋出/訂閱通道
與 JMS 類似,在某些情況下,生產者和消費者都旨在成為同一應用程式的一部分,並在同一程序中執行。這可以透過一對入站和出站通道介面卡來完成。然而,與 Spring Integration 的 JMS 支援一樣,有一種更簡單的方法來解決此用例。相反,可以使用釋出-訂閱通道,如下例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel 的行為與主 Spring Integration 名稱空間中的普通 <publish-subscribe-channel/> 元素非常相似。它可以由任何端點的 input-channel 和 output-channel 屬性引用。不同之處在於,此通道由 Redis 主題名稱支援:由 topic-name 屬性指定的 String 值。然而,與 JMS 不同,此主題不必提前建立,甚至不必由 Redis 自動建立。在 Redis 中,主題是簡單的 String 值,扮演地址的角色。生產者和消費者可以使用相同的 String 值作為其主題名稱進行通訊。對此通道的簡單訂閱意味著在生產端點和消費端點之間可以進行非同步釋出-訂閱訊息傳遞。然而,與透過在簡單 Spring Integration <channel/> 元素中新增 <queue/> 元素建立的非同步訊息通道不同,訊息不儲存在記憶體佇列中。相反,這些訊息透過 Redis 傳遞,這使得我們可以依賴其對永續性和叢集的支援以及與其他非 Java 平臺的互操作性。
Redis 入站通道介面卡
Redis 入站通道介面卡 (RedisInboundChannelAdapter) 以與其他入站介面卡相同的方式將傳入的 Redis 訊息適配到 Spring 訊息中。它接收平臺特定訊息(本例中為 Redis)並透過使用 MessageConverter 策略將其轉換為 Spring 訊息。以下示例展示瞭如何配置 Redis 入站通道介面卡
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例展示了 Redis 入站通道介面卡的簡單而完整的配置。請注意,前面的配置依賴於熟悉的 Spring 範例:自動發現某些 bean。在本例中,redisConnectionFactory 被隱式注入到介面卡中。或者,可以透過 connection-factory 屬性注入自定義 RedisConnectionFactory。
此外,請注意,前面的配置為介面卡注入了一個自定義 MessageConverter。該方法類似於 JMS,其中 MessageConverter 例項用於在 Redis 訊息和 Spring Integration 訊息有效負載之間進行轉換。預設值為 SimpleMessageConverter。
入站介面卡可以訂閱多個主題名稱,因此 topics 屬性中是逗號分隔的值集。
從 3.0 版開始,入站介面卡除了現有的 topics 屬性外,現在還具有 topic-patterns 屬性。此屬性包含一個逗號分隔的 Redis 主題模式集。有關 Redis 釋出-訂閱的更多資訊,請參閱 Redis 釋出/訂閱。
入站介面卡可以使用 RedisSerializer 反序列化 Redis 訊息的正文。<int-redis:inbound-channel-adapter> 的 serializer 屬性可以設定為空字串,這將導致 RedisSerializer 屬性的值為 null。在這種情況下,Redis 訊息的原始 byte[] 正文作為訊息有效負載提供。
從 5.0 版開始,可以透過使用 <int-redis:inbound-channel-adapter> 的 task-executor 屬性將 Executor 例項注入到入站介面卡中。此外,收到的 Spring Integration 訊息現在具有 RedisHeaders.MESSAGE_SOURCE 頭,以指示已釋出訊息的來源:主題或模式。這可以在下游用於路由邏輯。
Redis 出站通道介面卡
Redis 出站通道介面卡以與其他出站介面卡相同的方式將傳出的 Spring Integration 訊息適配到 Redis 訊息中。它接收 Spring Integration 訊息並透過使用 MessageConverter 策略將其轉換為平臺特定訊息(本例中為 Redis)。以下示例展示瞭如何配置 Redis 出站通道介面卡
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
該配置與 Redis 入站通道介面卡並行。介面卡被隱式注入 RedisConnectionFactory,該工廠的 bean 名稱為 redisConnectionFactory。此示例還包括可選(和自定義)的 MessageConverter(testConverter bean)。
自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter> 提供了 topic 屬性的替代方案:存在 topic-expression 屬性,用於在執行時確定訊息的 Redis 主題。這些屬性是互斥的。
Redis 佇列入站通道介面卡
Spring Integration 3.0 引入了佇列入站通道介面卡,用於從 Redis 列表中“彈出”訊息。預設情況下,它使用“右彈出”,但可以配置為使用“左彈出”。該介面卡是訊息驅動的。它使用內部偵聽器執行緒,不使用輪詢器。
以下列表顯示了 queue-inbound-channel-adapter 的所有可用屬性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
| 1 | 元件 bean 名稱。如果未提供 channel 屬性,則會建立一個 DirectChannel 並使用此 id 屬性作為 bean 名稱在應用程式上下文中註冊。在這種情況下,端點本身將以 bean 名稱 id 加上 .adapter 進行註冊。(如果 bean 名稱是 thing1,則端點註冊為 thing1.adapter。) |
| 2 | 此端點向其傳送 Message 例項的 MessageChannel。 |
| 3 | 一個 SmartLifecycle 屬性,用於指定此端點是否應在應用程式上下文啟動後自動啟動。它預設為 true。 |
| 4 | 一個 SmartLifecycle 屬性,用於指定此端點啟動的階段。它預設為 0。 |
| 5 | 對 RedisConnectionFactory bean 的引用。它預設為 redisConnectionFactory。 |
| 6 | 執行基於佇列的“pop”操作以獲取 Redis 訊息的 Redis 列表的名稱。 |
| 7 | 當從端點的偵聽任務接收到異常時,將 ErrorMessage 例項傳送到的 MessageChannel。預設情況下,底層 MessagePublishingErrorHandler 使用應用程式上下文中的預設 errorChannel。 |
| 8 | RedisSerializer bean 引用。它可以是一個空字串,表示“無序列化器”。在這種情況下,來自入站 Redis 訊息的原始 byte[] 將作為 Message 有效負載傳送到 channel。預設情況下,它是 JdkSerializationRedisSerializer。 |
| 9 | “pop”操作等待來自佇列的 Redis 訊息的超時時間(以毫秒為單位)。預設值為 1 秒。 |
| 10 | “pop”操作發生異常後,偵聽器任務應休眠的時間(以毫秒為單位),然後重新啟動偵聽器任務。 |
| 11 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設情況下為 JDK 序列化)。其預設值為 false。 |
| 12 | 對 Spring TaskExecutor(或標準 JDK 1.5+ Executor)bean 的引用。它用於底層偵聽任務。它預設為 SimpleAsyncTaskExecutor。 |
| 13 | 指定此端點是應使用“右彈出”(當為 true 時)還是“左彈出”(當為 false 時)從 Redis 列表讀取訊息。如果為 true,則當與預設 Redis 隊列出站通道介面卡一起使用時,Redis 列表充當 FIFO 佇列。將其設定為 false 以與使用“右推”寫入列表的軟體一起使用或實現堆疊狀訊息順序。其預設值為 true。自 4.3 版起。 |
task-executor 必須配置多個執行緒進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動偵聽器任務時,可能會出現死鎖。errorChannel 可用於處理這些錯誤,以避免重新啟動,但最好不要使應用程式暴露於可能的死鎖情況。有關可能的 TaskExecutor 實現,請參閱 Spring Framework 參考手冊。 |
Redis 隊列出站通道介面卡
Spring Integration 3.0 引入了隊列出站通道介面卡,用於將 Spring Integration 訊息“推入”Redis 列表。預設情況下,它使用“左推”,但可以配置為使用“右推”。以下列表顯示了 Redis queue-outbound-channel-adapter 的所有可用屬性
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
| 1 | 元件 bean 名稱。如果未提供 channel 屬性,則會建立一個 DirectChannel 並使用此 id 屬性作為 bean 名稱在應用程式上下文中註冊。在這種情況下,端點將以 bean 名稱 id 加上 .adapter 進行註冊。(如果 bean 名稱是 thing1,則端點註冊為 thing1.adapter。) |
| 2 | 此端點從中接收 Message 例項的 MessageChannel。 |
| 3 | 對 RedisConnectionFactory bean 的引用。它預設為 redisConnectionFactory。 |
| 4 | 執行基於佇列的“push”操作以傳送 Redis 訊息的 Redis 列表的名稱。此屬性與 queue-expression 互斥。 |
| 5 | 一個 SpEL Expression,用於確定 Redis 列表的名稱。它在執行時使用傳入的 Message 作為 #root 變數。此屬性與 queue 互斥。 |
| 6 | 一個 RedisSerializer bean 引用。它預設為 JdkSerializationRedisSerializer。但是,對於 String 有效負載,如果未提供 serializer 引用,則使用 StringRedisSerializer。 |
| 7 | 指定此端點是應僅傳送有效負載還是傳送整個 Message 到 Redis 佇列。它預設為 true。 |
| 8 | 指定此端點是應使用“左推”(當為 true 時)還是“右推”(當為 false 時)將訊息寫入 Redis 列表。如果為 true,則當與預設 Redis 佇列入站通道介面卡一起使用時,Redis 列表充當 FIFO 佇列。將其設定為 false 以與使用“左彈出”從列表讀取的軟體一起使用或實現堆疊狀訊息順序。它預設為 true。自 4.3 版起。 |
Redis 應用程式事件
自 Spring Integration 3.0 起,Redis 模組提供了 IntegrationEvent 的實現,它反過來又是 org.springframework.context.ApplicationEvent。RedisExceptionEvent 封裝了 Redis 操作中的異常(端點是事件的“源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在從 BoundListOperations.rightPop 操作捕獲異常後發出這些事件。異常可以是任何泛型 org.springframework.data.redis.RedisSystemException 或 org.springframework.data.redis.RedisConnectionFailureException。使用 <int-event:inbound-channel-adapter/> 處理這些事件對於確定後臺 Redis 任務的問題並採取管理措施很有用。
Redis 訊息儲存
如《企業整合模式》(EIP) 一書所述,訊息儲存 允許持久化訊息。當處理具有緩衝訊息能力的元件(聚合器、重新排序器等)並且可靠性是問題時,這可能很有用。在 Spring Integration 中,MessageStore 策略還為 EIP 中描述的 索賠檢查 模式提供了基礎。
Spring Integration 的 Redis 模組提供了 RedisMessageStore。以下示例展示瞭如何將其與聚合器一起使用
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一個 bean 配置,它需要一個 RedisConnectionFactory 作為建構函式引數。
預設情況下,RedisMessageStore 使用 Java 序列化來序列化訊息。但是,如果需要不同的序列化技術(例如 JSON),則可以將自定義序列化器設定到 RedisMessageStore 的 valueSerializer 屬性中。
框架為 Message 例項和 MessageHeaders 例項提供了 Jackson 序列化器和反序列化器實現——分別是 MessageJsonDeserializer 和 MessageHeadersJsonSerializer。它們必須使用 SimpleModule 選項為 ObjectMapper 進行配置。此外,ObjectMapper 上應設定 enableDefaultTyping 以新增每個序列化複雜物件的型別資訊。該 type 資訊隨後在反序列化期間使用。框架提供了一個名為 JacksonMessagingUtils.messagingAwareMapper() 的實用方法,它已經提供了前面提到的所有屬性和序列化器。此實用方法帶有 trustedPackages 引數,用於限制 Java 包的反序列化以避免安全漏洞。預設的可信包:java.util、java.lang、org.springframework.messaging.support、org.springframework.integration.support、org.springframework.integration.message、org.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,必須應用如下配置
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonMessagingUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson3JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
從 4.3.12 版本開始,RedisMessageStore 支援 prefix 選項,允許區分同一 Redis 伺服器上的儲存例項。
Redis 通道訊息儲存
前面 展示的 RedisMessageStore 將每個組作為單個鍵(組 ID)下的值進行維護。雖然 QueueChannel 可以用於持久化,但為此目的提供了一個專門的 RedisChannelMessageStore(自 4.0 版起)。此儲存為每個通道使用一個 LIST,傳送訊息時使用 LPUSH,接收訊息時使用 RPOP。預設情況下,此儲存也使用 JDK 序列化,但可以修改其值序列化器,如 前面所述。
建議使用支援儲存的通道,而不是使用通用的 RedisMessageStore。以下示例定義了一個 Redis 訊息儲存並在帶有佇列的通道中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用於儲存資料的鍵的形式為:<storeBeanName>:<channelId>(在前面的示例中為 redisMessageStore:somePersistentQueueChannel)。
此外,還提供了子類 RedisChannelPriorityMessageStore。當與 QueueChannel 一起使用時,訊息按(FIFO)優先順序順序接收。它使用標準的 IntegrationMessageHeaderAccessor.PRIORITY 頭並支援優先順序值 (0 - 9)。具有其他優先順序(和沒有優先順序)的訊息在任何具有優先順序的訊息之後按 FIFO 順序檢索。
這些儲存僅實現 BasicMessageGroupStore,不實現 MessageGroupStore。它們只能用於支援 QueueChannel 等情況。 |
Redis 元資料儲存
Spring Integration 3.0 引入了一個新的基於 Redis 的 MetadataStore(請參閱 元資料儲存)實現。RedisMetadataStore 可用於在應用程式重新啟動後維護 MetadataStore 的狀態。這樣的 MetadataStore 實現可以與以下介面卡一起使用
要指示這些介面卡使用新的 RedisMetadataStore,請宣告一個名為 metadataStore 的 Spring bean。Feed 入站通道介面卡和 feed 入站通道介面卡都會自動拾取並使用宣告的 RedisMetadataStore。以下示例展示瞭如何宣告此類 bean
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore 由 RedisProperties 支援。與其互動使用 BoundHashOperations,這反過來又需要整個 Properties 儲存的 key。在 MetadataStore 的情況下,此 key 扮演區域的角色,這在分散式環境中很有用,當多個應用程式使用同一個 Redis 伺服器時。預設情況下,此 key 的值為 MetaData。
從 4.0 版本開始,此儲存實現了 ConcurrentMetadataStore,允許它在多個應用程式例項之間可靠地共享,其中只有一個例項允許儲存或修改鍵的值。
RedisMetadataStore.replace() 不能與 Redis 叢集一起使用(例如,在 AbstractPersistentAcceptOnceFileListFilter 中),因為目前不支援用於原子性的 WATCH 命令。 |
Redis 儲存入站通道介面卡
Redis 儲存入站通道介面卡是一個輪詢消費者,它從 Redis 集合讀取資料並將其作為 Message 有效負載傳送。以下示例展示瞭如何配置 Redis 儲存入站通道介面卡
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例展示瞭如何使用 store-inbound-channel-adapter 元素配置 Redis 儲存入站通道介面卡,為各種屬性提供值,例如
-
key或key-expression:正在使用的集合的鍵名。 -
collection-type:此介面卡支援的集合型別列舉。支援的集合有LIST、SET、ZSET、PROPERTIES和MAP。 -
connection-factory:對o.s.data.redis.connection.RedisConnectionFactory例項的引用。 -
redis-template:對o.s.data.redis.core.RedisTemplate例項的引用。 -
所有入站介面卡共有的其他屬性(例如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
|
預設情況下,介面卡使用
|
由於 key 具有字面值,因此前面的示例相對簡單和靜態。有時,鍵的值必須根據某些條件在執行時更改。為此,請改用 key-expression,其中提供的表示式可以是任何有效的 SpEL 表示式。
此外,可以對從 Redis 集合讀取的成功處理的資料進行一些後處理。例如,值在處理後可能會被移動或刪除。事務同步功能可用於此類邏輯。以下示例使用 key-expression 和事務同步
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
輪詢器可以透過使用 transactional 元素進行事務處理。此元素可以引用真正的事務管理器,例如,如果流的其他部分呼叫 JDBC。如果沒有“真正的”事務,則可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring 的 PlatformTransactionManager 的實現,並在沒有實際事務時啟用 Redis 介面卡的事務同步功能。
| 這並不會使 Redis 活動本身具有事務性。它允許在成功(提交)之前或之後或在失敗(回滾)之後進行操作同步。 |
一旦輪詢器是事務性的,就可以在 transactional 元素上新增 o.s.i.transaction.TransactionSynchronizationFactory 例項。TransactionSynchronizationFactory 建立 TransactionSynchronization 例項。為方便起見,公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory,它允許配置 SpEL 表示式,其執行與事務協調(同步)。支援提交前、提交後和回滾後的表示式,以及傳送評估結果(如果有)的通道(每種事件一個)。對於每個子元素,可以指定 expression 和 channel 屬性。如果只存在 channel 屬性,則收到的訊息作為特定同步場景的一部分發送到該通道。如果只存在 expression 屬性,並且表示式的結果是非 null 值,則會生成一個以結果作為有效負載的訊息併發送到預設通道 (NullChannel) 並出現在日誌中(在 DEBUG 級別)。如果表示式的結果為 null 或 void,則不生成訊息。
RedisStoreMessageSource 添加了一個 store 屬性,其中包含一個繫結到事務 IntegrationResourceHolder 的 RedisStore 例項,可以從 TransactionSynchronizationProcessor 實現訪問。
有關事務同步的更多資訊,請參閱 事務同步。
RedisStore 出站通道介面卡
RedisStore 出站通道介面卡允許將訊息有效負載寫入 Redis 集合,如下例所示
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置透過使用 store-inbound-channel-adapter 元素配置 Redis 儲存出站通道介面卡。它為各種屬性提供值,例如
-
key或key-expression:正在使用的集合的鍵名。 -
extract-payload-elements:如果設定為true(預設值)並且有效負載是“多值”物件(即Collection或Map)的例項,則使用“addAll”和“putAll”語義進行儲存。否則,如果設定為false,則有效負載作為單個條目儲存,無論其型別如何。如果有效負載不是“多值”物件的例項,則此屬性的值將被忽略,並且有效負載始終作為單個條目儲存。 -
collection-type:此介面卡支援的Collection型別列舉。支援的集合有LIST、SET、ZSET、PROPERTIES和MAP。 -
map-key-expression:返回正在儲存的條目的鍵名的 SpEL 表示式。它僅在collection-type是MAP或PROPERTIES並且“extract-payload-elements”為 false 時適用。 -
connection-factory:對o.s.data.redis.connection.RedisConnectionFactory例項的引用。 -
redis-template:對o.s.data.redis.core.RedisTemplate例項的引用。 -
所有入站介面卡共有的其他屬性(例如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
預設情況下,介面卡使用 StringRedisTemplate。它將 StringRedisSerializer 例項用於鍵、值、雜湊鍵和雜湊值。但是,如果 extract-payload-elements 設定為 false,則將使用一個 RedisTemplate,它對鍵和雜湊鍵使用 StringRedisSerializer 例項,對值和雜湊值使用 JdkSerializationRedisSerializer 例項。使用 JDK 序列化器時,重要的是要了解 Java 序列化用於所有值,無論該值實際上是否是集合。如果需要對值的序列化進行更多控制,則可以提供自定義 RedisTemplate,而不是依賴這些預設值。 |
由於 key 和其他屬性具有字面值,因此前面的示例相對簡單和靜態。有時,這些值可能會根據某些條件在執行時動態更改。為此,提供了它們的 -expression 等效項(key-expression、map-key-expression 等),其中表達式可以是任何有效的 SpEL 表示式。
Redis 出站命令閘道器
Spring Integration 4.0 引入了 Redis 命令閘道器,允許使用通用的 RedisConnection#execute 方法執行任何標準 Redis 命令。以下列表顯示了 Redis 出站閘道器的可用屬性
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
| 1 | 此端點從中接收 Message 例項的 MessageChannel。 |
| 2 | 此端點發送回復 Message 例項的 MessageChannel。 |
| 3 | 指定此出站閘道器是否必須返回非 null 值。它預設為 true。當 Redis 返回 null 值時,將丟擲 ReplyRequiredException。 |
| 4 | 等待發送回復訊息的超時時間(以毫秒為單位)。它通常用於基於佇列的有限回覆通道。 |
| 5 | 對 RedisConnectionFactory bean 的引用。它預設為 redisConnectionFactory。它與 redis-template 屬性互斥。 |
| 6 | 對 RedisTemplate bean 的引用。它與 connection-factory 屬性互斥。 |
| 7 | 對 org.springframework.data.redis.serializer.RedisSerializer 例項的引用。如有必要,它用於將每個命令引數序列化為 byte[]。 |
| 8 | 返回命令鍵的 SpEL 表示式。它預設為 redis_command 訊息頭。它不得評估為 null。 |
| 9 | 逗號分隔的 SpEL 表示式,它們被評估為命令引數。與 arguments-strategy 屬性互斥。如果未提供任何屬性,則 payload 用作命令引數。引數表示式可以評估為“null”以支援可變數量的引數。 |
| 10 | 一個 boolean 標誌,用於指定在配置 argument-expressions 時,評估的 Redis 命令字串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 的表示式評估上下文中作為 #cmd 變數可用。否則,此屬性將被忽略。 |
| 11 | 對 o.s.i.redis.outbound.ArgumentsStrategy 例項的引用。它與 argument-expressions 屬性互斥。如果未提供任何屬性,則 payload 用作命令引數。 |
<int-redis:outbound-gateway> 可以用作執行任何所需 Redis 操作的通用元件。以下示例展示瞭如何從 Redis 原子數獲取增量值
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message 有效負載應具有 redisCounter 名稱,這可以透過 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定義提供。
RedisConnection#execute 方法具有泛型 Object 作為其返回型別。實際結果取決於命令型別。例如,MGET 返回 List<byte[]>。有關命令、其引數和結果型別的更多資訊,請參閱 Redis 規範。
Redis 隊列出站閘道器
Spring Integration 引入了 Redis 隊列出站閘道器以執行請求和回覆場景。它將對話 UUID 推送到提供的 queue,將以該 UUID 作為其鍵的值推送到 Redis 列表,並等待來自以 UUID 加上 .reply 作為鍵的 Redis 列表的回覆。每次互動都使用不同的 UUID。以下列表顯示了 Redis 出站閘道器的可用屬性
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
| 1 | 此端點從中接收 Message 例項的 MessageChannel。 |
| 2 | 此端點發送回復 Message 例項的 MessageChannel。 |
| 3 | 指定此出站閘道器是否必須返回非 null 值。此值預設為 false。否則,當 Redis 返回 null 值時,將丟擲 ReplyRequiredException。 |
| 4 | 等待發送回復訊息的超時時間(以毫秒為單位)。它通常用於基於佇列的有限回覆通道。 |
| 5 | 對 RedisConnectionFactory bean 的引用。它預設為 redisConnectionFactory。它與“redis-template”屬性互斥。 |
| 6 | 出站閘道器向其傳送對話 UUID 的 Redis 列表的名稱。 |
| 7 | 當註冊了多個閘道器時,此出站閘道器的順序。 |
| 8 | RedisSerializer bean 引用。它可以是一個空字串,表示“無序列化器”。在這種情況下,來自入站 Redis 訊息的原始 byte[] 將作為 Message 有效負載傳送到 channel。預設情況下,它是 JdkSerializationRedisSerializer。 |
| 9 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設情況下為 JDK 序列化)。 |
Redis 佇列入站閘道器
Spring Integration 4.1 引入了 Redis 佇列入站閘道器以執行請求和回覆場景。它從提供的 queue 彈出對話 UUID,從 Redis 列表中彈出以該 UUID 作為其鍵的值,並將回覆推送到以 UUID 加上 .reply 作為鍵的 Redis 列表。以下列表顯示了 Redis 佇列入站閘道器的可用屬性
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
| 1 | 此端點發送從 Redis 資料建立的 Message 例項的 MessageChannel。 |
| 2 | 此端點等待回覆 Message 例項的 MessageChannel。可選 - replyChannel 頭仍在使用中。 |
| 3 | 對 Spring TaskExecutor(或標準 JDK Executor)bean 的引用。它用於底層偵聽任務。它預設為 SimpleAsyncTaskExecutor。 |
| 4 | 等待發送回復訊息的超時時間(以毫秒為單位)。它通常用於基於佇列的有限回覆通道。 |
| 5 | 對 RedisConnectionFactory bean 的引用。它預設為 redisConnectionFactory。它與 redis-template 屬性互斥。 |
| 6 | 對話 UUID 的 Redis 列表的名稱。 |
| 7 | 當註冊了多個閘道器時,此入站閘道器的順序。 |
| 8 | RedisSerializer bean 引用。它可以是一個空字串,表示“無序列化器”。在這種情況下,來自入站 Redis 訊息的原始 byte[] 將作為 Message 有效負載傳送到 channel。它預設為 JdkSerializationRedisSerializer。(請注意,在 4.3 版本之前,它預設是 StringRedisSerializer。要恢復該行為,請提供對 StringRedisSerializer 的引用)。 |
| 9 | 等待接收訊息的超時時間(以毫秒為單位)。它通常用於基於佇列的有限請求通道。 |
| 10 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設情況下為 JDK 序列化)。 |
| 11 | “右彈出”操作發生異常後,偵聽器任務應休眠的時間(以毫秒為單位),然後重新啟動偵聽器任務。 |
task-executor 必須配置多個執行緒進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動偵聽器任務時,可能會出現死鎖。errorChannel 可用於處理這些錯誤,以避免重新啟動,但最好不要使應用程式暴露於可能的死鎖情況。有關可能的 TaskExecutor 實現,請參閱 Spring Framework 參考手冊。 |
Redis 流出站通道介面卡
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道介面卡,用於將訊息有效負載寫入 Redis 流。出站通道介面卡使用 ReactiveStreamOperations.add(…) 將 Record 新增到流中。以下示例展示瞭如何使用 Java 配置和 Service 類來實現 Redis 流出站通道介面卡。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
| 1 | 使用 ReactiveRedisConnectionFactory 和流名稱構造 ReactiveRedisStreamMessageHandler 例項以新增記錄。另一個建構函式變體基於 SpEL 表示式,用於根據請求訊息評估流鍵。 |
| 2 | 設定用於在新增到流之前序列化記錄鍵和值的 RedisSerializationContext。 |
| 3 | 設定 HashMapper,它提供 Java 型別和 Redis 雜湊/對映之間的契約。 |
| 4 | 如果為“true”,通道介面卡將從請求訊息中提取有效負載值以用於流記錄。或者使用整個訊息作為值。它預設為 true。 |
從 6.5 版本開始,ReactiveRedisStreamMessageHandler 提供了 setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) 選項,用於根據請求訊息為內部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions) 呼叫構建 RedisStreamCommands.XAddOptions。
Redis 流入站通道介面卡
Spring Integration 5.4 引入了 Reactive Stream 入站通道介面卡,用於從 Redis 流讀取訊息。入站通道介面卡根據自動確認標誌使用 StreamReceiver.receive(…) 或 StreamReceiver.receiveAutoAck() 從 Redis 流讀取記錄。以下示例展示瞭如何使用 Java 配置來實現 Redis 流入站通道介面卡。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
| 1 | 使用 ReactiveRedisConnectionFactory 和流鍵構造 ReactiveRedisStreamMessageProducer 例項以讀取記錄。 |
| 2 | 一個 StreamReceiver.StreamReceiverOptions,用於使用反應式基礎設施消費 redis 流。 |
| 3 | 一個 SmartLifecycle 屬性,用於指定此端點是否應在應用程式上下文啟動後自動啟動。它預設為 true。如果為 false,則應手動啟動 RedisStreamMessageProducer:messageProducer.start()。 |
| 4 | 如果為 false,則接收到的訊息不會自動確認。訊息的確認將推遲到客戶端消費訊息。它預設為 true。 |
| 5 | 如果為 true,則將建立消費者組。在建立消費者組期間,也將建立流(如果尚不存在)。消費者組跟蹤訊息傳遞並區分消費者。它預設為 false。 |
| 6 | 設定消費者組名稱。它預設為定義的 bean 名稱。 |
| 7 | 設定消費者名稱。將訊息作為 my-consumer 從組 my-group 中讀取。 |
| 8 | 此端點發送訊息的通道。 |
| 9 | 定義讀取訊息的偏移量。它預設為 ReadOffset.latest()。 |
| 10 | 如果為“true”,通道介面卡將從 Record 中提取有效負載值。否則,整個 Record 用作有效負載。它預設為 true。 |
如果 autoAck 設定為 false,則 Redis 流中的 Record 不會被 Redis 驅動程式自動確認,而是將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 標頭新增到要生成的訊息中,其值為 SimpleAcknowledgment 例項。目標整合流負責在根據此類記錄完成訊息的業務邏輯時呼叫其 acknowledge() 回撥。即使在反序列化期間發生異常並配置了 errorChannel,也需要類似的邏輯。因此,目標錯誤處理程式必須決定確認或拒絕此類失敗訊息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 還將這些標頭填充到要生成的訊息中:RedisHeaders.STREAM_KEY、RedisHeaders.STREAM_MESSAGE_ID、RedisHeaders.CONSUMER_GROUP 和 RedisHeaders.CONSUMER。
從 5.5 版本開始,可以在 ReactiveRedisStreamMessageProducer 上顯式配置 StreamReceiver.StreamReceiverOptionsBuilder 選項,包括新引入的 onErrorResume 函式,如果 Redis 流消費者在發生反序列化錯誤時應繼續輪詢,則需要此函式。預設函式將訊息傳送到錯誤通道(如果提供),並可能對失敗訊息進行確認,如上所述。所有這些 StreamReceiver.StreamReceiverOptionsBuilder 與外部提供的 StreamReceiver.StreamReceiverOptions 互斥。
Redis 鎖登錄檔
Spring Integration 4.0 引入了 RedisLockRegistry。某些元件(例如,聚合器和重新排序器)使用從 LockRegistry 例項獲取的鎖,以確保一次只有一個執行緒操作一個組。DefaultLockRegistry 在單個元件中執行此功能。可以在這些元件上配置外部鎖登錄檔。當與共享的 MessageGroupStore 一起使用時,可以設定 RedisLockRegistry 以在多個應用程式例項之間提供此功能,以便一次只有一個例項可以操作該組。
當本地執行緒釋放鎖時,另一個本地執行緒通常可以立即獲取鎖。如果使用不同登錄檔例項的執行緒釋放鎖,則可能需要長達 100 毫秒才能獲取鎖。
為避免“懸掛”鎖(當伺服器失敗時),此登錄檔中的鎖在預設 60 秒後過期,但可以在登錄檔上配置。鎖通常保持的時間要短得多。
| 由於鍵可能會過期,嘗試解鎖已過期的鎖會導致丟擲異常。但是,受此類鎖保護的資源可能已受到損害,因此此類異常應被視為嚴重異常。過期時間應設定為足夠大的值以防止這種情況,但要設定得足夠低,以便在伺服器故障後能在合理的時間內恢復鎖。 |
從 5.0 版本開始,RedisLockRegistry 實現了 ExpirableLockRegistry,它會刪除最後獲取時間超過 age 且當前未鎖定的鎖。
從 5.5.6 版本開始,RedisLockRegistry 支援透過 RedisLockRegistry.setCacheCapacity() 自動清理 RedisLockRegistry.locks 中的 redisLocks 快取。有關更多資訊,請參閱其 JavaDoc。
從 5.5.13 版本開始,RedisLockRegistry 公開了一個 setRedisLockType(RedisLockType) 選項,用於確定 Redis 鎖獲取應以哪種模式發生
-
RedisLockType.SPIN_LOCK- 鎖透過定期迴圈(100 毫秒)檢查是否可以獲取鎖來獲取。預設。 -
RedisLockType.PUB_SUB_LOCK- 鎖透過 redis 釋出-訂閱訂閱獲取。
pub-sub 是首選模式 - 客戶端 Redis 伺服器之間的網路通訊更少,效能更高 - 當訂閱在另一個程序中收到解鎖通知時,鎖會立即獲取。但是,Redis 不支援 Master/Replica 連線中的 pub-sub(例如,在 AWS ElastiCache 環境中),因此,選擇忙迴圈模式作為預設值,以使登錄檔在任何環境中都能工作。
從 6.4 版本開始,RedisLockRegistry.RedisLock.unlock() 方法在鎖的所有權過期時會丟擲 ConcurrentModificationException,而不是 IllegalStateException。
從 6.4 版本開始,添加了 RedisLockRegistry.setRenewalTaskScheduler() 以配置用於定期續訂鎖的排程器。設定後,鎖將在成功獲取鎖後每 1/3 的過期時間自動續訂,直到解鎖或 Redis 鍵被刪除。
從 7.0 版本開始,RedisLock 實現了 DistributedLock 介面,以支援鎖狀態資料的自定義生存時間 (TTL) 功能。現在可以使用 lock(Duration ttl) 或 tryLock(long time, TimeUnit unit, Duration ttl) 方法獲取 RedisLock,並指定生存時間 (TTL) 值。RedisLockRegistry 現在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允許使用自定義生存時間值續訂鎖。
叢集模式下 AWS ElastiCache for Valkey 支援
從 6.4.9/6.5.4/7.0.0 版本開始,RedisLockRegistry 支援叢集模式下的 AWS Elasticache for Valkey。在此版本的 valkey(一個 redis 的替代品)中,所有 PubSub 操作(PUBLISH、SUBSCRIBE 等)都在內部使用其分片變體(SPUBLISH、SSUBSCRIBE 等)。如果出現以下形式的任何錯誤
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3.
“在 RedisLockRegistry 的 unlock 步驟中,必須提供包含主題標籤 {…} 的鎖鍵,以確保解鎖指令碼中的所有操作都被雜湊到相同的叢集槽/分片,例如
RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");
lockRegistry.lock();
# critical section
lockRegistry.unlock();