資料庫
與大多數企業應用樣式一樣,資料庫是批處理的中心儲存機制。然而,批處理與其他應用樣式不同,因為它必須處理的資料集規模龐大。如果一個 SQL 語句返回 100 萬行,結果集可能會將所有返回的結果儲存在記憶體中,直到所有行都被讀取。Spring Batch 為此問題提供了兩種解決方案:
基於遊標的 ItemReader 實現
使用資料庫遊標通常是大多數批處理開發人員的預設方法,因為它是資料庫解決“流式傳輸”關係資料問題的方法。Java 的 ResultSet 類本質上是一種用於操作遊標的面向物件機制。ResultSet 維護一個指向當前資料行的遊標。呼叫 ResultSet 上的 next 會將此遊標移動到下一行。Spring Batch 基於遊標的 ItemReader 實現會在初始化時開啟一個遊標,並在每次呼叫 read 時將遊標向前移動一行,返回一個可用於處理的對映物件。然後呼叫 close 方法以確保所有資源都被釋放。Spring Core 的 JdbcTemplate 透過使用回撥模式完全對映 ResultSet 中的所有行並在將控制權返回給方法呼叫者之前關閉來解決此問題。然而,在批處理中,這必須等到步驟完成。下圖顯示了基於遊標的 ItemReader 如何工作的通用圖表。請注意,雖然示例使用 SQL(因為 SQL 廣為人知),但任何技術都可以實現基本方法。
此示例說明了基本模式。給定一個具有三列的“FOO”表:ID、NAME 和 BAR,選擇 ID 大於 1 但小於 7 的所有行。這將遊標的開頭(第 1 行)放在 ID 2 上。此行的結果應該是一個完全對映的 Foo 物件。再次呼叫 read() 會將遊標移動到下一行,即 ID 為 3 的 Foo。這些讀取的結果在每次 read 之後寫入,允許物件進行垃圾回收(假設沒有例項變數維護對它們的引用)。
JdbcCursorItemReader
JdbcCursorItemReader 是基於遊標技術的 JDBC 實現。它直接與 ResultSet 一起工作,並且需要一個 SQL 語句來對從 DataSource 獲取的連線執行。以下資料庫 schema 用作示例
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
許多人喜歡為每一行使用一個領域物件,因此以下示例使用 RowMapper 介面的實現來對映 CustomerCredit 物件
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因為 JdbcCursorItemReader 與 JdbcTemplate 共享關鍵介面,所以檢視如何使用 JdbcTemplate 讀取此資料的示例很有用,以便與 ItemReader 進行對比。為了本示例的目的,假設 CUSTOMER 資料庫中有 1,000 行。第一個示例使用 JdbcTemplate
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
執行完前面的程式碼片段後,customerCredits 列表包含 1,000 個 CustomerCredit 物件。在查詢方法中,從 DataSource 獲取連線,對它執行提供的 SQL,併為 ResultSet 中的每一行呼叫 mapRow 方法。將其與 JdbcCursorItemReader 的方法進行對比,如以下示例所示
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
執行上述程式碼片段後,計數器等於 1,000。如果上述程式碼將返回的 customerCredit 放入列表中,結果將與 JdbcTemplate 示例完全相同。但是,ItemReader 的最大優點是它允許“流式傳輸”項。read 方法可以呼叫一次,項可以由 ItemWriter 寫出,然後可以透過 read 獲取下一個項。這允許以“塊”的方式完成項的讀取和寫入並定期提交,這是高效能批處理的精髓。此外,它可以輕鬆配置以注入 Spring Batch Step。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將 ItemReader 注入到 Step 中
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下示例展示瞭如何在 XML 中將 ItemReader 注入到 Step 中
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
附加屬性
由於在 Java 中開啟遊標的選項很多,因此 JdbcCursorItemReader 上可以設定許多屬性,如以下表格所述
ignoreWarnings |
確定是否記錄 SQL 警告或引發異常。預設值為 |
fetchSize |
向 JDBC 驅動程式提供一個提示,指示當 |
maxRows |
設定底層 |
queryTimeout |
設定驅動程式等待 |
verifyCursorPosition |
由於 |
saveState |
指示讀取器的狀態是否應儲存在 |
driverSupportsAbsolute |
指示 JDBC 驅動程式是否支援在 |
setUseSharedExtendedConnection |
指示遊標使用的連線是否應由所有其他處理使用,從而共享相同的事務。如果將其設定為 |
StoredProcedureItemReader
有時需要透過使用儲存過程來獲取遊標資料。StoredProcedureItemReader 的工作方式與 JdbcCursorItemReader 類似,不同之處在於,它不是執行查詢來獲取遊標,而是執行返回遊標的儲存過程。儲存過程可以透過三種不同的方式返回遊標
-
作為返回的
ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作為作為輸出引數返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。
-
作為儲存函式呼叫的返回值。
-
Java
-
XML
以下 Java 示例配置使用與早期示例相同的“客戶信用”示例
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下 XML 示例配置使用與早期示例相同的“客戶信用”示例
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依賴儲存過程提供 ResultSet 作為返回結果(前面選項 1)。
如果儲存過程返回 ref-cursor(選項 2),那麼我們需要提供作為返回 ref-cursor 的輸出引數的位置。
-
Java
-
XML
以下示例展示瞭如何在 Java 中處理第一個引數是 ref-cursor 的情況
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下示例展示瞭如何在 XML 中處理第一個引數是 ref-cursor 的情況
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果遊標是從儲存函式(選項 3)返回的,我們需要將屬性“function”設定為 true。它預設為 false。
-
Java
-
XML
以下示例展示了在 Java 中將屬性設定為 true
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下示例展示了在 XML 中將屬性設定為 true
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有這些情況下,我們都需要定義一個 RowMapper 以及一個 DataSource 和實際的儲存過程名稱。
如果儲存過程或函式帶有引數,那麼必須使用 parameters 屬性宣告並設定它們。以下示例(針對 Oracle)聲明瞭三個引數。第一個是返回 ref-cursor 的 out 引數,第二個和第三個是接受 INTEGER 型別值的輸入引數。
-
Java
-
XML
以下示例展示瞭如何在 Java 中使用引數
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下示例展示瞭如何在 XML 中使用引數
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了引數宣告之外,我們還需要指定一個 PreparedStatementSetter 實現來設定呼叫的引數值。這與上面的 JdbcCursorItemReader 相同。 附加屬性 中列出的所有附加屬性也適用於 StoredProcedureItemReader。
分頁 ItemReader 實現
使用資料庫遊標的另一種方法是執行多個查詢,每個查詢都獲取部分結果。我們將此部分稱為頁。每個查詢都必須指定起始行號以及我們希望在頁中返回的行數。
JdbcPagingItemReader
分頁 ItemReader 的一個實現是 JdbcPagingItemReader。JdbcPagingItemReader 需要一個 PagingQueryProvider,該提供程式負責提供用於檢索構成頁的行的 SQL 查詢。由於每個資料庫都有自己提供分頁支援的策略,因此我們需要為每種支援的資料庫型別使用不同的 PagingQueryProvider。還有一個 SqlPagingQueryProviderFactoryBean,它可以自動檢測正在使用的資料庫並確定適當的 PagingQueryProvider 實現。這簡化了配置,並且是推薦的最佳實踐。
SqlPagingQueryProviderFactoryBean 要求您指定一個 select 子句和一個 from 子句。您還可以提供一個可選的 where 子句。這些子句和必需的 sortKey 用於構建 SQL 語句。
在 sortKey 上具有唯一鍵約束很重要,以確保在執行之間不會丟失任何資料。 |
讀取器開啟後,它會以與任何其他 ItemReader 相同的基本方式,在每次呼叫 read 時返回一個項。當需要額外的行時,分頁會在幕後發生。
-
Java
-
XML
以下 Java 示例配置使用了與前面顯示的基於遊標的 ItemReaders 類似的“客戶信用”示例
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 示例配置使用了與前面顯示的基於遊標的 ItemReaders 類似的“客戶信用”示例
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的 ItemReader 使用必須指定的 RowMapper 返回 CustomerCredit 物件。'pageSize' 屬性確定每次查詢執行從資料庫讀取的實體數量。
'parameterValues' 屬性可用於為查詢指定引數值的 Map。如果在 where 子句中使用命名引數,則每個條目的鍵應與命名引數的名稱匹配。如果使用傳統的“?”佔位符,則每個條目的鍵應為佔位符的編號,從 1 開始。
JpaPagingItemReader
分頁 ItemReader 的另一個實現是 JpaPagingItemReader。JPA 沒有類似於 Hibernate StatelessSession 的概念,因此我們必須使用 JPA 規範提供的其他功能。由於 JPA 支援分頁,因此在將 JPA 用於批處理時,這是一個自然的選擇。每次讀取頁面後,實體都會分離,並清除持久化上下文,以允許實體在處理頁面後進行垃圾回收。
JpaPagingItemReader 允許您宣告一個 JPQL 語句並傳入一個 EntityManagerFactory。然後,它會以與任何其他 ItemReader 相同的基本方式,在每次呼叫 read 時返回一個項。當需要額外的實體時,分頁會在幕後發生。
-
Java
-
XML
以下 Java 示例配置使用與前面顯示的 JDBC 讀取器相同的“客戶信用”示例
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下 XML 示例配置使用與前面顯示的 JDBC 讀取器相同的“客戶信用”示例
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置的 ItemReader 以與上述 JdbcPagingItemReader 所述的完全相同的方式返回 CustomerCredit 物件,前提是 CustomerCredit 物件具有正確的 JPA 註解或 ORM 對映檔案。'pageSize' 屬性確定每次查詢執行從資料庫讀取的實體數量。
資料庫 ItemWriters
雖然平面檔案和 XML 檔案都有特定的 ItemWriter 例項,但在資料庫世界中沒有完全等效的例項。這是因為事務提供了所有需要的功能。ItemWriter 實現對於檔案是必需的,因為它們必須像事務一樣工作,跟蹤寫入的項並在適當的時候重新整理或清除。資料庫不需要此功能,因為寫入已包含在事務中。使用者可以建立自己的實現 ItemWriter 介面的 DAO,或者使用為通用處理問題編寫的自定義 ItemWriter 中的一個。無論哪種方式,它們都應該工作而不會出現任何問題。需要注意的一點是批處理輸出所提供的效能和錯誤處理能力。這在使用 Hibernate 作為 ItemWriter 時最常見,但在使用 JDBC 批處理模式時也可能存在相同的問題。批處理資料庫輸出本身沒有任何固有的缺陷,前提是我們仔細重新整理並且資料中沒有錯誤。但是,寫入時發生的任何錯誤都可能導致混淆,因為無法知道哪個單個項導致了異常,甚至無法知道是否有任何單個項負責,如下圖所示
如果在寫入之前緩衝了項,則在提交之前重新整理緩衝區之前不會丟擲任何錯誤。例如,假設每個塊寫入 20 個項,並且第 15 個項丟擲 DataIntegrityViolationException。就 Step 而言,所有 20 個項都成功寫入,因為在實際寫入之前無法知道是否發生錯誤。一旦呼叫 Session#flush(),緩衝區就會清空並觸發異常。此時,Step 無能為力。事務必須回滾。通常,此異常可能會導致項被跳過(取決於跳過/重試策略),然後不再寫入。但是,在批處理場景中,無法知道哪個項導致了問題。發生故障時正在寫入整個緩衝區。解決此問題的唯一方法是在每個項之後重新整理,如下圖所示
這是一個常見的用例,尤其是在使用 Hibernate 時,ItemWriter 實現的簡單指南是在每次呼叫 write() 時重新整理。這樣做可以可靠地跳過項,Spring Batch 會在出錯後內部處理對 ItemWriter 呼叫的粒度。