Redis 支援
Spring Integration 2.1 引入了對 Redis 的支援:“一個開源的高階鍵值儲存”。這種支援以基於 Redis 的 MessageStore
以及透過 Redis 的 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令支援的釋出/訂閱訊息介面卡形式提供。
您需要在專案中包含此依賴
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.4.4"
您還需要包含 Redis 客戶端依賴,例如 Lettuce。
要下載、安裝和執行 Redis,請參閱 Redis 文件。
連線到 Redis
要開始與 Redis 互動,首先需要連線到它。Spring Integration 使用另一個 Spring 專案提供的支援,即 Spring Data Redis,它提供了典型的 Spring 構造:ConnectionFactory
和 Template
。這些抽象簡化了與多種 Redis 客戶端 Java API 的整合。目前,Spring Data Redis 支援 Jedis 和 Lettuce。
使用 RedisConnectionFactory
要連線到 Redis,您可以使用 RedisConnectionFactory
介面的其中一個實現。以下清單顯示了介面定義
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示瞭如何在 Java 中建立 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例展示瞭如何在 Spring XML 配置中建立 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
的實現提供了一組屬性,例如埠和主機,您可以根據需要進行設定。一旦擁有 RedisConnectionFactory
例項,就可以建立一個 RedisTemplate
例項,並透過 RedisConnectionFactory
注入它。
使用 RedisTemplate
與 Spring 中的其他模板類(例如 JdbcTemplate
和 JmsTemplate
)一樣,RedisTemplate
是一個幫助類,簡化了 Redis 資料訪問程式碼。有關 RedisTemplate
及其變體(例如 StringRedisTemplate
)的更多資訊,請參閱 Spring Data Redis 文件。
以下示例展示瞭如何在 Java 中建立 RedisTemplate
例項
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示瞭如何在 Spring XML 配置中建立 RedisTemplate
例項
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 進行訊息處理
如引言中所述,Redis 透過其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供了釋出/訂閱訊息支援。與 JMS 和 AMQP 一樣,Spring Integration 提供了訊息通道和介面卡,用於透過 Redis 傳送和接收訊息。
Redis 釋出/訂閱通道
與 JMS 類似,在某些情況下,生產者和消費者都旨在成為同一應用程式的一部分,並在同一程序中執行。您可以透過使用一對入站和出站通道介面卡來實現這一點。然而,與 Spring Integration 的 JMS 支援一樣,有一種更簡單的方法來解決此用例。您可以建立一個釋出/訂閱通道,如下例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
的行為與主 Spring Integration 名稱空間中的普通 <publish-subscribe-channel/>
元素非常相似。它可以透過任何端點的 input-channel
和 output-channel
屬性來引用。不同之處在於,此通道由 Redis 主題名稱支援:一個由 topic-name
屬性指定的 String
值。然而,與 JMS 不同,此主題不必提前建立,甚至不必由 Redis 自動建立。在 Redis 中,主題是簡單的 String
值,扮演地址的角色。生產者和消費者可以使用相同的主題名稱 String
值進行通訊。簡單訂閱此通道意味著在生產者和消費者端點之間可以進行非同步釋出/訂閱訊息傳遞。然而,與在簡單的 Spring Integration <channel/>
元素內新增 <queue/>
元素建立的非同步訊息通道不同,訊息不會儲存在記憶體佇列中。相反,這些訊息會透過 Redis 傳遞,這使您可以依賴其對持久化和叢集的支援以及與非 Java 平臺的互操作性。
Redis 入站通道介面卡
Redis 入站通道介面卡 (RedisInboundChannelAdapter
) 以與其他入站介面卡相同的方式將傳入的 Redis 訊息轉換為 Spring 訊息。它接收特定於平臺的訊息(在本例中為 Redis),並使用 MessageConverter
策略將它們轉換為 Spring 訊息。以下示例展示瞭如何配置 Redis 入站通道介面卡
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例展示了 Redis 入站通道介面卡簡單但完整的配置。請注意,前面的配置依賴於 Spring 熟悉的自動發現某些 bean 的範例。在本例中,redisConnectionFactory
會隱式注入到介面卡中。您可以透過使用 connection-factory
屬性來顯式指定它。
另外請注意,前面的配置將介面卡注入了自定義的 MessageConverter
。這種方法類似於 JMS,其中使用 MessageConverter
例項在 Redis 訊息和 Spring Integration 訊息載荷之間進行轉換。預設是一個 SimpleMessageConverter
。
入站介面卡可以訂閱多個主題名稱,因此 topics
屬性中的值是用逗號分隔的集合。
從 3.0 版本開始,入站介面卡除了現有的 topics
屬性外,現在還有 topic-patterns
屬性。此屬性包含一組用逗號分隔的 Redis 主題模式。有關 Redis 釋出/訂閱的更多資訊,請參閱 Redis Pub/Sub。
入站介面卡可以使用 RedisSerializer
來反序列化 Redis 訊息體。<int-redis:inbound-channel-adapter>
的 serializer
屬性可以設定為空字串,這將導致 RedisSerializer
屬性的值為 null
。在這種情況下,Redis 訊息的原始 byte[]
訊息體將作為訊息載荷提供。
從 5.0 版本開始,您可以透過使用 <int-redis:inbound-channel-adapter>
的 task-executor
屬性為入站介面卡提供 Executor
例項。此外,接收到的 Spring Integration 訊息現在具有 RedisHeaders.MESSAGE_SOURCE
頭部,以指示釋出訊息的來源:主題或模式。您可以在下游使用它進行路由邏輯。
Redis 出站通道介面卡
Redis 出站通道介面卡以與其他出站介面卡相同的方式將傳出的 Spring Integration 訊息轉換為 Redis 訊息。它接收 Spring Integration 訊息,並使用 MessageConverter
策略將它們轉換為特定於平臺的訊息(在本例中為 Redis)。以下示例展示瞭如何配置 Redis 出站通道介面卡
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
此配置與 Redis 入站通道介面卡類似。介面卡隱式注入了 RedisConnectionFactory
,其 bean 名稱定義為 redisConnectionFactory
。此示例還包括可選(和自定義)的 MessageConverter
(即 testConverter
bean)。
從 Spring Integration 3.0 開始,<int-redis:outbound-channel-adapter>
提供了一個替代 topic
屬性的選項:您可以使用 topic-expression
屬性在執行時確定訊息的 Redis 主題。這些屬性是互斥的。
Redis 佇列入站通道介面卡
Spring Integration 3.0 引入了佇列入站通道介面卡,用於從 Redis 列表“彈出”訊息。預設情況下,它使用“右彈出”,但您可以配置它使用“左彈出”。該介面卡是訊息驅動的。它使用內部監聽器執行緒,不使用輪詢器。
以下列表顯示了 queue-inbound-channel-adapter
所有可用的屬性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 元件 bean 名稱。如果您未提供 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並使用此 id 屬性作為 bean 名稱進行註冊。在這種情況下,端點本身將使用 bean 名稱 id 加上 .adapter 進行註冊。(如果 bean 名稱是 thing1 ,則端點將註冊為 thing1.adapter 。) |
2 | 該端點將 Message 例項傳送到的 MessageChannel 。 |
3 | 一個 SmartLifecycle 屬性,用於指定該端點是否應在應用程式上下文啟動後自動啟動。預設為 true 。 |
4 | 一個 SmartLifecycle 屬性,用於指定該端點啟動的階段。預設為 0 。 |
5 | 對 RedisConnectionFactory bean 的引用。預設為 redisConnectionFactory 。 |
6 | 執行基於佇列的 'pop' 操作以獲取 Redis 訊息的 Redis 列表的名稱。 |
7 | 當從端點的監聽任務接收到異常時,用於傳送 ErrorMessage 例項的 MessageChannel 。預設情況下,底層 MessagePublishingErrorHandler 使用應用程式上下文中的預設 errorChannel 。 |
8 | RedisSerializer bean 引用。它可以是一個空字串,表示“無序列化器”。在這種情況下,入站 Redis 訊息中的原始 byte[] 將作為 Message 載荷傳送到 channel 。預設情況下,它是 JdkSerializationRedisSerializer 。 |
9 | 'pop' 操作等待從佇列中獲取 Redis 訊息的超時時間(毫秒)。預設為 1 秒。 |
10 | 在 'pop' 操作發生異常後,監聽任務在重啟之前應休眠的時間(毫秒)。 |
11 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true ,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設為 JDK 序列化)。預設值為 false 。 |
12 | 對 Spring TaskExecutor (或標準 JDK 1.5+ Executor )bean 的引用。它用於底層的監聽任務。預設為 SimpleAsyncTaskExecutor 。 |
13 | 指定此端點應使用“右彈出”(當 true 時)還是“左彈出”(當 false 時)從 Redis 列表中讀取訊息。如果為 true ,則與預設的 Redis 隊列出站通道介面卡一起使用時,Redis List 充當 FIFO 佇列。將其設定為 false 以與使用“右推入”寫入列表的軟體一起使用或實現棧式訊息順序。預設值為 true 。從版本 4.3 開始。 |
task-executor 必須配置多個執行緒進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動監聽任務時,可能會發生死鎖。可以使用 errorChannel 處理這些錯誤以避免重啟,但最好不要讓應用程式暴露在可能的死鎖情況中。有關可能的 TaskExecutor 實現,請參閱 Spring Framework 參考手冊。 |
Redis 隊列出站通道介面卡
Spring Integration 3.0 引入了隊列出站通道介面卡,用於將 Spring Integration 訊息“推入”到 Redis 列表。預設情況下,它使用“左推入”,但您可以配置它使用“右推入”。以下列表顯示了 Redis queue-outbound-channel-adapter
所有可用的屬性
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 元件 bean 名稱。如果您未提供 channel 屬性,則會在應用程式上下文中建立一個 DirectChannel 並使用此 id 屬性作為 bean 名稱進行註冊。在這種情況下,端點將使用 bean 名稱 id 加上 .adapter 進行註冊。(如果 bean 名稱是 thing1 ,則端點將註冊為 thing1.adapter 。) |
2 | 該端點從中接收 Message 例項的 MessageChannel 。 |
3 | 對 RedisConnectionFactory bean 的引用。預設為 redisConnectionFactory 。 |
4 | 執行基於佇列的 'push' 操作以傳送 Redis 訊息的 Redis 列表的名稱。此屬性與 queue-expression 互斥。 |
5 | 一個 SpEL Expression ,用於確定 Redis 列表的名稱。它在執行時使用傳入的 Message 作為 #root 變數。此屬性與 queue 互斥。 |
6 | 一個 RedisSerializer bean 引用。預設為 JdkSerializationRedisSerializer 。但是,對於 String 載荷,如果未提供 serializer 引用,則使用 StringRedisSerializer 。 |
7 | 指定此端點應將僅載荷還是完整的 Message 傳送到 Redis 佇列。預設值為 true 。 |
8 | 指定此端點應使用“左推入”(當 true 時)還是“右推入”(當 false 時)將訊息寫入 Redis 列表。如果為 true ,則與預設的 Redis 佇列入站通道介面卡一起使用時,Redis 列表充當 FIFO 佇列。將其設定為 false 以與使用“左彈出”從列表讀取的軟體一起使用或實現棧式訊息順序。預設值為 true 。從版本 4.3 開始。 |
Redis 應用事件
從 Spring Integration 3.0 開始,Redis 模組提供了 IntegrationEvent
的實現,後者又是一個 org.springframework.context.ApplicationEvent
。RedisExceptionEvent
封裝了 Redis 操作中的異常(端點是事件的“來源”)。例如,<int-redis:queue-inbound-channel-adapter/>
在捕獲 BoundListOperations.rightPop
操作的異常後會發出這些事件。異常可以是任何通用的 org.springframework.data.redis.RedisSystemException
或 org.springframework.data.redis.RedisConnectionFailureException
。使用 <int-event:inbound-channel-adapter/>
處理這些事件對於確定後臺 Redis 任務的問題和採取管理操作非常有用。
Redis 訊息儲存
正如《企業整合模式》(EIP) 一書中所述,訊息儲存使您可以持久化訊息。當處理具有訊息緩衝能力(聚合器、重排序器等)的元件且關注可靠性時,這非常有用。在 Spring Integration 中,MessageStore
策略也為 存根(Claim Check)模式提供了基礎,該模式也在 EIP 中有所描述。
Spring Integration 的 Redis 模組提供了 RedisMessageStore
。以下示例展示瞭如何將其與聚合器一起使用
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一個 bean 配置,它期望 RedisConnectionFactory
作為建構函式引數。
預設情況下,RedisMessageStore
使用 Java 序列化來序列化訊息。然而,如果您想使用不同的序列化技術(例如 JSON),您可以透過設定 RedisMessageStore
的 valueSerializer
屬性來提供您自己的序列化器。
從 4.3.10 版本開始,框架提供了 Message
例項和 MessageHeaders
例項的 Jackson 序列化器和反序列化器實現,分別是 MessageJacksonDeserializer
和 MessageHeadersJacksonSerializer
。它們需要使用 SimpleModule
選項配置 ObjectMapper
。此外,您應該在 ObjectMapper
上設定 enableDefaultTyping
,為每個序列化的複雜物件新增型別資訊(如果您信任來源)。然後,該型別資訊將在反序列化期間使用。框架提供了一個名為 JacksonJsonUtils.messagingAwareMapper()
的實用方法,該方法已經提供了所有前面提到的屬性和序列化器。該實用方法帶有 trustedPackages
引數,用於限制 Java 包的反序列化範圍,以避免安全漏洞。預設的受信任包包括:java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。要在 RedisMessageStore
中管理 JSON 序列化,您必須按照類似於以下示例的方式進行配置
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
從 4.3.12 版本開始,RedisMessageStore
支援 prefix
選項,以允許在同一 Redis 伺服器上區分不同的儲存例項。
Redis 通道訊息儲存
前面展示的 RedisMessageStore
將每個組維護為一個單獨鍵(組 ID)下的值。雖然您可以使用它來支援 QueueChannel
進行持久化,但為此目的提供了一個專門的 RedisChannelMessageStore
(從 4.0 版本開始)。此儲存為每個通道使用一個 LIST
,傳送訊息時使用 LPUSH
,接收訊息時使用 RPOP
。預設情況下,此儲存也使用 JDK 序列化,但您可以修改值序列化器,如前面所述。
我們建議使用此儲存來支援通道,而不是使用通用的 RedisMessageStore
。以下示例定義了一個 Redis 訊息儲存,並在帶有佇列的通道中使用了它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用於儲存資料的鍵採用以下形式:<storeBeanName>:<channelId>
(在前面的示例中為 redisMessageStore:somePersistentQueueChannel
)。
此外,還提供了一個子類 RedisChannelPriorityMessageStore
。當您將其與 QueueChannel
一起使用時,訊息會按照(FIFO)優先順序順序接收。它使用標準的 IntegrationMessageHeaderAccessor.PRIORITY
頭部,並支援優先順序值(0 - 9
)。具有其他優先順序(以及沒有優先順序)的訊息在所有具有優先順序的訊息之後按 FIFO 順序檢索。
這些儲存僅實現了 BasicMessageGroupStore ,而未實現 MessageGroupStore 。它們只能用於支援 QueueChannel 等情況。 |
Redis 元資料儲存
Spring Integration 3.0 引入了一個新的基於 Redis 的 MetadataStore
(參見 Metadata Store)實現。您可以使用 RedisMetadataStore
在應用程式重啟後維護 MetadataStore
的狀態。您可以將此新的 MetadataStore
實現與以下介面卡一起使用:
要指示這些介面卡使用新的 RedisMetadataStore
,請宣告一個名為 metadataStore
的 Spring Bean。Feed 入站通道介面卡和 feed 入站通道介面卡都會自動獲取並使用宣告的 RedisMetadataStore
。以下示例展示瞭如何宣告這樣的 Bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由 RedisProperties
提供支援。與之互動使用 BoundHashOperations
,這反過來要求整個 Properties
儲存需要一個 key
。對於 MetadataStore
,這個 key
扮演著區域(region)的角色,這在分散式環境中,當多個應用程式使用同一個 Redis 伺服器時非常有用。預設情況下,這個 key
的值為 MetaData
。
從版本 4.0 開始,這個儲存實現了 ConcurrentMetadataStore
,使其能夠在多個應用程式例項之間可靠地共享,其中只有一個例項被允許儲存或修改鍵的值。
您不能在 Redis 叢集中使用 RedisMetadataStore.replace() (例如,在 AbstractPersistentAcceptOnceFileListFilter 中),因為目前不支援用於原子性的 WATCH 命令。 |
Redis Store 入站通道介面卡
Redis 儲存入站通道介面卡是一個輪詢消費者(polling consumer),它從 Redis 集合讀取資料並將其作為 Message
載荷傳送。以下示例展示瞭如何配置 Redis 儲存入站通道介面卡:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例展示瞭如何使用 store-inbound-channel-adapter
元素配置 Redis 儲存入站通道介面卡,併為各種屬性提供值,例如:
-
key
或key-expression
:所使用的集合的鍵名。 -
collection-type
:此介面卡支援的集合型別的列舉。支援的集合型別有LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
:對o.s.data.redis.connection.RedisConnectionFactory
例項的引用。 -
redis-template
:對o.s.data.redis.core.RedisTemplate
例項的引用。 -
所有其他入站介面卡通用的其他屬性(例如 'channel')。
您不能同時設定 redis-template 和 connection-factory 。 |
預設情況下,介面卡使用
|
由於 key
具有字面值,前面的示例相對簡單和靜態。有時,您可能需要在執行時根據某些條件更改鍵的值。為此,請改用 key-expression
,其中提供的表示式可以是任何有效的 SpEL 表示式。
此外,您可能希望對從 Redis 集合讀取的成功處理的資料執行一些後處理。例如,您可能希望在處理後移動或刪除該值。您可以使用 Spring Integration 2.2 新增的事務同步功能來執行此操作。以下示例使用 key-expression
和事務同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以使用 transactional
元素將輪詢器宣告為事務性的。此元素可以引用真實的事務管理器(例如,如果您的流程的某些其他部分呼叫 JDBC)。如果您沒有“真實”事務,則可以使用 o.s.i.transaction.PseudoTransactionManager
,它是 Spring 的 PlatformTransactionManager
的實現,並在沒有實際事務時啟用 Redis 介面卡的事務同步功能。
這並不會使 Redis 活動本身具有事務性。它允許在成功(提交)之前或之後,或者在失敗(回滾)之後進行操作的同步。 |
一旦您的輪詢器是事務性的,您可以在 transactional
元素上設定一個 o.s.i.transaction.TransactionSynchronizationFactory
例項。TransactionSynchronizationFactory
建立 TransactionSynchronization
例項。為了方便起見,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory
,它允許您配置 SpEL 表示式,並將其執行與事務協調(同步)。支援 before-commit、after-commit 和 after-rollback 的表示式,以及傳送評估結果(如果有)的通道(每種事件一個)。對於每個子元素,您可以指定 expression
和 channel
屬性。如果只存在 channel
屬性,接收到的訊息將作為特定同步場景的一部分發送到該通道。如果只存在 expression
屬性並且表示式結果為非空值,則會生成一個以結果作為載荷的訊息,併發送到預設通道(NullChannel
)並在日誌中顯示(在 DEBUG
級別)。如果您希望評估結果傳送到特定通道,請新增 channel
屬性。如果表示式結果為 null 或 void,則不生成訊息。
RedisStoreMessageSource
添加了一個 store
屬性,其中包含繫結到事務 IntegrationResourceHolder
的 RedisStore
例項,可以從 TransactionSynchronizationProcessor
實現中訪問該例項。
有關事務同步的更多資訊,請參閱 事務同步。
RedisStore 出站通道介面卡
RedisStore 出站通道介面卡允許您將訊息載荷寫入 Redis 集合,如下面的示例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置透過使用 store-inbound-channel-adapter
元素配置了 Redis 儲存出站通道介面卡。它提供了各種屬性的值,例如:
-
key
或key-expression
:所使用的集合的鍵名。 -
extract-payload-elements
:如果設定為true
(預設值)且載荷是“多值”物件(即Collection
或Map
)的例項,則使用“addAll”和“putAll”語義儲存。否則,如果設定為false
,無論載荷的型別如何,都將作為單個條目儲存。如果載荷不是“多值”物件的例項,則忽略此屬性的值,載荷始終作為單個條目儲存。 -
collection-type
:此介面卡支援的Collection
型別列舉。支援的集合型別有LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
:SpEL 表示式,返回儲存條目的鍵名。它僅在collection-type
是MAP
或PROPERTIES
且 'extract-payload-elements' 為 false 時應用。 -
connection-factory
:對o.s.data.redis.connection.RedisConnectionFactory
例項的引用。 -
redis-template
:對o.s.data.redis.core.RedisTemplate
例項的引用。 -
所有其他入站介面卡通用的其他屬性(例如 'channel')。
您不能同時設定 redis-template 和 connection-factory 。 |
預設情況下,介面卡使用 StringRedisTemplate 。它對鍵、值、雜湊鍵和雜湊值使用 StringRedisSerializer 例項。但是,如果 extract-payload-elements 設定為 false ,則將使用一個 RedisTemplate ,該 RedisTemplate 對鍵和雜湊鍵使用 StringRedisSerializer 例項,對值和雜湊值使用 JdkSerializationRedisSerializer 例項。使用 JDK 序列化器時,重要的是要理解,無論值是否實際上是集合,都對所有值使用 Java 序列化。如果您需要對值的序列化進行更多控制,請考慮提供自己的 RedisTemplate ,而不是依賴這些預設設定。 |
由於 key
和其他屬性具有字面值,前面的示例相對簡單和靜態。有時,您可能需要在執行時根據某些條件動態更改值。為此,請使用它們的 -expression
等價物(key-expression
、map-key-expression
等),其中提供的表示式可以是任何有效的 SpEL 表示式。
Redis 出站命令閘道器
Spring Integration 4.0 引入了 Redis 命令閘道器,允許您使用通用的 RedisConnection#execute
方法執行任何標準 Redis 命令。以下列表顯示了 Redis 出站閘道器的可用屬性:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 該端點從中接收 Message 例項的 MessageChannel 。 |
2 | 此端點發送回復 Message 例項的 MessageChannel 。 |
3 | 指定此出站閘道器是否必須返回非 null 值。預設為 true 。當 Redis 返回 null 值時,會丟擲 ReplyRequiredException 。 |
4 | 等待發送回復訊息的超時時間(以毫秒為單位)。通常用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 對 RedisTemplate Bean 的引用。它與 'connection-factory' 屬性互斥。 |
7 | 對 org.springframework.data.redis.serializer.RedisSerializer 例項的引用。如有必要,它用於將每個命令引數序列化為 byte[]。 |
8 | 返回命令鍵的 SpEL 表示式。預設為 redis_command 訊息頭。它的評估結果不得為 null 。 |
9 | 逗號分隔的 SpEL 表示式,評估後作為命令引數。與 arguments-strategy 屬性互斥。如果兩個屬性都不提供,則使用 payload 作為命令引數。引數表示式可以評估為 'null' 以支援可變數量的引數。 |
10 | 一個 boolean 標誌,用於指定當配置了 argument-expressions 時,評估後的 Redis 命令字串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中的表示式評估上下文中作為 #cmd 變數可用。否則,此屬性將被忽略。 |
11 | 對 o.s.i.redis.outbound.ArgumentsStrategy 例項的引用。它與 argument-expressions 屬性互斥。如果兩個屬性都不提供,則使用 payload 作為命令引數。 |
您可以使用 <int-redis:outbound-gateway>
作為通用元件來執行任何所需的 Redis 操作。以下示例展示瞭如何從 Redis 原子數獲取遞增的值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
載荷的名稱應為 redisCounter
,該名稱可由 org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean 定義提供。
RedisConnection#execute
方法的返回型別是通用的 Object
。實際結果取決於命令型別。例如,MGET
返回 List<byte[]>
。有關命令、其引數和結果型別的更多資訊,請參閱 Redis 規範。
Redis 隊列出站閘道器
Spring Integration 引入了 Redis 隊列出站閘道器,用於執行請求和回覆場景。它將對話 UUID
推送到提供的 queue
,將以該 UUID
為鍵的值推送到 Redis 列表,並等待來自鍵為 UUID
加上 .reply
的 Redis 列表的回覆。每次互動使用不同的 UUID。以下列表顯示了 Redis 出站閘道器的可用屬性:
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 該端點從中接收 Message 例項的 MessageChannel 。 |
2 | 此端點發送回復 Message 例項的 MessageChannel 。 |
3 | 指定此出站閘道器是否必須返回非 null 值。此值預設為 false 。否則,當 Redis 返回 null 值時,會丟擲 ReplyRequiredException 。 |
4 | 等待發送回復訊息的超時時間(以毫秒為單位)。通常用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 出站閘道器向其傳送對話 UUID 的 Redis 列表的名稱。 |
7 | 當註冊多個閘道器時,此出站閘道器的順序。 |
8 | RedisSerializer Bean 引用。它可以是空字串,表示“沒有序列化器”。在這種情況下,來自入站 Redis 訊息的原始 byte[] 作為 Message 載荷傳送到 channel 。預設情況下,它是 JdkSerializationRedisSerializer 。 |
9 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true ,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設為 JDK 序列化)。 |
Redis 佇列入站閘道器
Spring Integration 4.1 引入了 Redis 佇列入站閘道器,用於執行請求和回覆場景。它從提供的 queue
中彈出一個對話 UUID
,從 Redis 列表中彈出以該 UUID
為鍵的值,並將回覆推送到鍵為 UUID
加上 .reply
的 Redis 列表。以下列表顯示了 Redis 佇列入站閘道器的可用屬性:
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | 此端點向其傳送從 Redis 資料建立的 Message 例項的 MessageChannel 。 |
2 | 此端點等待回覆 Message 例項的 MessageChannel 。可選 - replyChannel 訊息頭仍然在使用。 |
3 | 對 Spring TaskExecutor (或標準 JDK Executor )Bean 的引用。它用於底層的監聽任務。預設為 SimpleAsyncTaskExecutor 。 |
4 | 等待發送回復訊息的超時時間(以毫秒為單位)。通常用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 用於對話 UUID 的 Redis 列表的名稱。 |
7 | 當註冊多個閘道器時,此入站閘道器的順序。 |
8 | RedisSerializer Bean 引用。它可以是空字串,表示“沒有序列化器”。在這種情況下,來自入站 Redis 訊息的原始 byte[] 作為 Message 載荷傳送到 channel 。預設為 JdkSerializationRedisSerializer 。(注意,在版本 4.3 之前的版本中,它預設為 StringRedisSerializer 。要恢復該行為,請提供對 StringRedisSerializer 的引用。) |
9 | 等待接收訊息被獲取的超時時間(以毫秒為單位)。通常用於基於佇列的有限請求通道。 |
10 | 指定此端點是否期望 Redis 佇列中的資料包含完整的 Message 例項。如果此屬性設定為 true ,則 serializer 不能是空字串,因為訊息需要某種形式的反序列化(預設為 JDK 序列化)。 |
11 | 監聽任務在“right pop”操作出現異常後,在重新啟動監聽任務之前應該睡眠的時間(以毫秒為單位)。 |
task-executor 必須配置多個執行緒進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動監聽任務時,可能會發生死鎖。可以使用 errorChannel 處理這些錯誤以避免重啟,但最好不要讓應用程式暴露在可能的死鎖情況中。有關可能的 TaskExecutor 實現,請參閱 Spring Framework 參考手冊。 |
Redis Stream 出站通道介面卡
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道介面卡,用於將訊息載荷寫入 Redis Stream。出站通道介面卡使用 ReactiveStreamOperations.add(…)
將 Record
新增到流中。以下示例展示瞭如何為 Redis Stream 出站通道介面卡使用 Java 配置和服務類。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | 使用 ReactiveRedisConnectionFactory 和流名稱構造 ReactiveRedisStreamMessageHandler 例項以新增記錄。另一個建構函式變體基於 SpEL 表示式,用於根據請求訊息評估流鍵。 |
2 | 設定一個 RedisSerializationContext ,用於在將記錄鍵和值新增到流之前對其進行序列化。 |
3 | 設定 HashMapper ,它提供 Java 型別與 Redis 雜湊/對映之間的契約。 |
4 | 如果為 'true',通道介面卡將從請求訊息中提取載荷作為要新增的流記錄。否則,將整個訊息用作值。預設為 true 。 |
Redis Stream 入站通道介面卡
Spring Integration 5.4 引入了 Reactive Stream 入站通道介面卡,用於從 Redis Stream 讀取訊息。入站通道介面卡根據自動確認標誌使用 StreamReceiver.receive(…)
或 StreamReceiver.receiveAutoAck()
從 Redis Stream 讀取記錄。以下示例展示瞭如何為 Redis Stream 入站通道介面卡使用 Java 配置。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | 使用 ReactiveRedisConnectionFactory 和流鍵構造 ReactiveRedisStreamMessageProducer 例項以讀取記錄。 |
2 | 一個 StreamReceiver.StreamReceiverOptions ,用於使用反應式基礎設施消費 Redis 流。 |
3 | 一個 SmartLifecycle 屬性,用於指定此端點是否應在應用程式上下文啟動後自動啟動。預設為 true 。如果為 false ,則應手動啟動 RedisStreamMessageProducer :messageProducer.start() 。 |
4 | 如果為 false ,則接收到的訊息不會自動確認。訊息的確認將延遲到消費訊息的客戶端。預設為 true 。 |
5 | 如果為 true ,將建立一個消費者組。在建立消費者組期間,如果流尚不存在,也會建立流。消費者組跟蹤訊息傳遞並區分消費者。預設為 false 。 |
6 | 設定消費者組名稱。預設為定義的 Bean 名稱。 |
7 | 設定消費者名稱。從組 my-group 以 my-consumer 讀取訊息。 |
8 | 此端點將訊息傳送到的訊息通道。 |
9 | 定義讀取訊息的偏移量。預設為 ReadOffset.latest() 。 |
10 | 如果為 'true',通道介面卡將從 Record 中提取載荷值。否則,將整個 Record 用作載荷。預設為 true 。 |
如果 autoAck
設定為 false
,Redis Stream 中的 Record
不會被 Redis 驅動程式自動確認,而是在要生成的訊息中新增一個 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
訊息頭,其值為 SimpleAcknowledgment
例項。當基於該記錄的訊息的業務邏輯完成後,目標整合流負責呼叫其 acknowledge()
回撥。即使在反序列化期間發生異常並且配置了 errorChannel
時,也需要類似的邏輯。因此,目標錯誤處理器必須決定是 ack 還是 nack 該失敗訊息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
,ReactiveRedisStreamMessageProducer
還會將這些訊息頭填充到要生成的訊息中:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
和 RedisHeaders.CONSUMER
。
從版本 5.5 開始,您可以在 ReactiveRedisStreamMessageProducer
上顯式配置 StreamReceiver.StreamReceiverOptionsBuilder
選項,包括新引入的 onErrorResume
函式,如果在反序列化錯誤發生時 Redis Stream 消費者應繼續輪詢,則需要此函式。預設函式將訊息傳送到錯誤通道(如果提供),並可能對失敗的訊息進行確認,如上所述。所有這些 StreamReceiver.StreamReceiverOptionsBuilder
都與外部提供的 StreamReceiver.StreamReceiverOptions
互斥。
Redis 鎖登錄檔
Spring Integration 4.0 引入了 RedisLockRegistry
。某些元件(例如,聚合器和重排序器)使用從 LockRegistry
例項獲取的鎖,以確保一次只有一個執行緒操作一個組。DefaultLockRegistry
在單個元件內執行此功能。您現在可以在這些元件上配置外部鎖登錄檔。當您將其與共享 MessageGroupStore
一起使用時,可以使用 RedisLockRegistry
在多個應用程式例項之間提供此功能,這樣一次只有一個例項可以操作該組。
當本地執行緒釋放鎖時,另一個本地執行緒通常可以立即獲取鎖。如果使用不同登錄檔例項的執行緒釋放鎖,則可能需要長達 100ms 才能獲取鎖。
為避免“掛起”的鎖(當伺服器故障時),此登錄檔中的鎖在預設 60 秒後過期,但您可以在登錄檔上配置此值。鎖通常只持有更短的時間。
由於鍵可能會過期,嘗試解鎖已過期的鎖會導致丟擲異常。但是,受此類鎖保護的資源可能已遭到破壞,因此應將此類異常視為嚴重情況。您應將過期時間設定得足夠大以防止這種情況發生,但也要設定得足夠低,以便在伺服器故障後能在合理的時間內恢復鎖。 |
從版本 5.0 開始,RedisLockRegistry
實現了 ExpirableLockRegistry
,它會移除距離上次獲取時間超過 age
且當前未被鎖定的鎖。
從版本 5.5.6 開始,RedisLockRegistry
支援透過 RedisLockRegistry.setCacheCapacity()
自動清理 RedisLockRegistry.locks
中 redis 鎖的快取。有關更多資訊,請參閱其 JavaDocs。
從版本 5.5.13 開始,RedisLockRegistry
暴露了一個 setRedisLockType(RedisLockType)
選項,用於確定應以哪種模式進行 Redis 鎖獲取:
-
RedisLockType.SPIN_LOCK
- 透過週期性迴圈(100ms)檢查是否可以獲取鎖來獲取鎖。預設值。 -
RedisLockType.PUB_SUB_LOCK
- 透過 Redis 釋出/訂閱(pub-sub)訂閱來獲取鎖。
釋出/訂閱是首選模式 - 客戶端與 Redis 伺服器之間的網路通訊更少,效能更高 - 當訂閱在其他程序中收到解鎖通知時,鎖會立即獲取。但是,Redis 在主/副本連線(例如在 AWS ElastiCache 環境中)中不支援釋出/訂閱,因此預設選擇忙等待模式,以使登錄檔在任何環境中都能工作。
從版本 6.4 開始,RedisLockRegistry.RedisLock.unlock()
方法不再丟擲 IllegalStateException
,而是丟擲 ConcurrentModificationException
,如果鎖的所有權已過期。
從版本 6.4 開始,添加了 RedisLockRegistry.setRenewalTaskScheduler()
以配置用於定期續訂鎖的排程器。設定後,鎖成功獲取後,將每隔過期時間的 1/3
自動續訂,直到解鎖或 Redis 鍵被移除。