JDBC 通道訊息儲存 JSON 序列化

7.0 版本引入了對 JdbcChannelMessageStore 的 JSON 序列化支援。預設情況下,Spring Integration 使用 Java 序列化將訊息儲存在資料庫中。新的 JSON 序列化選項提供了一種替代的序列化機制。

安全注意事項: JSON 序列化將訊息內容以文字形式儲存在資料庫中,這可能會暴露敏感資料。在使用 JSON 序列化生產環境之前,請確保適當的資料庫訪問控制、靜態加密,並考慮您組織的資料保護要求。

配置

JSON (反)序列化有兩個可用元件

  • JsonChannelMessageStorePreparedStatementSetter - 將訊息序列化為 JSON

  • JsonMessageRowMapper - 從 JSON 反序列化訊息

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());

    // Enable JSON serialization
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter());
    store.setMessageRowMapper(
        new JsonMessageRowMapper("com.example"));

    return store;
}

字串引數 ("com.example") 指定了用於反序列化的其他受信任包。這些包將新增到預設的受信任包中(請參閱受信任包部分)。出於安全考慮,只有受信任包中的類才能被反序列化。

資料庫模式修改

JSON 序列化需要修改資料庫模式。具有 BLOB/BYTEA 列型別的預設模式不能用於 JSON 序列化。

MESSAGE_CONTENT 列必須更改為可以儲存 JSON 的基於文字的型別。

  • PostgreSQL

  • MySQL

  • H2

  • 其他資料庫

對於 PostgreSQL,可以使用 JSONB 型別。

-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;

對於 MySQL,可以使用 JSON 型別。

-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;

對於 H2 資料庫,可以使用 CLOB 型別。

ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;

對於任何支援大型文字列(CLOB、TEXT 等)的資料庫,MESSAGE_CONTENT 列可以修改為適當的文字型別。

JSON 序列化的示例模式

以下示例演示如何為基於 JSON 的訊息儲存建立專用表。

  • PostgreSQL

  • MySQL

  • H2

CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
   MESSAGE_CONTENT JSONB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
   MESSAGE_CONTENT JSON, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
   MESSAGE_CONTENT CLOB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);

JSON 結構

使用基於 Jackson 的序列化時,訊息使用 Jackson 的多型型別處理以以下 JSON 結構儲存

{
  "@class": "org.springframework.messaging.support.GenericMessage",
  "payload": {
    "@class": "com.example.OrderMessage",
    "orderId": "ORDER-12345",
    "amount": 1299.99
  },
  "headers": {
    "@class": "java.util.HashMap",
    "priority": ["java.lang.String", "HIGH"],
    "id": ["java.util.UUID", "a1b2c3d4-..."],
    "timestamp": ["java.lang.Long", 1234567890]
  }
}

@class 屬性提供正確反序列化多型型別所需的型別資訊。

查詢 JSON 內容(可選)

如果使用原生 JSON 列型別(PostgreSQL JSONB 或 MySQL JSON),則可以直接查詢訊息內容。

PostgreSQL JSONB 查詢

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';

MySQL JSON 函式

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';

如果使用 TEXTCLOB 列型別,則這些 JSON 特定的查詢不可用。但是,JSON 序列化仍然可以透過 Spring Integration 進行儲存和檢索。

受信任的包

JacksonMessagingUtils.messagingAwareMapper() 根據受信任的包列表驗證所有反序列化的類,以防止安全漏洞。

預設受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler

可以在建構函式中指定其他包並附加到此列表

// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")

自定義 JsonObjectMapper

對於高階場景,可以提供自定義的 JsonObjectMapper

import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
    customMapper.enable(SerializationFeature.INDENT_OUTPUT);
    customMapper.registerModule(new CustomModule());

    JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);

    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
    store.setMessageRowMapper(
        new JsonMessageRowMapper(jsonObjectMapper));

    return store;
}

自定義 JsonObjectMapper 應該針對 Spring Integration 訊息序列化進行適當配置。建議從 JacksonMessagingUtils.messagingAwareMapper() 開始並在此基礎上進行定製。在 JsonChannelMessageStorePreparedStatementSetterJsonMessageRowMapper 中必須使用相同的配置以實現一致的序列化和反序列化。

© . This site is unofficial and not affiliated with VMware.