JDBC 通道訊息儲存 JSON 序列化
7.0 版本引入了對 JdbcChannelMessageStore 的 JSON 序列化支援。預設情況下,Spring Integration 使用 Java 序列化將訊息儲存在資料庫中。新的 JSON 序列化選項提供了一種替代的序列化機制。
|
安全注意事項: JSON 序列化將訊息內容以文字形式儲存在資料庫中,這可能會暴露敏感資料。在使用 JSON 序列化生產環境之前,請確保適當的資料庫訪問控制、靜態加密,並考慮您組織的資料保護要求。 |
配置
JSON (反)序列化有兩個可用元件
-
JsonChannelMessageStorePreparedStatementSetter- 將訊息序列化為 JSON -
JsonMessageRowMapper- 從 JSON 反序列化訊息
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
// Enable JSON serialization
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter());
store.setMessageRowMapper(
new JsonMessageRowMapper("com.example"));
return store;
}
字串引數 ("com.example") 指定了用於反序列化的其他受信任包。這些包將新增到預設的受信任包中(請參閱受信任包部分)。出於安全考慮,只有受信任包中的類才能被反序列化。
資料庫模式修改
|
JSON 序列化需要修改資料庫模式。具有 |
MESSAGE_CONTENT 列必須更改為可以儲存 JSON 的基於文字的型別。
-
PostgreSQL
-
MySQL
-
H2
-
其他資料庫
對於 PostgreSQL,可以使用 JSONB 型別。
-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;
對於 MySQL,可以使用 JSON 型別。
-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;
對於 H2 資料庫,可以使用 CLOB 型別。
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;
對於任何支援大型文字列(CLOB、TEXT 等)的資料庫,MESSAGE_CONTENT 列可以修改為適當的文字型別。
JSON 序列化的示例模式
以下示例演示如何為基於 JSON 的訊息儲存建立專用表。
-
PostgreSQL
-
MySQL
-
H2
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
MESSAGE_CONTENT JSONB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
MESSAGE_CONTENT JSON, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
MESSAGE_CONTENT CLOB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
JSON 結構
使用基於 Jackson 的序列化時,訊息使用 Jackson 的多型型別處理以以下 JSON 結構儲存
{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.example.OrderMessage",
"orderId": "ORDER-12345",
"amount": 1299.99
},
"headers": {
"@class": "java.util.HashMap",
"priority": ["java.lang.String", "HIGH"],
"id": ["java.util.UUID", "a1b2c3d4-..."],
"timestamp": ["java.lang.Long", 1234567890]
}
}
@class 屬性提供正確反序列化多型型別所需的型別資訊。
查詢 JSON 內容(可選)
如果使用原生 JSON 列型別(PostgreSQL JSONB 或 MySQL JSON),則可以直接查詢訊息內容。
PostgreSQL JSONB 查詢
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';
MySQL JSON 函式
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';
|
如果使用 |
受信任的包
JacksonMessagingUtils.messagingAwareMapper() 根據受信任的包列表驗證所有反序列化的類,以防止安全漏洞。
預設受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler
可以在建構函式中指定其他包並附加到此列表
// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")
自定義 JsonObjectMapper
對於高階場景,可以提供自定義的 JsonObjectMapper
import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
customMapper.enable(SerializationFeature.INDENT_OUTPUT);
customMapper.registerModule(new CustomModule());
JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
store.setMessageRowMapper(
new JsonMessageRowMapper(jsonObjectMapper));
return store;
}
|
自定義 |