JDBC 訊息儲存

Spring Integration 提供了兩種 JDBC 特定的訊息儲存實現。JdbcMessageStore 適用於聚合器和認領檢查模式。JdbcChannelMessageStore 實現為訊息通道提供了更具針對性和可擴充套件性的實現。

請注意,您可以使用 JdbcMessageStore 作為訊息通道的後端,但 JdbcChannelMessageStore 專為此目的進行了最佳化。

從 5.0.11、5.1.2 版本開始,JdbcChannelMessageStore 的索引已得到最佳化。如果您的儲存中有大型訊息組,您可能希望修改索引。此外,PriorityChannel 的索引已被註釋掉,因為它只有在使用由 JDBC 支援的此類通道時才需要。
使用 OracleChannelMessageStoreQueryProvider 時,**必須**新增優先順序通道索引,因為它包含在查詢提示中。

初始化資料庫

在開始使用 JDBC 訊息儲存元件之前,您應該為目標資料庫提供適當的物件。

Spring Integration 附帶了一些可用於初始化資料庫的示例指令碼。在 spring-integration-jdbc JAR 檔案中,您可以在 org.springframework.integration.jdbc 包中找到指令碼。它為一系列常見的資料庫平臺提供了示例建立和示例刪除指令碼。使用這些指令碼的常見方法是在 Spring JDBC 資料來源初始化器 中引用它們。請注意,這些指令碼作為示例和所需表名和列名的規範提供。您可能會發現需要為生產使用增強它們,例如,透過新增索引宣告。

從 6.2 版本開始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 都實現了 SmartLifecycle 並在其 start() 方法中對各自的表執行 SELECT COUNT 查詢,以確保所需表(根據提供的 L_ 字首)存在於目標資料庫中。如果所需表不存在,應用程式上下文將無法啟動。可以透過 setCheckDatabaseOnStart(false) 停用此檢查。

通用 JDBC 訊息儲存

JDBC 模組提供了由資料庫支援的 Spring Integration MessageStore(在認領檢查模式中很重要)和 MessageGroupStore(在有狀態模式(如聚合器)中很重要)的實現。這兩個介面都由 JdbcMessageStore 實現,並且支援在 XML 中配置儲存例項,如下例所示

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定一個 JdbcTemplate 而不是 DataSource

以下示例顯示了一些其他可選屬性

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>

在前面的示例中,我們為儲存生成的查詢中的表名指定了一個字首。表名字首預設為 INT_

支援訊息通道

如果您打算使用 JDBC 支援訊息通道,我們建議使用 JdbcChannelMessageStore 實現。它僅與訊息通道結合使用。

支援的資料庫

JdbcChannelMessageStore 使用特定於資料庫的 SQL 查詢從資料庫中檢索訊息。因此,您必須在 JdbcChannelMessageStore 上設定 ChannelMessageStoreQueryProvider 屬性。此 channelMessageStoreQueryProvider 為您指定的特定資料庫提供 SQL 查詢。Spring Integration 支援以下關係資料庫

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果您的資料庫未列出,您可以實現 ChannelMessageStoreQueryProvider 介面並提供自己的自定義查詢。

4.0 版本在表中添加了 MESSAGE_SEQUENCE 列,以確保即使訊息在同一毫秒記憶體儲,也能實現先進先出 (FIFO) 排隊。

從 6.2 版本開始,ChannelMessageStoreQueryProvider 公開了 isSingleStatementForPoll 標誌,其中 PostgresChannelMessageStoreQueryProvider 返回 true,並且其輪詢查詢現在基於單個 DELETE…​RETURNING 語句。JdbcChannelMessageStore 諮詢 isSingleStatementForPoll 選項,如果只支援單個輪詢語句,則跳過單獨的 DELETE 語句。

自定義訊息插入

從 5.0 版本開始,透過過載 ChannelMessageStorePreparedStatementSetter 類,您可以為 JdbcChannelMessageStore 中的訊息插入提供自定義實現。您可以使用它來設定不同的列或更改表結構或序列化策略。例如,您可以將其結構儲存為 JSON 字串,而不是預設序列化為 byte[]

以下示例使用 setValues 的預設實現來儲存通用列,並覆蓋行為以將訊息負載儲存為 varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我們不建議使用關係資料庫進行排隊。相反,如果可能,請考慮使用 JMS 或 AMQP 支援的通道。有關更多參考,請參閱以下資源

如果您仍計劃將資料庫用作佇列,請考慮使用 PostgreSQL 及其通知機制,這在後續部分中有所描述。

併發輪詢

輪詢訊息通道時,您可以選擇配置關聯的 Poller 以引用 TaskExecutor

但請記住,如果您使用 JDBC 支援的訊息通道,並且計劃使用多個執行緒對通道以及訊息儲存進行事務性輪詢,則應確保使用支援多版本併發控制 (MVCC) 的關係資料庫。否則,鎖定可能會成為問題,並且在使用多個執行緒時,效能可能不會如預期般實現。例如,Apache Derby 在這方面存在問題。

為了獲得更好的 JDBC 佇列吞吐量並避免不同執行緒可能從佇列輪詢同一 Message 時出現問題,在使用不支援 MVCC 的資料庫時,**請務必**將 JdbcChannelMessageStoreusingIdCache 屬性設定為 true。以下示例顯示瞭如何執行此操作

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

優先順序通道

從 4.0 版本開始,JdbcChannelMessageStore 實現了 PriorityCapableChannelMessageStore 並提供了 priorityEnabled 選項,使其可用作 priority-queue 例項的 message-store 引用。為此,INT_CHANNEL_MESSAGE 表有一個 MESSAGE_PRIORITY 列來儲存 PRIORITY 訊息頭的_值_。此外,新的 MESSAGE_SEQUENCE 列讓我們實現了一個健壯的先進先出 (FIFO) 輪詢機制,即使多個訊息以相同的優先順序在同一毫秒記憶體儲。訊息以 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 從資料庫中輪詢(選擇)。

我們不建議將相同的 JdbcChannelMessageStore bean 用於優先順序和非優先順序佇列通道,因為 priorityEnabled 選項適用於整個儲存,並且佇列通道的適當 FIFO 佇列語義不會保留。但是,相同的 INT_CHANNEL_MESSAGE 表(甚至 region)可以用於兩種 JdbcChannelMessageStore 型別。要配置該場景,您可以從另一個訊息儲存 bean 擴充套件一個訊息儲存 bean,如下例所示
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

分割槽訊息儲存

通常,JdbcMessageStore 被用作一組應用程式或同一應用程式中的節點的全域性儲存。為了提供一些針對名稱衝突的保護並控制資料庫元資料配置,訊息儲存允許以兩種方式對錶進行分割槽。一種方法是透過更改字首來使用單獨的表名(如前文所述)。另一種方法是為在單個表中分割槽資料指定 region 名稱。第二種方法的一個重要用例是當 MessageStore 管理支援 Spring Integration 訊息通道的持久佇列時。持久通道的訊息資料在儲存中以通道名稱為鍵。因此,如果通道名稱不是全域性唯一的,則通道可能會獲取不打算用於它們的資料。為了避免這種危險,您可以使用訊息儲存 region 來保持具有相同邏輯名稱的不同物理通道的資料分離。

PostgreSQL:接收推送通知

PostgreSQL 提供了一個偵聽和通知框架,用於在資料庫表操作時接收推送通知。Spring Integration 利用此機制(從 6.0 版本開始)允許在將新訊息新增到 JdbcChannelMessageStore 時接收推送通知。使用此功能時,必須定義一個數據庫觸發器,該觸發器可以在 Spring Integration JDBC 模組中包含的 schema-postgresql.sql 檔案的註釋中找到。

推送通知透過 PostgresChannelMessageTableSubscriber 類接收,該類允許其訂閱者在任何給定 regiongroupId 的新訊息到達時接收回調。即使訊息在不同的 JVM 上附加到同一個資料庫,也會收到這些通知。PostgresSubscribableChannel 實現使用 PostgresChannelMessageTableSubscriber.Subscription 契約,作為對上述 PostgresChannelMessageTableSubscriber 通知作出反應,從儲存中拉取訊息。

例如,可以按如下方式接收 some group 的推送通知

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

事務支援

從 6.0.5 版本開始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 將在事務中通知訂閱者。訂閱者中的異常將導致事務回滾並將訊息放回訊息儲存。預設情況下不啟用事務支援。

重試

從 6.0.5 版本開始,可以透過向 PostgresSubscribableChannel 提供 RetryTemplate 來指定重試策略。預設情況下,不執行重試。

任何活動的 PostgresChannelMessageTableSubscriber 在其活動生命週期內都佔用一個獨佔的 JDBC Connection。因此,此連線不應源自池化 DataSource,這一點很重要。此類連線池通常期望在預定義超時視窗內關閉已發出的連線。

由於需要獨佔連線,因此還建議 JVM 僅執行單個 PostgresChannelMessageTableSubscriber,該訂閱者可用於註冊任意數量的訂閱。

© . This site is unofficial and not affiliated with VMware.