Apache Cassandra 支援
Spring Integration 提供了通道介面卡(從 6.0 版本開始),用於對 Apache Cassandra 叢集執行資料庫操作。它完全基於 Spring Data for Apache Cassandra 專案。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:7.0.0"
Cassandra 出站元件
CassandraMessageHandler 是 AbstractReplyProducingMessageHandler 的實現,可以以單向(預設)和請求-回覆模式(producesReply 選項)工作。它預設是非同步的(setAsync(false) 可以重置),並對提供的 ReactiveCassandraOperations 執行響應式的 INSERT、UPDATE、DELETE 或 STATEMENT 操作。操作型別可以透過 CassandraMessageHandler.Type 選項進行配置。ingestQuery 將模式設定為 INSERT;query 或 statementExpression 或 statementProcessor 將模式設定為 STATEMENT。
以下程式碼片段演示了此通道介面卡或閘道器的各種配置。
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
如果 CassandraMessageHandler 在預設非同步模式下用作閘道器,則會生成一個 Mono<WriteResult>,並根據提供的 MessageChannel 實現進行處理。對於真正的響應式處理,建議將 FluxMessageChannel 用於輸出通道配置。在同步模式下,會呼叫 Mono.block() 來獲取回覆值。
如果執行 INSERT、UPDATE 或 DELETE 操作,請求訊息負載中需要一個實體(標記為 org.springframework.data.cassandra.core.mapping.Table)。如果負載是實體列表,則執行相應的批處理操作。
ingestQuery 模式期望負載以值矩陣的形式存在以進行插入 - List<List<?>>。例如,如果實體如下:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
並且通道介面卡有此配置:
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
請求訊息負載必須像這樣轉換:
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
對於更復雜的用例,負載可以是 com.datastax.oss.driver.api.core.cql.Statement 的例項。建議使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder API 來構建要針對 Apache Cassandra 執行的各種語句。例如,要從 Book 表中刪除所有資料,可以向 CassandraMessageHandler 傳送一個負載如下的訊息:QueryBuilder.truncate("book").build()。或者,對於基於請求訊息的邏輯,可以為 CassandraMessageHandler 提供 statementExpression 或 statementProcessor,以便根據該訊息構建 Statement。為方便起見,com.datastax.oss.driver.api.querybuilder 已作為 import 註冊到 SpEL 評估上下文中,因此目標表達式可以像這樣簡單:
statement-expression="T(QueryBuilder).selectFrom("book").all()"
setParameterExpressions(Map<String, Expression> parameterExpressions) 表示可繫結的命名查詢引數,並且僅與 setQuery(String query) 選項一起使用。請參閱上面提到的 Java 和 XML 示例。