資料庫
與大多數企業應用風格一樣,資料庫是 Batch 的主要儲存機制。然而,Batch 與其他應用風格不同,因為它必須處理的資料集規模龐大。如果一個 SQL 語句返回一百萬行,結果集可能會將所有返回的結果都儲存在記憶體中,直到所有行都被讀取。Spring Batch 為這個問題提供了兩種解決方案:
基於遊標的 ItemReader
實現
使用資料庫遊標通常是大多數 Batch 開發人員的預設方法,因為這是資料庫解決關係型資料“流式傳輸”問題的方案。Java 的 ResultSet
類本質上是一種面向物件的機制,用於操作遊標。一個 ResultSet
維護一個指向當前資料行的遊標。在 ResultSet
上呼叫 next
會將此遊標移動到下一行。Spring Batch 基於遊標的 ItemReader
實現會在初始化時開啟一個遊標,並在每次呼叫 read
時將遊標向前移動一行,返回一個對映的物件,可用於處理。然後呼叫 close
方法以確保釋放所有資源。Spring Core 的 JdbcTemplate
透過使用回撥模式來解決這個問題,它在返回控制權給方法呼叫者之前完全對映 ResultSet
中的所有行並關閉。然而,在 Batch 中,這必須等到 Step 完成。下圖顯示了基於遊標的 ItemReader
如何工作的通用圖示。請注意,雖然示例使用了 SQL(因為 SQL 廣為人知),但任何技術都可以實現基本方法。

此示例說明了基本模式。給定一個具有三列:ID
、NAME
和 BAR
的“FOO”表,選擇所有 ID 大於 1 但小於 7 的行。這將遊標的起始位置(第 1 行)置於 ID 為 2 的行。這一行的結果應該是一個完全對映的 Foo
物件。再次呼叫 read()
會將遊標移到下一行,即 ID 為 3 的 Foo
。這些 read 的結果在每次 read
之後被寫出,允許物件被垃圾回收(假設沒有例項變數維護對其的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基於遊標技術的 JDBC 實現。它直接與 ResultSet
工作,需要一個 SQL 語句來針對從 DataSource
獲取的連線執行。以下資料庫 Schema 用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
許多人傾向於為每一行使用一個領域物件,因此以下示例使用 RowMapper
介面的實現來對映 CustomerCredit
物件:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因為 JdbcCursorItemReader
與 JdbcTemplate
共享關鍵介面,所以檢視如何使用 JdbcTemplate
讀取這些資料的示例很有用,以便將其與 ItemReader
進行對比。為了本示例的目的,假設 CUSTOMER
資料庫中有 1,000 行。第一個示例使用 JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
執行上述程式碼片段後,customerCredits
列表包含 1,000 個 CustomerCredit
物件。在 query 方法中,從 DataSource
獲取連線,針對該連線執行提供的 SQL,併為 ResultSet
中的每一行呼叫 mapRow
方法。將其與 JdbcCursorItemReader
的方法進行對比,如下例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
執行上述程式碼片段後,計數器等於 1,000。如果上述程式碼將返回的 customerCredit
放入列表中,結果將與 JdbcTemplate
示例完全相同。然而,ItemReader
的巨大優勢在於它允許 Item 被“流式傳輸”。可以呼叫一次 read
方法,該 Item 可以由 ItemWriter
寫出,然後可以透過 read
獲取下一個 Item。這允許以“塊”的形式進行 Item 的讀取和寫入,並定期提交,這是高效能 Batch 處理的精髓。此外,它可以很容易地配置注入到 Spring Batch Step
中。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將 ItemReader
注入到 Step
中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下示例展示瞭如何在 XML 中將 ItemReader
注入到 Step
中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
附加屬性
由於在 Java 中開啟遊標有許多不同的選項,JdbcCursorItemReader
上有許多可以設定的屬性,如下表所述:
ignoreWarnings |
確定 SQLWarning 是否被記錄或導致異常。預設為 |
fetchSize |
當 |
maxRows |
設定底層 |
queryTimeout |
設定驅動程式等待 |
verifyCursorPosition |
因為 |
saveState |
指示讀取器的狀態是否應在 |
driverSupportsAbsolute |
指示 JDBC 驅動程式是否支援在 |
setUseSharedExtendedConnection |
指示用於遊標的連線是否應由所有其他處理使用,從而共享同一個事務。如果設定為 |
StoredProcedureItemReader
有時需要透過儲存過程獲取遊標資料。StoredProcedureItemReader
的工作方式類似於 JdbcCursorItemReader
,但不是執行查詢來獲取遊標,而是執行一個返回遊標的儲存過程。儲存過程可以透過三種不同方式返回遊標:
-
作為返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作為透過 out 引數返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。
-
作為儲存函式呼叫的返回值。
-
Java
-
XML
以下 Java 示例配置使用與之前示例相同的“客戶信用”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下 XML 示例配置使用與之前示例相同的“客戶信用”示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依賴儲存過程提供一個 ResultSet
作為返回結果(前面提到的選項 1)。
如果儲存過程返回一個 ref-cursor
(選項 2),那麼我們需要提供作為返回的 ref-cursor
的 out 引數的位置。
-
Java
-
XML
以下示例展示瞭如何在 Java 中處理第一個引數為 ref-cursor 的情況:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下示例展示瞭如何在 XML 中處理第一個引數為 ref-cursor 的情況:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果遊標是從儲存函式返回的(選項 3),我們需要將屬性“function”設定為 true
。預設為 false
。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將屬性設定為 true
:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下示例展示瞭如何在 XML 中將屬性設定為 true
:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有這些情況下,我們需要定義一個 RowMapper
,以及一個 DataSource
和實際的儲存過程名稱。
如果儲存過程或函式接受引數,則必須使用 parameters
屬性宣告和設定它們。以下示例,適用於 Oracle,聲明瞭三個引數。第一個是返回 ref-cursor 的 out
引數,第二和第三個是 in 引數,接受型別為 INTEGER
的值。
-
Java
-
XML
以下示例展示瞭如何在 Java 中處理引數:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下示例展示瞭如何在 XML 中處理引數:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了引數宣告之外,我們需要指定一個 PreparedStatementSetter
實現,用於設定呼叫的引數值。這與上面 JdbcCursorItemReader
的工作方式相同。在附加屬性中列出的所有附加屬性也適用於 StoredProcedureItemReader
。
基於分頁的 ItemReader
實現
使用資料庫遊標的替代方案是執行多個查詢,每個查詢獲取結果的一部分。我們將這部分稱為一頁。每個查詢必須指定起始行號和我們希望在該頁中返回的行數。
JdbcPagingItemReader
JdbcPagingItemReader
是基於分頁的 ItemReader
的一個實現。JdbcPagingItemReader
需要一個 PagingQueryProvider
,負責提供用於檢索構成頁的行的 SQL 查詢。由於每個資料庫都有自己的提供分頁支援的策略,我們需要為每種支援的資料庫型別使用不同的 PagingQueryProvider
。還有一個 SqlPagingQueryProviderFactoryBean
,它可以自動檢測正在使用的資料庫並確定合適的 PagingQueryProvider
實現。這簡化了配置,是推薦的最佳實踐。
SqlPagingQueryProviderFactoryBean
要求您指定一個 select
子句和一個 from
子句。您還可以提供一個可選的 where
子句。這些子句和必需的 sortKey
用於構建一個 SQL 語句。
在 sortKey 上具有唯一鍵約束很重要,以保證在執行之間不會丟失資料。 |
讀取器開啟後,它會像任何其他 ItemReader
一樣,每次呼叫 read
都返回一個 Item。當需要更多行時,分頁在後臺發生。
-
Java
-
XML
以下 Java 示例配置使用與之前展示的基於遊標的 ItemReaders
類似的“客戶信用”示例:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 示例配置使用與之前展示的基於遊標的 ItemReaders
類似的“客戶信用”示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的 ItemReader
使用 RowMapper
返回 CustomerCredit
物件,RowMapper
必須指定。pageSize
屬性確定在每次查詢執行時從資料庫讀取的實體數量。
parameterValues
屬性可用於指定引數值的 Map
。如果在 where
子句中使用命名引數,則每個條目的鍵應該與命名引數的名稱匹配。如果使用傳統的“?”佔位符,那麼每個條目的鍵應該是指佔位符的序號,從 1 開始。
JpaPagingItemReader
JpaPagingItemReader
是基於分頁的 ItemReader
的另一個實現。JPA 沒有類似於 Hibernate StatelessSession
的概念,所以我們必須使用 JPA 規範提供的其他特性。由於 JPA 支援分頁,因此在使用 JPA 進行 Batch 處理時,這是一個自然的選擇。讀取每一頁後,實體變為遊離態,持久化上下文被清除,允許實體在頁面處理完畢後被垃圾回收。
JpaPagingItemReader
允許您宣告一個 JPQL 語句,並傳入一個 EntityManagerFactory
。然後它會像任何其他 ItemReader
一樣,每次呼叫 read 都返回一個 Item。當需要更多實體時,分頁在後臺發生。
-
Java
-
XML
以下 Java 示例配置使用與之前展示的 JDBC 讀取器相同的“客戶信用”示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下 XML 示例配置使用與之前展示的 JDBC 讀取器相同的“客戶信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置的 ItemReader
以與上面 JdbcPagingItemReader
描述的完全相同的方式返回 CustomerCredit
物件,假設 CustomerCredit
物件具有正確的 JPA 註解或 ORM 對映檔案。pageSize
屬性確定在每次查詢執行時從資料庫讀取的實體數量。
資料庫 ItemWriter
雖然平面檔案和 XML 檔案都有特定的 ItemWriter
例項,但在資料庫領域沒有完全等價的。這是因為事務提供了所有需要的功能。檔案需要 ItemWriter
實現,因為它們必須表現得像事務一樣,跟蹤已寫入的 Item 並在適當的時間執行 flush 或 clear。資料庫不需要此功能,因為寫入操作已經包含在事務中。使用者可以建立自己的實現 ItemWriter
介面的 DAO,或者使用來自自定義 ItemWriter
的一個,該自定義 ItemWriter
是為通用處理關注點編寫的。無論哪種方式,它們都應該正常工作。需要注意的一點是批次寫入輸出提供的效能和錯誤處理能力。這在使用 hibernate 作為 ItemWriter
時最常見,但在使用 JDBC batch 模式時也可能出現相同問題。批次寫入資料庫輸出本身沒有缺陷,前提是我們小心地執行 flush 並且資料中沒有錯誤。然而,寫入時出現的任何錯誤都可能導致困惑,因為無法知道是哪個單獨的 Item 導致了異常,甚至無法知道是否有任何單獨的 Item 負責,如下圖所示:

如果在寫入前緩衝 Item,直到緩衝區在提交之前被 flush 之前不會丟擲任何錯誤。例如,假設每個 chunk 寫入 20 個 Item,第 15 個 Item 丟擲了 DataIntegrityViolationException
。就 Step
而言,所有 20 個 Item 都成功寫入,因為直到它們實際寫入時才知道發生了錯誤。一旦呼叫 Session#flush()
,緩衝區被清空,異常被觸發。此時,Step
無能為力。事務必須回滾。通常,此異常可能會導致 Item 被跳過(取決於跳過/重試策略),然後它不會再次寫入。然而,在批次處理場景中,無法知道是哪個 Item 導致了問題。發生故障時,整個緩衝區正在寫入。解決此問題的唯一方法是在每個 Item 之後進行 flush,如下圖所示:

這是一個常見的用例,尤其是在使用 Hibernate 時,ItemWriter
實現的簡單指導原則是在每次呼叫 write()
時執行 flush。這樣做可以可靠地跳過 Item,由 Spring Batch 在內部處理發生錯誤後對 ItemWriter
的呼叫的粒度。