JDBC 訊息儲存
Spring Integration 提供了兩種 JDBC 特定的訊息儲存實現。JdbcMessageStore
適用於聚合器和認領單模式。JdbcChannelMessageStore
實現則為訊息通道提供了更具針對性和可擴充套件性的實現。
請注意,你可以使用 JdbcMessageStore
來支援訊息通道,但 JdbcChannelMessageStore
是為此目的而最佳化的。
從 5.0.11、5.1.2 版本開始,JdbcChannelMessageStore 的索引已得到最佳化。如果你的儲存中有大型訊息組,你可能希望修改索引。此外,PriorityChannel 的索引已被註釋掉,因為除非你使用由 JDBC 支援的此類通道,否則不需要該索引。 |
當使用 OracleChannelMessageStoreQueryProvider 時,必須新增優先順序通道索引,因為它包含在查詢的提示中。 |
初始化資料庫
在使用 JDBC 訊息儲存元件之前,你應該為目標資料庫配置適當的物件。
Spring Integration 附帶了一些可用於初始化資料庫的示例指令碼。在 spring-integration-jdbc
JAR 檔案中,你可以在 org.springframework.integration.jdbc
包中找到指令碼。它為一系列常見資料庫平臺提供了建立和刪除指令碼示例。使用這些指令碼的一種常見方法是在Spring JDBC 資料來源初始化器中引用它們。請注意,這些指令碼作為示例和所需表及列名的規範提供。你可能需要對其進行增強以用於生產環境(例如,新增索引宣告)。
從版本 6.2 開始,JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
和 DefaultLockRepository
實現了 SmartLifecycle
,並在 start()
方法中對其各自的表執行 SELECT COUNT
查詢,以確保目標資料庫中存在所需的表(根據提供的表字首)。如果所需的表不存在,應用程式上下文將無法啟動。可以透過 setCheckDatabaseOnStart(false)
停用此檢查。
通用 JDBC 訊息儲存
JDBC 模組提供了一個由資料庫支援的 Spring Integration MessageStore
(在認領單模式中很重要)和 MessageGroupStore
(在聚合器等有狀態模式中很重要)的實現。JdbcMessageStore
實現了這兩個介面,並且支援在 XML 中配置儲存例項,示例如下
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
你可以指定一個 JdbcTemplate
而不是 DataSource
。
以下示例顯示了一些其他可選屬性
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
在前面的示例中,我們為儲存生成的查詢中的表名指定了一個字首。表名預設字首為 INT_
。
支援訊息通道
如果你打算使用 JDBC 支援訊息通道,我們建議使用 JdbcChannelMessageStore
實現。它僅與訊息通道一起使用。
支援的資料庫
JdbcChannelMessageStore
使用資料庫特定的 SQL 查詢從資料庫檢索訊息。因此,你必須在 JdbcChannelMessageStore
上設定 ChannelMessageStoreQueryProvider
屬性。此 channelMessageStoreQueryProvider
為你指定的特定資料庫提供 SQL 查詢。Spring Integration 支援以下關係資料庫
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果你的資料庫未列出,你可以實現 ChannelMessageStoreQueryProvider
介面並提供你自己的自定義查詢。
版本 4.0 在表中添加了 MESSAGE_SEQUENCE
列,以確保即使在同一毫秒記憶體儲訊息時也能實現先進先出 (FIFO) 佇列。
從版本 6.2 開始,ChannelMessageStoreQueryProvider
公開了 isSingleStatementForPoll
標誌,其中 PostgresChannelMessageStoreQueryProvider
返回 true
,並且其輪詢查詢現在基於單個 DELETE…RETURNING
語句。如果僅支援單個輪詢語句,JdbcChannelMessageStore
會參考 isSingleStatementForPoll
選項並跳過單獨的 DELETE
語句。
自定義訊息插入
從版本 5.0 開始,透過過載 ChannelMessageStorePreparedStatementSetter
類,你可以在 JdbcChannelMessageStore
中提供自定義的訊息插入實現。你可以使用它來設定不同的列,更改表結構或序列化策略。例如,你可以將其結構儲存為 JSON 字串,而不是預設序列化為 byte[]
。
以下示例使用 setValues
的預設實現來儲存常用列,並覆蓋行為以將訊息載荷儲存為 varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我們不建議使用關係資料庫作為佇列。相反,如果可能,請考慮使用 JMS 或 AMQP 支援的通道。更多參考,請參閱以下資源 如果你仍然計劃將資料庫用作佇列,請考慮使用 PostgreSQL 及其通知機制,這將在後續章節中介紹。 |
併發輪詢
在輪詢訊息通道時,你可以選擇使用 TaskExecutor
引用配置關聯的 Poller
。
但請記住,如果你使用由 JDBC 支援的訊息通道,並且計劃使用多執行緒以事務方式輪詢通道和訊息儲存,你應該確保使用支援多版本併發控制 (MVCC) 的關係資料庫。否則,鎖定可能是一個問題,並且在使用多執行緒時的效能可能無法達到預期。例如,Apache Derby 在這方面存在問題。 為了獲得更好的 JDBC 佇列吞吐量,並避免在不同執行緒可能從佇列輪詢同一
|
優先順序通道
從版本 4.0 開始,JdbcChannelMessageStore
實現了 PriorityCapableChannelMessageStore
並提供了 priorityEnabled
選項,使其可以用作 priority-queue
例項的 message-store
引用。為此,INT_CHANNEL_MESSAGE
表包含一個 MESSAGE_PRIORITY
列來儲存 PRIORITY
訊息頭的值。此外,一個新的 MESSAGE_SEQUENCE
列使我們能夠實現健壯的先進先出 (FIFO) 輪詢機制,即使在同一毫秒記憶體儲具有相同優先順序的多個訊息時也是如此。訊息從資料庫中按 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
順序輪詢(選擇)。
我們不建議將同一個 JdbcChannelMessageStore bean 用於優先順序和非優先順序佇列通道,因為 priorityEnabled 選項應用於整個儲存,並且無法為佇列通道保留正確的 FIFO 佇列語義。但是,可以使用同一個 INT_CHANNEL_MESSAGE 表(甚至 region )來支援兩種 JdbcChannelMessageStore 型別。要配置這種情況,你可以像以下示例所示,從一個訊息儲存 bean 擴充套件另一個。 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分割槽訊息儲存
通常,將 JdbcMessageStore
用作一組應用程式或同一應用程式中節點的全域性儲存。為了提供一些防止命名衝突的保護並控制資料庫元資料配置,訊息儲存允許以兩種方式對錶進行分割槽。一種方法是透過更改字首來使用單獨的表名(如前所述)。另一種方法是指定 region
名稱來在單個表中對資料進行分割槽。第二種方法的一個重要用例是當 MessageStore
管理支援 Spring Integration 訊息通道的持久化佇列時。持久化通道的訊息資料在儲存中以通道名稱為鍵。因此,如果通道名稱不是全域性唯一的,通道可能會獲取並非為其準備的資料。為了避免這種危險,你可以使用訊息儲存 region
來為具有相同邏輯名稱的不同物理通道分開資料。
PostgreSQL:接收推送通知
PostgreSQL 提供了一個監聽和通知框架,用於在資料庫表操作時接收推送通知。Spring Integration 利用此機制(從版本 6.0 開始)來允許在向 JdbcChannelMessageStore
新增新訊息時接收推送通知。使用此功能時,必須定義一個數據庫觸發器,該觸發器可以在 spring-integration-jdbc 模組中包含的 schema-postgresql.sql
檔案的註釋中找到。
推送通知透過 PostgresChannelMessageTableSubscriber
類接收,該類允許其訂閱者在任何給定的 region
和 groupId
有新訊息到達時接收回調。即使訊息是在不同的 JVM 上追加到同一個資料庫中,也能收到這些通知。PostgresSubscribableChannel
實現使用 PostgresChannelMessageTableSubscriber.Subscription
契約來從儲存中拉取訊息,作為對上述 PostgresChannelMessageTableSubscriber
通知的回應。
例如,可以按如下方式接收某些組的推送通知
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事務支援
從版本 6.0.5 開始,在 PostgresSubscribableChannel
上指定 PlatformTransactionManager
將在事務中通知訂閱者。訂閱者中的異常將導致事務回滾並將訊息放回訊息儲存中。事務支援預設不啟用。
重試
從版本 6.0.5 開始,透過向 PostgresSubscribableChannel
提供 RetryTemplate
,可以指定重試策略。預設情況下,不執行重試。
任何活動的 對於獨佔連線的這種需求,也建議一個 JVM 只執行一個 |