JDBC 訊息儲存

Spring Integration 提供了兩種 JDBC 特定的訊息儲存實現。JdbcMessageStore 適用於聚合器和認領單模式。JdbcChannelMessageStore 實現則為訊息通道提供了更具針對性和可擴充套件性的實現。

請注意,你可以使用 JdbcMessageStore 來支援訊息通道,但 JdbcChannelMessageStore 是為此目的而最佳化的。

從 5.0.11、5.1.2 版本開始,JdbcChannelMessageStore 的索引已得到最佳化。如果你的儲存中有大型訊息組,你可能希望修改索引。此外,PriorityChannel 的索引已被註釋掉,因為除非你使用由 JDBC 支援的此類通道,否則不需要該索引。
當使用 OracleChannelMessageStoreQueryProvider 時,必須新增優先順序通道索引,因為它包含在查詢的提示中。

初始化資料庫

在使用 JDBC 訊息儲存元件之前,你應該為目標資料庫配置適當的物件。

Spring Integration 附帶了一些可用於初始化資料庫的示例指令碼。在 spring-integration-jdbc JAR 檔案中,你可以在 org.springframework.integration.jdbc 包中找到指令碼。它為一系列常見資料庫平臺提供了建立和刪除指令碼示例。使用這些指令碼的一種常見方法是在Spring JDBC 資料來源初始化器中引用它們。請注意,這些指令碼作為示例和所需表及列名的規範提供。你可能需要對其進行增強以用於生產環境(例如,新增索引宣告)。

從版本 6.2 開始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 實現了 SmartLifecycle,並在 start() 方法中對其各自的表執行 SELECT COUNT 查詢,以確保目標資料庫中存在所需的表(根據提供的表字首)。如果所需的表不存在,應用程式上下文將無法啟動。可以透過 setCheckDatabaseOnStart(false) 停用此檢查。

通用 JDBC 訊息儲存

JDBC 模組提供了一個由資料庫支援的 Spring Integration MessageStore(在認領單模式中很重要)和 MessageGroupStore(在聚合器等有狀態模式中很重要)的實現。JdbcMessageStore 實現了這兩個介面,並且支援在 XML 中配置儲存例項,示例如下

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

你可以指定一個 JdbcTemplate 而不是 DataSource

以下示例顯示了一些其他可選屬性

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>

在前面的示例中,我們為儲存生成的查詢中的表名指定了一個字首。表名預設字首為 INT_

支援訊息通道

如果你打算使用 JDBC 支援訊息通道,我們建議使用 JdbcChannelMessageStore 實現。它僅與訊息通道一起使用。

支援的資料庫

JdbcChannelMessageStore 使用資料庫特定的 SQL 查詢從資料庫檢索訊息。因此,你必須在 JdbcChannelMessageStore 上設定 ChannelMessageStoreQueryProvider 屬性。此 channelMessageStoreQueryProvider 為你指定的特定資料庫提供 SQL 查詢。Spring Integration 支援以下關係資料庫

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果你的資料庫未列出,你可以實現 ChannelMessageStoreQueryProvider 介面並提供你自己的自定義查詢。

版本 4.0 在表中添加了 MESSAGE_SEQUENCE 列,以確保即使在同一毫秒記憶體儲訊息時也能實現先進先出 (FIFO) 佇列。

從版本 6.2 開始,ChannelMessageStoreQueryProvider 公開了 isSingleStatementForPoll 標誌,其中 PostgresChannelMessageStoreQueryProvider 返回 true,並且其輪詢查詢現在基於單個 DELETE…​RETURNING 語句。如果僅支援單個輪詢語句,JdbcChannelMessageStore 會參考 isSingleStatementForPoll 選項並跳過單獨的 DELETE 語句。

自定義訊息插入

從版本 5.0 開始,透過過載 ChannelMessageStorePreparedStatementSetter 類,你可以在 JdbcChannelMessageStore 中提供自定義的訊息插入實現。你可以使用它來設定不同的列,更改表結構或序列化策略。例如,你可以將其結構儲存為 JSON 字串,而不是預設序列化為 byte[]

以下示例使用 setValues 的預設實現來儲存常用列,並覆蓋行為以將訊息載荷儲存為 varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我們不建議使用關係資料庫作為佇列。相反,如果可能,請考慮使用 JMS 或 AMQP 支援的通道。更多參考,請參閱以下資源

如果你仍然計劃將資料庫用作佇列,請考慮使用 PostgreSQL 及其通知機制,這將在後續章節中介紹。

併發輪詢

在輪詢訊息通道時,你可以選擇使用 TaskExecutor 引用配置關聯的 Poller

但請記住,如果你使用由 JDBC 支援的訊息通道,並且計劃使用多執行緒以事務方式輪詢通道和訊息儲存,你應該確保使用支援多版本併發控制 (MVCC) 的關係資料庫。否則,鎖定可能是一個問題,並且在使用多執行緒時的效能可能無法達到預期。例如,Apache Derby 在這方面存在問題。

為了獲得更好的 JDBC 佇列吞吐量,並避免在不同執行緒可能從佇列輪詢同一 Message 時出現問題,在使用不支援 MVCC 的資料庫時,設定 JdbcChannelMessageStoreusingIdCache 屬性為 true 是很重要的。以下示例顯示瞭如何操作

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

優先順序通道

從版本 4.0 開始,JdbcChannelMessageStore 實現了 PriorityCapableChannelMessageStore 並提供了 priorityEnabled 選項,使其可以用作 priority-queue 例項的 message-store 引用。為此,INT_CHANNEL_MESSAGE 表包含一個 MESSAGE_PRIORITY 列來儲存 PRIORITY 訊息頭的值。此外,一個新的 MESSAGE_SEQUENCE 列使我們能夠實現健壯的先進先出 (FIFO) 輪詢機制,即使在同一毫秒記憶體儲具有相同優先順序的多個訊息時也是如此。訊息從資料庫中按 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 順序輪詢(選擇)。

我們不建議將同一個 JdbcChannelMessageStore bean 用於優先順序和非優先順序佇列通道,因為 priorityEnabled 選項應用於整個儲存,並且無法為佇列通道保留正確的 FIFO 佇列語義。但是,可以使用同一個 INT_CHANNEL_MESSAGE 表(甚至 region)來支援兩種 JdbcChannelMessageStore 型別。要配置這種情況,你可以像以下示例所示,從一個訊息儲存 bean 擴充套件另一個。
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

分割槽訊息儲存

通常,將 JdbcMessageStore 用作一組應用程式或同一應用程式中節點的全域性儲存。為了提供一些防止命名衝突的保護並控制資料庫元資料配置,訊息儲存允許以兩種方式對錶進行分割槽。一種方法是透過更改字首來使用單獨的表名(如前所述)。另一種方法是指定 region 名稱來在單個表中對資料進行分割槽。第二種方法的一個重要用例是當 MessageStore 管理支援 Spring Integration 訊息通道的持久化佇列時。持久化通道的訊息資料在儲存中以通道名稱為鍵。因此,如果通道名稱不是全域性唯一的,通道可能會獲取並非為其準備的資料。為了避免這種危險,你可以使用訊息儲存 region 來為具有相同邏輯名稱的不同物理通道分開資料。

PostgreSQL:接收推送通知

PostgreSQL 提供了一個監聽和通知框架,用於在資料庫表操作時接收推送通知。Spring Integration 利用此機制(從版本 6.0 開始)來允許在向 JdbcChannelMessageStore 新增新訊息時接收推送通知。使用此功能時,必須定義一個數據庫觸發器,該觸發器可以在 spring-integration-jdbc 模組中包含的 schema-postgresql.sql 檔案的註釋中找到。

推送通知透過 PostgresChannelMessageTableSubscriber 類接收,該類允許其訂閱者在任何給定的 regiongroupId 有新訊息到達時接收回調。即使訊息是在不同的 JVM 上追加到同一個資料庫中,也能收到這些通知。PostgresSubscribableChannel 實現使用 PostgresChannelMessageTableSubscriber.Subscription 契約來從儲存中拉取訊息,作為對上述 PostgresChannelMessageTableSubscriber 通知的回應。

例如,可以按如下方式接收某些組的推送通知

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

事務支援

從版本 6.0.5 開始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 將在事務中通知訂閱者。訂閱者中的異常將導致事務回滾並將訊息放回訊息儲存中。事務支援預設不啟用。

重試

從版本 6.0.5 開始,透過向 PostgresSubscribableChannel 提供 RetryTemplate,可以指定重試策略。預設情況下,不執行重試。

任何活動的 PostgresChannelMessageTableSubscriber 在其活動生命週期內佔用一個獨佔的 JDBC Connection。因此,此連線不應來自連線池 DataSource,這一點很重要。此類連線池通常期望發出的連線在預定義超時視窗內關閉。

對於獨佔連線的這種需求,也建議一個 JVM 只執行一個 PostgresChannelMessageTableSubscriber,該 Subscriber 可用於註冊任意數量的訂閱。