使用 R2DBC 進行資料訪問
R2DBC (“Reactive Relational Database Connectivity”) 是一項由社群驅動的規範工作,旨在標準化使用響應式模式訪問 SQL 資料庫。
包層級
Spring Framework 的 R2DBC 抽象框架由兩個不同的包組成:
-
core:org.springframework.r2dbc.core包包含DatabaseClient類以及各種相關類。請參閱使用 R2DBC 核心類來控制基本的 R2DBC 處理和錯誤處理。 -
connection:org.springframework.r2dbc.connection包包含一個用於方便ConnectionFactory訪問的實用工具類,以及各種簡單的ConnectionFactory實現,您可以用於測試和執行未經修改的 R2DBC。請參閱控制資料庫連線。
使用 R2DBC 核心類來控制基本的 R2DBC 處理和錯誤處理
本節介紹如何使用 R2DBC 核心類來控制基本的 R2DBC 處理,包括錯誤處理。它包含以下主題:
使用 DatabaseClient
DatabaseClient 是 R2DBC 核心包中的核心類。它處理資源的建立和釋放,這有助於避免常見的錯誤,例如忘記關閉連線。它執行核心 R2DBC 工作流的基本任務(例如語句建立和執行),將應用程式程式碼留給提供 SQL 和提取結果。DatabaseClient 類:
-
執行 SQL 查詢
-
更新語句和儲存過程呼叫
-
對
Result例項執行迭代 -
捕獲 R2DBC 異常並將其轉換為
org.springframework.dao包中定義的通用、更具資訊性的異常層次結構。(請參閱一致的異常層次結構。)
客戶端具有使用響應式型別進行宣告式組合的功能性、流暢式 API。
當您將 DatabaseClient 用於您的程式碼時,您只需實現 java.util.function 介面,為它們提供明確定義的契約。給定由 DatabaseClient 類提供的 Connection,Function 回撥會建立一個 Publisher。對於提取 Row 結果的對映函式也是如此。
您可以透過使用 ConnectionFactory 引用直接例項化 DatabaseClient,或者在 Spring IoC 容器中配置它並將其作為 bean 引用提供給 DAO,從而在 DAO 實現中使用 DatabaseClient。
建立 DatabaseClient 物件的最簡單方法是透過靜態工廠方法,如下所示:
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 應始終在 Spring IoC 容器中配置為 bean。 |
前面的方法建立了一個具有預設設定的 DatabaseClient。
您還可以從 DatabaseClient.builder() 獲取 Builder 例項。您可以透過呼叫以下方法來自定義客戶端:
-
….bindMarkers(…):提供特定的BindMarkersFactory以配置命名引數到資料庫繫結標記的轉換。 -
….executeFunction(…):設定ExecuteFunction如何執行Statement物件。 -
….namedParameters(false):停用命名引數擴充套件。預設啟用。
方言由 BindMarkersFactoryResolver 從 ConnectionFactory 解析,通常透過檢查 ConnectionFactoryMetadata 來完成。您可以透過在 META-INF/spring.factories 中註冊一個實現 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的類來讓 Spring 自動發現您的 BindMarkersFactory。BindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 從類路徑中發現繫結標記提供程式實現。 |
目前支援的資料庫有:
-
H2
-
MariaDB
-
Microsoft SQL Server
-
MySQL
-
Postgres
此類的所有 SQL 都以 DEBUG 級別記錄在與客戶端例項的完全限定類名對應的類別下(通常是 DefaultDatabaseClient)。此外,每次執行都會在響應式序列中註冊一個檢查點以幫助除錯。
以下各節提供了一些 DatabaseClient 用法的示例。這些示例並非 DatabaseClient 所公開的所有功能的詳盡列表。請參閱相關的 javadoc。
執行語句
DatabaseClient 提供執行語句的基本功能。以下示例展示了建立新表所需的最小但功能齊全的程式碼:
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient 旨在提供方便、流暢的用法。它在執行規範的每個階段都公開中間、延續和終端方法。上面的示例使用 then() 返回一個完成 Publisher,該 Publisher 在查詢(或查詢,如果 SQL 查詢包含多個語句)完成時立即完成。
execute(…) 接受 SQL 查詢字串或查詢 Supplier<String>,以在執行之前延遲實際的查詢建立。 |
查詢 (SELECT)
SQL 查詢可以透過 Row 物件或受影響的行數返回值。DatabaseClient 可以根據發出的查詢返回更新的行數或行本身。
以下查詢從表中獲取 id 和 name 列:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查詢使用繫結變數:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
您可能已經注意到上面示例中使用了 fetch()。fetch() 是一個延續運算子,允許您指定要消費多少資料。
呼叫 first() 返回結果的第一行並丟棄剩餘的行。您可以使用以下運算子來消費資料:
-
first()返回整個結果的第一行。其 Kotlin 協程變體對於非空返回值命名為awaitSingle(),如果值是可選的,則命名為awaitSingleOrNull()。 -
one()返回恰好一個結果,如果結果包含多行則失敗。使用 Kotlin 協程,對於恰好一個值使用awaitOne(),如果該值可能為null,則使用awaitOneOrNull()。 -
all()返回結果的所有行。當使用 Kotlin 協程時,使用flow()。 -
rowsUpdated()返回受影響的行數(INSERT/UPDATE/DELETE計數)。其 Kotlin 協程變體命名為awaitRowsUpdated()。
在不指定進一步對映細節的情況下,查詢將表格結果作為 Map 返回,其鍵是不區分大小寫的列名,對映到其列值。
您可以透過提供一個 Function<Row, T> 來控制結果對映,該函式將為每個 Row 呼叫,以便它可以返回任意值(單個值、集合和對映以及物件)。
以下示例提取 name 列併發出其值:
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
或者,有一個對映到單個值的快捷方式:
Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
或者您可以對映到具有 bean 屬性或記錄元件的結果物件:
// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
使用 DatabaseClient 進行更新 (INSERT、UPDATE 和 DELETE)
修改語句的唯一區別是這些語句通常不返回表格資料,因此您使用 rowsUpdated() 來消費結果。
以下示例顯示了一個 UPDATE 語句,該語句返回更新的行數:
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
將值繫結到查詢
典型的應用程式需要引數化 SQL 語句以根據某些輸入選擇或更新行。這些通常是受 WHERE 子句約束的 SELECT 語句或接受輸入引數的 INSERT 和 UPDATE 語句。如果引數未正確轉義,引數化語句存在 SQL 注入的風險。DatabaseClient 利用 R2DBC 的 bind API 來消除查詢引數的 SQL 注入風險。您可以使用 execute(…) 運算子提供引數化 SQL 語句並將引數繫結到實際的 Statement。然後,您的 R2DBC 驅動程式將使用預處理語句和引數替換來執行該語句。
引數繫結支援兩種繫結策略:
-
按索引,使用從零開始的引數索引。
-
按名稱,使用佔位符名稱。
以下示例顯示了查詢的引數繫結:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
或者,您可以傳入名稱和值的對映:
Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
或者您可以傳入具有 bean 屬性或記錄元件的引數物件:
// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
或者,您可以使用位置引數將值繫結到語句。索引從零開始。
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);
如果您的應用程式繫結到許多引數,可以透過一次呼叫實現相同的功能:
List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);
查詢預處理器將命名 Collection 引數展開為一系列繫結標記,從而無需根據引數數量進行動態查詢建立。巢狀物件陣列被展開以允許使用(例如)選擇列表。
考慮以下查詢:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
前面的查詢可以引數化並按如下方式執行:
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
| 選擇列表的使用取決於供應商。 |
以下示例顯示了一個使用 IN 謂詞的更簡單變體:
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", arrayOf(35, 50))
R2DBC 本身不支援類似 Collection 的值。然而,在 Spring 的 R2DBC 支援中,擴充套件上述示例中給定的 List 對命名引數有效,例如,用於上述 IN 子句。但是,插入或更新陣列型別列(例如,在 Postgres 中)需要底層 R2DBC 驅動程式支援的陣列型別:通常是 Java 陣列,例如 String[] 以更新 text[] 列。不要將 Collection<String> 或類似型別作為陣列引數傳遞。 |
語句過濾器
有時您需要在實際 Statement 執行之前微調其選項。為此,請向 DatabaseClient 註冊一個 Statement 過濾器 (StatementFilterFunction),以在執行過程中攔截和修改語句,如以下示例所示:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient 還公開了一個簡化的 filter(…) 過載,它接受一個 Function<Statement, Statement>:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction 實現允許過濾 Statement 和過濾 Result 物件。
DatabaseClient 最佳實踐
DatabaseClient 類的例項一旦配置,就是執行緒安全的。這很重要,因為這意味著您可以配置 DatabaseClient 的單個例項,然後安全地將此共享引用注入到多個 DAO(或儲存庫)中。DatabaseClient 是有狀態的,因為它維護對 ConnectionFactory 的引用,但此狀態不是會話狀態。
使用 DatabaseClient 類時的常見做法是在 Spring 配置檔案中配置 ConnectionFactory,然後將該共享 ConnectionFactory bean 依賴注入到您的 DAO 類中。DatabaseClient 在 ConnectionFactory 的 setter 中建立。這導致 DAO 類似於以下內容:
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
除了顯式配置之外,還可以使用元件掃描和註解支援進行依賴注入。在這種情況下,您可以使用 @Component 註解類(使其成為元件掃描的候選者),並使用 @Autowired 註解 ConnectionFactory setter 方法。以下示例顯示瞭如何操作:
-
Java
-
Kotlin
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired (2)
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); (3)
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 用 @Component 註解類。 |
| 2 | 用 @Autowired 註解 ConnectionFactory setter 方法。 |
| 3 | 使用 ConnectionFactory 建立新的 DatabaseClient。 |
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)
private val databaseClient = DatabaseClient(connectionFactory) (3)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 用 @Component 註解類。 |
| 2 | ConnectionFactory 的建構函式注入。 |
| 3 | 使用 ConnectionFactory 建立新的 DatabaseClient。 |
無論您選擇使用(或不使用)上述哪種模板初始化樣式,每次要執行 SQL 時都很少需要建立 DatabaseClient 類的新例項。一旦配置,DatabaseClient 例項就是執行緒安全的。如果您的應用程式訪問多個數據庫,您可能需要多個 DatabaseClient 例項,這需要多個 ConnectionFactory,因此需要多個不同配置的 DatabaseClient 例項。
檢索自動生成的主鍵
當將行插入定義了自增或標識列的表中時,INSERT 語句可能會生成主鍵。要完全控制要生成的主鍵列名,只需註冊一個 StatementFilterFunction,該函式請求所需列的生成主鍵。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制資料庫連線
本節內容
使用 ConnectionFactory
Spring 透過 ConnectionFactory 獲取 R2DBC 資料庫連線。ConnectionFactory 是 R2DBC 規範的一部分,是驅動程式的常見入口點。它允許容器或框架嚮應用程式程式碼隱藏連線池和事務管理問題。作為開發人員,您無需瞭解如何連線到資料庫的詳細資訊。這是設定 ConnectionFactory 的管理員的職責。您很可能在開發和測試程式碼時同時扮演這兩種角色,但您不一定必須知道生產資料來源是如何配置的。
當您使用 Spring 的 R2DBC 層時,您可以使用第三方提供的連線池實現來配置自己的連線。流行的實現是 R2DBC Pool (r2dbc-pool)。Spring 發行版中的實現僅用於測試目的,不提供池化。
要配置 ConnectionFactory:
-
使用
ConnectionFactory獲取連線,就像您通常獲取 R2DBCConnectionFactory一樣。 -
提供 R2DBC URL(有關正確值,請參閱您的驅動程式文件)。
以下示例顯示瞭如何配置 ConnectionFactory:
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用 ConnectionFactoryUtils
ConnectionFactoryUtils 類是一個方便且功能強大的 helper 類,它提供 static 方法從 ConnectionFactory 獲取連線並關閉連線(如果需要)。
它支援與訂閱者 Context 繫結的連線,例如 R2dbcTransactionManager。
使用 SingleConnectionFactory
SingleConnectionFactory 類是 DelegatingConnectionFactory 介面的一個實現,它封裝了一個在每次使用後不關閉的單個 Connection。
如果任何客戶端程式碼假定連線池呼叫 close(如在使用持久化工具時),您應該將 suppressClose 屬性設定為 true。此設定返回一個抑制關閉的代理,該代理封裝了物理連線。請注意,您不能再將其強制轉換為原生 Connection 或類似物件。
SingleConnectionFactory 主要是一個測試類,如果您的 R2DBC 驅動程式允許,它可用於特定要求,例如流水線。與池化的 ConnectionFactory 不同,它始終重用相同的連線,避免過度建立物理連線。
使用 TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy 是目標 ConnectionFactory 的代理。該代理封裝了目標 ConnectionFactory,以增加對 Spring 管理的事務的感知。
如果您使用的 R2DBC 客戶端沒有以其他方式與 Spring 的 R2DBC 支援整合,則需要使用此類的。在這種情況下,您仍然可以使用此客戶端,同時讓此客戶端參與 Spring 管理的事務。通常,最好將 R2DBC 客戶端與對 ConnectionFactoryUtils 的適當訪問整合以進行資源管理。 |
有關更多詳細資訊,請參閱 TransactionAwareConnectionFactoryProxy javadoc。
使用 R2dbcTransactionManager
R2dbcTransactionManager 類是單個 R2DBC ConnectionFactory 的 ReactiveTransactionManager 實現。它將指定 ConnectionFactory 中的 R2DBC Connection 繫結到訂閱者 Context,可能允許每個 ConnectionFactory 有一個訂閱者 Connection。
應用程式程式碼需要透過 ConnectionFactoryUtils.getConnection(ConnectionFactory) 而不是 R2DBC 的標準 ConnectionFactory.create() 來檢索 R2DBC Connection。所有框架類(例如 DatabaseClient)都隱式使用此策略。如果不與事務管理器一起使用,查詢策略的行為與 ConnectionFactory.create() 完全相同,因此在任何情況下都可以使用。