Apache Cassandra 向量儲存

本節將引導您設定 CassandraVectorStore 以儲存文件嵌入並執行相似性搜尋。

什麼是 Apache Cassandra?

Apache Cassandra® 是一個真正的開源分散式資料庫,以其線性擴充套件性、成熟的容錯性和低延遲而聞名,是任務關鍵型事務資料的完美平臺。

其向量相似性搜尋 (VSS) 基於 JVector 庫,可確保一流的效能和相關性。

在 Apache Cassandra 中進行向量搜尋非常簡單,例如

SELECT content FROM table ORDER BY content_vector ANN OF query_embedding;

更多相關文件可在此處閱讀此處

這個 Spring AI 向量儲存旨在用於全新的 RAG 應用,也能夠適配到現有資料和表之上。

該儲存也可用於現有資料庫中的非 RAG 用例,例如語義搜尋、地理位置鄰近搜尋等。

該儲存將根據其配置自動建立或增強 schema。如果您不希望修改 schema,請使用 disallowSchemaChanges 配置該儲存。

使用 spring-boot-autoconfigure 時,根據 Spring Boot 標準,disallowSchemaChanges 預設設定為 true,您必須在 application.properties 檔案中設定 …​initialize-schema=true 來選擇啟用 schema 的建立/修改。

什麼是 JVector?

JVector 是一個純 Java 嵌入式向量搜尋引擎。

它在其他 HNSW 向量相似性搜尋實現中脫穎而出,原因在於其具備以下特性:

  • 演算法快速。JVector 使用受 DiskANN 及相關研究啟發的先進圖演算法,提供高召回率和低延遲。

  • 實現快速。JVector 使用 Panama SIMD API 加速索引構建和查詢。

  • 記憶體高效。JVector 使用乘積量化壓縮向量,使其在搜尋期間能夠駐留在記憶體中。

  • 磁碟感知。JVector 的磁碟佈局設計旨在查詢時執行最少必要的 IOPS。

  • 併發。索引構建可以線性擴充套件到至少 32 個執行緒。執行緒加倍,構建時間減半。

  • 增量。在構建索引的同時即可查詢。新增向量後,可以立即在搜尋結果中找到,沒有延遲。

  • 易於嵌入。API 由在生產環境中使用它的人員設計,易於嵌入。

先決條件

  1. 一個用於計算文件嵌入的 EmbeddingModel 例項。這通常被配置為一個 Spring Bean。有多種選項可用:

    • Transformers Embedding - 在您的本地環境中計算嵌入。預設是透過 ONNX 和 all-MiniLM-L6-v2 Sentence Transformers。這開箱即用。

    • 如果您想使用 OpenAI 的 Embedding - 使用 OpenAI 的嵌入端點。您需要在 OpenAI 註冊頁面建立賬戶,並在 API Keys 頁面生成 API 金鑰令牌。

    • 還有更多選擇,請參閱 Embeddings API 文件。

  2. 一個 Apache Cassandra 例項,版本 5.0-beta1 或更高

    1. 自行快速入門

    2. 對於託管服務,Astra DB 提供了不錯的免費套餐。

依賴

Spring AI 自動配置、starter 模組的 artifact 名稱發生了重大變化。請參閱升級說明獲取更多資訊。

對於依賴管理,我們建議使用 Spring AI BOM,具體說明請參閱依賴管理部分。

將這些依賴新增到您的專案

  • 僅使用 Cassandra Vector Store

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-cassandra-store</artifactId>
</dependency>
  • 或者,對於 RAG 應用所需的一切(使用預設 ONNX Embedding Model)

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-starter-vector-store-cassandra</artifactId>
</dependency>

配置屬性

您可以在 Spring Boot 配置中使用以下屬性來定製 Apache Cassandra 向量儲存。

屬性 預設值

spring.ai.vectorstore.cassandra.keyspace

springframework

spring.ai.vectorstore.cassandra.table

ai_vector_store

spring.ai.vectorstore.cassandra.initialize-schema

false

spring.ai.vectorstore.cassandra.index-name

spring.ai.vectorstore.cassandra.content-column-name

content

spring.ai.vectorstore.cassandra.embedding-column-name

embedding

spring.ai.vectorstore.cassandra.fixed-thread-pool-executor-size

16

用法

基本用法

建立一個 CassandraVectorStore 例項作為 Spring Bean

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}

一旦有了向量儲存例項,您就可以新增文件並執行搜尋

// Add documents
vectorStore.add(List.of(
    new Document("1", "content1", Map.of("key1", "value1")),
    new Document("2", "content2", Map.of("key2", "value2"))
));

// Search with filters
List<Document> results = vectorStore.similaritySearch(
    SearchRequest.query("search text")
        .withTopK(5)
        .withSimilarityThreshold(0.7f)
        .withFilterExpression("metadata.key1 == 'value1'")
);

高階配置

對於更復雜的用例,您可以在 Spring Bean 中配置更多設定

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        // Configure primary keys
        .partitionKeys(List.of(
            new SchemaColumn("id", DataTypes.TEXT),
            new SchemaColumn("category", DataTypes.TEXT)
        ))
        .clusteringKeys(List.of(
            new SchemaColumn("timestamp", DataTypes.TIMESTAMP)
        ))
        // Add metadata columns with optional indexing
        .addMetadataColumns(
            new SchemaColumn("category", DataTypes.TEXT, SchemaColumnTags.INDEXED),
            new SchemaColumn("score", DataTypes.DOUBLE)
        )
        // Customize column names
        .contentColumnName("text")
        .embeddingColumnName("vector")
        // Performance tuning
        .fixedThreadPoolExecutorSize(32)
        // Schema management
        .disallowSchemaChanges(false)
        // Custom batching strategy
        .batchingStrategy(new TokenCountBatchingStrategy())
        .build();
}

連線配置

有兩種方法配置與 Cassandra 的連線

  • 使用注入的 CqlSession (推薦)

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}
  • 直接在構建器中指定連線詳情

@Bean
public VectorStore vectorStore(EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .contactPoint(new InetSocketAddress("localhost", 9042))
        .localDatacenter("datacenter1")
        .keyspace("my_keyspace")
        .build();
}

元資料過濾

您可以利用通用的、可移植的元資料過濾器與 CassandraVectorStore 一起使用。元資料列要想能夠被搜尋,必須是主鍵或者經過 SAI 索引。要讓非主鍵列被索引,請使用 SchemaColumnTags.INDEXED 配置元資料列。

例如,您可以使用文字表示式語言

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression("country in ['UK', 'NL'] && year >= 2020").build());

或透過程式設計方式使用表示式 DSL

Filter.Expression f = new FilterExpressionBuilder()
    .and(
        f.in("country", "UK", "NL"),
        f.gte("year", 2020)
    ).build();

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression(f).build());

可移植的過濾器表示式會自動轉換為 CQL 查詢

高階示例:基於維基百科資料集的向量儲存

以下示例演示瞭如何在現有 schema 上使用該儲存。這裡我們使用來自 github.com/datastax-labs/colbert-wikipedia-data 專案的 schema,該專案提供了完整的維基百科資料集,並且已為您準備好向量化版本。

首先,在 Cassandra 資料庫中建立 schema

wget https://s.apache.org/colbert-wikipedia-schema-cql -O colbert-wikipedia-schema.cql
cqlsh -f colbert-wikipedia-schema.cql

然後使用構建器模式配置儲存

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    List<SchemaColumn> partitionColumns = List.of(
        new SchemaColumn("wiki", DataTypes.TEXT),
        new SchemaColumn("language", DataTypes.TEXT),
        new SchemaColumn("title", DataTypes.TEXT)
    );

    List<SchemaColumn> clusteringColumns = List.of(
        new SchemaColumn("chunk_no", DataTypes.INT),
        new SchemaColumn("bert_embedding_no", DataTypes.INT)
    );

    List<SchemaColumn> extraColumns = List.of(
        new SchemaColumn("revision", DataTypes.INT),
        new SchemaColumn("id", DataTypes.INT)
    );

    return CassandraVectorStore.builder()
        .session(session)
        .embeddingModel(embeddingModel)
        .keyspace("wikidata")
        .table("articles")
        .partitionKeys(partitionColumns)
        .clusteringKeys(clusteringColumns)
        .contentColumnName("body")
        .embeddingColumnName("all_minilm_l6_v2_embedding")
        .indexName("all_minilm_l6_v2_ann")
        .disallowSchemaChanges(true)
        .addMetadataColumns(extraColumns)
        .primaryKeyTranslator((List<Object> primaryKeys) -> {
            if (primaryKeys.isEmpty()) {
                return "test§¶0";
            }
            return String.format("%s§¶%s", primaryKeys.get(2), primaryKeys.get(3));
        })
        .documentIdTranslator((id) -> {
            String[] parts = id.split("§¶");
            String title = parts[0];
            int chunk_no = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            return List.of("simplewiki", "en", title, chunk_no, 0);
        })
        .build();
}

@Bean
public EmbeddingModel embeddingModel() {
    // default is ONNX all-MiniLM-L6-v2 which is what we want
    return new TransformersEmbeddingModel();
}

載入完整的維基百科資料集

載入完整的維基百科資料集

  1. s.apache.org/simplewiki-sstable-tar 下載 simplewiki-sstable.tar(這將需要一段時間,檔案大小為數十 GB)

  2. 載入資料

tar -xf simplewiki-sstable.tar -C ${CASSANDRA_DATA}/data/wikidata/articles-*/
nodetool import wikidata articles ${CASSANDRA_DATA}/data/wikidata/articles-*/
  • 如果此表中有現有資料,請在執行 tar 時檢查 tarball 中的檔案是否會覆蓋現有的 sstables。

  • nodetool import 的替代方法是直接重啟 Cassandra。

  • 如果索引出現任何故障,它們將自動重建。

訪問原生客戶端

Cassandra 向量儲存實現透過 getNativeClient() 方法提供對底層原生 Cassandra 客戶端 (CqlSession) 的訪問

CassandraVectorStore vectorStore = context.getBean(CassandraVectorStore.class);
Optional<CqlSession> nativeClient = vectorStore.getNativeClient();

if (nativeClient.isPresent()) {
    CqlSession session = nativeClient.get();
    // Use the native client for Cassandra-specific operations
}

原生客戶端使您能夠訪問透過 VectorStore 介面可能未暴露的 Cassandra 特有功能和操作。