使用 R2DBC 進行資料訪問

R2DBC(“Reactive Relational Database Connectivity”,響應式關係型資料庫連線)是一項由社群驅動的規範工作,旨在利用響應式模式標準化對 SQL 資料庫的訪問。

包層級結構

Spring Framework 的 R2DBC 抽象框架包含兩個不同的包

使用 R2DBC 核心類控制基本的 R2DBC 處理和錯誤處理

本節介紹如何使用 R2DBC 核心類控制基本的 R2DBC 處理,包括錯誤處理。它包含以下主題

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心包中的中心類。它負責資源的建立和釋放,有助於避免常見的錯誤,例如忘記關閉連線。它執行核心 R2DBC 工作流的基本任務(例如語句建立和執行),而應用程式程式碼只需提供 SQL 並提取結果。DatabaseClient

  • 執行 SQL 查詢

  • 更新語句和儲存過程呼叫

  • Result 例項執行迭代

  • 捕獲 R2DBC 異常並將其轉換為在 org.springframework.dao 包中定義的通用、資訊量更大的異常層級結構。(參見一致的異常層級結構。)

該客戶端具有函式式、流式的 API,使用響應式型別進行宣告式組合。

當你在程式碼中使用 DatabaseClient 時,只需實現 java.util.function 介面,為它們提供一個清晰定義的契約。給定由 DatabaseClient 類提供的 Connection,一個 Function 回撥會建立一個 Publisher。提取 Row 結果的對映函式也是如此。

你可以在 DAO 實現中透過直接例項化並引用 ConnectionFactory 來使用 DatabaseClient,也可以在 Spring IoC 容器中配置它,並將其作為 Bean 引用注入到 DAO 中。

建立 DatabaseClient 物件最簡單的方法是透過一個靜態工廠方法,如下所示

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 應始終在 Spring IoC 容器中配置為一個 Bean。

上述方法建立一個具有預設設定的 DatabaseClient

你還可以從 DatabaseClient.builder() 獲取一個 Builder 例項。可以透過呼叫以下方法來自定義客戶端

  • ….bindMarkers(…):提供一個特定的 BindMarkersFactory 來配置命名引數到資料庫繫結標記的轉換。

  • ….executeFunction(…):設定 ExecuteFunction 來控制 Statement 物件的執行方式。

  • ….namedParameters(false):停用命名引數展開。預設啟用。

方言透過 BindMarkersFactoryResolverConnectionFactory 解析,通常是透過檢查 ConnectionFactoryMetadata
透過在 META-INF/spring.factories 中註冊實現 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 介面的類,可以讓 Spring 自動發現你的 BindMarkersFactoryBindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 從類路徑中發現繫結標記提供者實現。

目前支援的資料庫有

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

該類執行的所有 SQL 都會在 DEBUG 級別下,以對應客戶端例項的完全限定類名(通常是 DefaultDatabaseClient)作為類別進行日誌記錄。此外,每次執行都會在響應式序列中註冊一個檢查點,以幫助除錯。

以下部分提供了一些 DatabaseClient 的使用示例。這些示例並非 DatabaseClient 暴露的所有功能的詳盡列表。更多資訊請參見相應的 javadoc

執行語句

DatabaseClient 提供了執行語句的基本功能。以下示例展示了建立新表所需的最少但功能完整的程式碼

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
		.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
		.await()

DatabaseClient 旨在提供方便、流式的使用體驗。它在執行規範的每個階段都暴露了中間、繼續和終端方法。上面的示例使用了 then() 方法來返回一個完成 Publisher,該 Publisher 在查詢(如果 SQL 查詢包含多個語句,則為多個查詢)完成後立即完成。

execute(…) 接受 SQL 查詢字串或查詢 Supplier<String>,以將實際查詢建立推遲到執行時。

查詢(SELECT

SQL 查詢可以透過 Row 物件或受影響的行數返回值。DatabaseClient 可以根據執行的查詢返回更新的行數或行本身。

以下查詢從表中獲取 idname

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
		.fetch().first();
val first = client.sql("SELECT id, name FROM person")
		.fetch().awaitSingle()

以下查詢使用繫結變數

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
		.bind("fn", "Joe")
		.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
		.bind("fn", "Joe")
		.fetch().awaitSingle()

你可能已經注意到上面示例中使用了 fetch()fetch() 是一個 continuation 運算子,允許你指定要消費多少資料。

呼叫 first() 返回結果中的第一行並丟棄剩餘的行。可以使用以下運算子來消費資料

  • first() 返回整個結果的第一行。其 Kotlin Coroutine 變體對於非空返回值命名為 awaitSingle(),對於可選值命名為 awaitSingleOrNull()

  • one() 精確返回一個結果,如果結果包含更多行則失敗。使用 Kotlin Coroutines 時,對於恰好一個值使用 awaitOne(),如果值可能為 null 則使用 awaitOneOrNull()

  • all() 返回結果的所有行。使用 Kotlin Coroutines 時,請使用 flow()

  • rowsUpdated() 返回受影響的行數(INSERT/UPDATE/DELETE 計數)。其 Kotlin Coroutine 變體命名為 awaitRowsUpdated()

如果不指定進一步的對映詳情,查詢將返回表格結果,形式為 Map,其鍵是不區分大小寫的列名,對映到對應的列值。

你可以透過提供一個 Function<Row, T> 函式來控制結果對映,該函式會為每一行 Row 呼叫,以便它可以返回任意值(單個值、集合、map 和物件)。

以下示例提取 name 列併發出其值

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
		.map(row -> row.get("name", String.class))
		.all();
val names = client.sql("SELECT name FROM person")
		.map{ row: Row -> row.get("name", String.class) }
		.flow()

或者,有一個對映到單個值的快捷方式

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者你可以對映到一個具有 Bean 屬性或 record 元件的結果物件

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
null 怎麼辦?

關係型資料庫結果可以包含 null 值。Reactive Streams 規範禁止發出 null 值。這項要求強制要求在提取函式中正確處理 null。雖然可以從 Row 中獲取 null 值,但絕不能發出 null 值。你必須將任何 null 值包裝在一個物件中(例如,對於單個值使用 Optional),以確保提取函式不會直接返回 null 值。

使用 DatabaseClient 進行更新(INSERTUPDATEDELETE

修改語句的唯一區別是它們通常不返回表格資料,因此使用 rowsUpdated() 來消費結果。

以下示例展示了一個返回更新行數的 UPDATE 語句

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
		.bind("fn", "Joe")
		.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
		.bind("fn", "Joe")
		.fetch().awaitRowsUpdated()

將值繫結到查詢

典型的應用程式需要引數化的 SQL 語句來根據某些輸入選擇或更新行。這些通常是由 WHERE 子句約束的 SELECT 語句,或接受輸入引數的 INSERTUPDATE 語句。如果引數未正確轉義,引數化語句存在 SQL 注入的風險。DatabaseClient 利用 R2DBC 的 bind API 來消除查詢引數的 SQL 注入風險。你可以使用 execute(…)。 運算子提供引數化的 SQL 語句,並將引數繫結到實際的 Statement。然後,你的 R2DBC 驅動程式會使用預處理語句和引數替換來執行該語句。

引數繫結支援兩種繫結策略

  • 按索引,使用從零開始的引數索引。

  • 按名稱,使用佔位符名稱。

以下示例展示了查詢的引數繫結

	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bind("id", "joe")
			.bind("name", "Joe")
			.bind("age", 34);

或者,你可以傳入一個名稱和值的 Map

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者你可以傳入一個帶有 Bean 屬性或 record 元件的引數物件

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);

或者,你可以使用位置引數將值繫結到語句。索引從零開始。

	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bind(0, "joe")
			.bind(1, "Joe")
			.bind(2, 34);

如果你的應用程式繫結到許多引數,可以透過一次呼叫實現相同的目的

	List<?> values = List.of("joe", "Joe", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(values);
R2DBC 原生繫結標記

R2DBC 使用依賴於實際資料庫供應商的資料庫原生繫結標記。例如,Postgres 使用索引標記,例如 $1$2$n。另一個例子是 SQL Server,它使用以 @ 為字首的命名繫結標記。

這與 JDBC 不同,JDBC 要求使用 ? 作為繫結標記。在 JDBC 中,實際的驅動程式會在語句執行時將 ? 繫結標記轉換為資料庫原生標記。

Spring Framework 的 R2DBC 支援允許你使用原生繫結標記或使用 :name 語法的命名繫結標記。

命名引數支援利用 BindMarkersFactory 例項在查詢執行時將命名引數展開為原生繫結標記,這使得你的查詢在各種資料庫供應商之間具有一定的可移植性。

查詢預處理器將命名的 Collection 引數展開成一系列繫結標記,以消除根據引數數量動態建立查詢的需要。巢狀物件陣列也會展開,以允許使用(例如)選擇列表。

考慮以下查詢

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

上述查詢可以引數化並按如下方式執行

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
		.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
		.bind("tuples", tuples)
選擇列表的使用取決於供應商。

以下示例展示了一個使用 IN 謂詞的更簡單變體

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
		.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
		.bind("ages", arrayOf(35, 50))
R2DBC 本身不支援 Collection 類似的值。然而,在上面示例中展開給定的 List 對於 Spring 的 R2DBC 支援中的命名引數是可行的,例如用於 IN 子句中。但是,插入或更新陣列型別列(例如在 Postgres 中)需要底層 R2DBC 驅動程式支援的陣列型別:通常是 Java 陣列,例如 String[] 來更新 text[] 列。不要將 Collection<String> 或類似型別作為陣列引數傳遞。

語句過濾器

有時,你需要在實際執行 Statement 之前對其選項進行微調。為此,可以透過在 DatabaseClient 中註冊一個 Statement 過濾器(StatementFilterFunction),以便在語句執行過程中攔截和修改它們,示例如下

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
		.bind("name", …)
		.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient 還暴露了一個簡化的 filter(…) 過載方法,它接受一個 Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
		.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
		.filter { statement -> s.fetchSize(25) }

StatementFilterFunction 的實現允許對 Statement 進行過濾以及對 Result 物件進行過濾。

DatabaseClient 最佳實踐

一旦配置完成,DatabaseClient 類的例項是執行緒安全的。這一點很重要,因為這意味著你可以配置一個 DatabaseClient 的單例例項,然後安全地將這個共享引用注入到多個 DAO(或 Repository)中。DatabaseClient 是有狀態的,因為它維護著對 ConnectionFactory 的引用,但這種狀態不是會話狀態。

使用 DatabaseClient 類時的常見做法是在 Spring 配置檔案中配置一個 ConnectionFactory,然後將該共享的 ConnectionFactory Bean 依賴注入到你的 DAO 類中。DatabaseClientConnectionFactory 的 setter 方法中建立。這使得 DAO 類似於以下示例

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

顯式配置的另一種選擇是使用元件掃描和註解支援進行依賴注入。在這種情況下,你可以使用 @Component 註解類(使其成為元件掃描的候選),並使用 @Autowired 註解 ConnectionFactory 的 setter 方法。以下示例展示瞭如何實現

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 註解類。
2 使用 @Autowired 註解 ConnectionFactory 的 setter 方法。
3 使用 ConnectionFactory 建立一個新的 DatabaseClient
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 註解類。
2 ConnectionFactory 的建構函式注入。
3 使用 ConnectionFactory 建立一個新的 DatabaseClient

無論你選擇使用(或不使用)上述哪種模板初始化風格,每次要執行 SQL 時建立 DatabaseClient 類的新例項通常是不必要的。一旦配置完成,DatabaseClient 例項是執行緒安全的。如果你的應用程式訪問多個數據庫,你可能需要多個 DatabaseClient 例項,這需要多個 ConnectionFactory,從而需要多個配置不同的 DatabaseClient 例項。

檢索自動生成的主鍵

INSERT 語句在向定義了自增或 identity 列的表中插入行時,可能會生成主鍵。要完全控制生成的列名,只需註冊一個 StatementFilterFunction 來請求所需列的生成主鍵。

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

控制資料庫連線

本節包含

使用 ConnectionFactory

Spring 透過 ConnectionFactory 獲取資料庫的 R2DBC 連線。ConnectionFactory 是 R2DBC 規範的一部分,是驅動程式的通用入口點。它允許容器或框架嚮應用程式程式碼隱藏連線池和事務管理問題。作為開發人員,您無需瞭解如何連線到資料庫的詳細資訊。這是設定 ConnectionFactory 的管理員的職責。您在開發和測試程式碼時很可能同時扮演這兩個角色,但您不必一定了解生產資料來源是如何配置的。

當您使用 Spring 的 R2DBC 層時,您可以使用第三方提供的連線池實現來配置您自己的連線池。一個流行的實現是 R2DBC Pool (r2dbc-pool)。Spring 發行版中的實現僅用於測試目的,不提供連線池。

配置 ConnectionFactory

  1. 使用 ConnectionFactory 獲取連線,就像您通常獲取 R2DBC ConnectionFactory 一樣。

  2. 提供一個 R2DBC URL(請參閱您的驅動程式文件以獲取正確的值)。

以下示例展示瞭如何配置 ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

使用 ConnectionFactoryUtils

ConnectionFactoryUtils 類是一個方便強大的輔助類,它提供了 static 方法用於從 ConnectionFactory 獲取連線並在必要時關閉連線。

它支援訂閱者 Context 繫結的連線,例如與 R2dbcTransactionManager 一起使用。

使用 SingleConnectionFactory

SingleConnectionFactory 類是 DelegatingConnectionFactory 介面的一個實現,它封裝了一個不會在每次使用後關閉的單個 Connection

如果任何客戶端程式碼假定連線是池化的(例如在使用持久化工具時),並呼叫 close 方法,則應將 suppressClose 屬性設定為 true。此設定會返回一個抑制關閉的代理,該代理封裝了物理連線。請注意,您無法再將此代理轉換為原生的 Connection 或類似物件。

SingleConnectionFactory 主要是一個測試類,如果您的 R2DBC 驅動程式允許,它也可以用於特定需求,例如流水線處理。與池化的 ConnectionFactory 不同,它總是重用同一個連線,避免了過度建立物理連線。

使用 TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是一個目標 ConnectionFactory 的代理。該代理封裝了目標 ConnectionFactory,以增加對 Spring 管理的事務的感知。

如果您使用的 R2DBC 客戶端沒有與 Spring 的 R2DBC 支援整合,則需要使用此類。在這種情況下,您仍然可以使用此客戶端,同時讓該客戶端參與 Spring 管理的事務。通常更傾向於將 R2DBC 客戶端與對 ConnectionFactoryUtils 的正確訪問整合,以便進行資源管理。

有關更多詳細資訊,請參閱 TransactionAwareConnectionFactoryProxy 的 Javadoc。

使用 R2dbcTransactionManager

R2dbcTransactionManager 類是針對單個 R2DBC ConnectionFactoryReactiveTransactionManager 實現。它將指定 ConnectionFactory 的 R2DBC Connection 繫結到訂閱者 Context,可能允許每個 ConnectionFactory 對應一個訂閱者 Connection

應用程式程式碼需要透過 ConnectionFactoryUtils.getConnection(ConnectionFactory) 獲取 R2DBC Connection,而不是使用 R2DBC 標準的 ConnectionFactory.create()。所有框架類(例如 DatabaseClient)都隱式使用此策略。如果不與事務管理器一起使用,查詢策略的行為與 ConnectionFactory.create() 完全相同,因此在任何情況下都可以使用。