資料庫

與大多數企業應用風格一樣,資料庫是 Batch 的主要儲存機制。然而,Batch 與其他應用風格不同,因為它必須處理的資料集規模龐大。如果一個 SQL 語句返回一百萬行,結果集可能會將所有返回的結果都儲存在記憶體中,直到所有行都被讀取。Spring Batch 為這個問題提供了兩種解決方案:

基於遊標的 ItemReader 實現

使用資料庫遊標通常是大多數 Batch 開發人員的預設方法,因為這是資料庫解決關係型資料“流式傳輸”問題的方案。Java 的 ResultSet 類本質上是一種面向物件的機制,用於操作遊標。一個 ResultSet 維護一個指向當前資料行的遊標。在 ResultSet 上呼叫 next 會將此遊標移動到下一行。Spring Batch 基於遊標的 ItemReader 實現會在初始化時開啟一個遊標,並在每次呼叫 read 時將遊標向前移動一行,返回一個對映的物件,可用於處理。然後呼叫 close 方法以確保釋放所有資源。Spring Core 的 JdbcTemplate 透過使用回撥模式來解決這個問題,它在返回控制權給方法呼叫者之前完全對映 ResultSet 中的所有行並關閉。然而,在 Batch 中,這必須等到 Step 完成。下圖顯示了基於遊標的 ItemReader 如何工作的通用圖示。請注意,雖然示例使用了 SQL(因為 SQL 廣為人知),但任何技術都可以實現基本方法。

Cursor Example
圖 1. 遊標示例

此示例說明了基本模式。給定一個具有三列:IDNAMEBAR 的“FOO”表,選擇所有 ID 大於 1 但小於 7 的行。這將遊標的起始位置(第 1 行)置於 ID 為 2 的行。這一行的結果應該是一個完全對映的 Foo 物件。再次呼叫 read() 會將遊標移到下一行,即 ID 為 3 的 Foo。這些 read 的結果在每次 read 之後被寫出,允許物件被垃圾回收(假設沒有例項變數維護對其的引用)。

JdbcCursorItemReader

JdbcCursorItemReader 是基於遊標技術的 JDBC 實現。它直接與 ResultSet 工作,需要一個 SQL 語句來針對從 DataSource 獲取的連線執行。以下資料庫 Schema 用作示例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

許多人傾向於為每一行使用一個領域物件,因此以下示例使用 RowMapper 介面的實現來對映 CustomerCredit 物件:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因為 JdbcCursorItemReaderJdbcTemplate 共享關鍵介面,所以檢視如何使用 JdbcTemplate 讀取這些資料的示例很有用,以便將其與 ItemReader 進行對比。為了本示例的目的,假設 CUSTOMER 資料庫中有 1,000 行。第一個示例使用 JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

執行上述程式碼片段後,customerCredits 列表包含 1,000 個 CustomerCredit 物件。在 query 方法中,從 DataSource 獲取連線,針對該連線執行提供的 SQL,併為 ResultSet 中的每一行呼叫 mapRow 方法。將其與 JdbcCursorItemReader 的方法進行對比,如下例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

執行上述程式碼片段後,計數器等於 1,000。如果上述程式碼將返回的 customerCredit 放入列表中,結果將與 JdbcTemplate 示例完全相同。然而,ItemReader 的巨大優勢在於它允許 Item 被“流式傳輸”。可以呼叫一次 read 方法,該 Item 可以由 ItemWriter 寫出,然後可以透過 read 獲取下一個 Item。這允許以“塊”的形式進行 Item 的讀取和寫入,並定期提交,這是高效能 Batch 處理的精髓。此外,它可以很容易地配置注入到 Spring Batch Step 中。

  • Java

  • XML

以下示例展示瞭如何在 Java 中將 ItemReader 注入到 Step 中:

Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}

以下示例展示瞭如何在 XML 中將 ItemReader 注入到 Step 中:

XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

附加屬性

由於在 Java 中開啟遊標有許多不同的選項,JdbcCursorItemReader 上有許多可以設定的屬性,如下表所述:

表 1. JdbcCursorItemReader 屬性

ignoreWarnings

確定 SQLWarning 是否被記錄或導致異常。預設為 true(意味著警告被記錄)。

fetchSize

ItemReader 使用的 ResultSet 物件需要更多行時,向 JDBC 驅動程式提供一個提示,說明應該從資料庫獲取的行數。預設情況下,不提供提示。

maxRows

設定底層 ResultSet 在任何時候可以容納的最大行數的限制。

queryTimeout

設定驅動程式等待 Statement 物件執行的秒數。如果超出限制,會丟擲 DataAccessException。(詳細資訊請查閱驅動程式供應商文件)。

verifyCursorPosition

因為 ItemReader 持有的同一個 ResultSet 會傳遞給 RowMapper,所以使用者自己呼叫 ResultSet.next() 是可能的,這可能會導致讀取器內部計數出現問題。將此值設定為 true 會在呼叫 RowMapper 後遊標位置與之前不同時丟擲異常。

saveState

指示讀取器的狀態是否應在 ItemStream#update(ExecutionContext) 提供的 ExecutionContext 中儲存。預設為 true

driverSupportsAbsolute

指示 JDBC 驅動程式是否支援在 ResultSet 上設定絕對行。建議支援 ResultSet.absolute() 的 JDBC 驅動程式將其設定為 true,因為它可能會提高效能,尤其是在處理大型資料集時 Step 失敗的情況下。預設為 false

setUseSharedExtendedConnection

指示用於遊標的連線是否應由所有其他處理使用,從而共享同一個事務。如果設定為 false,則遊標會用自己的連線開啟,不參與為 Step 的其餘處理啟動的任何事務。如果將此標誌設定為 true,則必須將 DataSource 包裝在 ExtendedConnectionDataSourceProxy 中,以防止連線在每次提交後被關閉和釋放。當您將此選項設定為 true 時,用於開啟遊標的語句會使用 'READ_ONLY' 和 'HOLD_CURSORS_OVER_COMMIT' 兩個選項建立。這允許在事務開始和 Step 處理中執行的提交之後保持遊標開啟。要使用此功能,您需要一個支援此功能的資料庫以及支援 JDBC 3.0 或更高版本的 JDBC 驅動程式。預設為 false

StoredProcedureItemReader

有時需要透過儲存過程獲取遊標資料。StoredProcedureItemReader 的工作方式類似於 JdbcCursorItemReader,但不是執行查詢來獲取遊標,而是執行一個返回遊標的儲存過程。儲存過程可以透過三種不同方式返回遊標:

  • 作為返回的 ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。

  • 作為透過 out 引數返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。

  • 作為儲存函式呼叫的返回值。

  • Java

  • XML

以下 Java 示例配置使用與之前示例相同的“客戶信用”示例:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

以下 XML 示例配置使用與之前示例相同的“客戶信用”示例:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

前面的示例依賴儲存過程提供一個 ResultSet 作為返回結果(前面提到的選項 1)。

如果儲存過程返回一個 ref-cursor(選項 2),那麼我們需要提供作為返回的 ref-cursor 的 out 引數的位置。

  • Java

  • XML

以下示例展示瞭如何在 Java 中處理第一個引數為 ref-cursor 的情況:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

以下示例展示瞭如何在 XML 中處理第一個引數為 ref-cursor 的情況:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

如果遊標是從儲存函式返回的(選項 3),我們需要將屬性“function”設定為 true。預設為 false

  • Java

  • XML

以下示例展示瞭如何在 Java 中將屬性設定為 true

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

以下示例展示瞭如何在 XML 中將屬性設定為 true

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

在所有這些情況下,我們需要定義一個 RowMapper,以及一個 DataSource 和實際的儲存過程名稱。

如果儲存過程或函式接受引數,則必須使用 parameters 屬性宣告和設定它們。以下示例,適用於 Oracle,聲明瞭三個引數。第一個是返回 ref-cursor 的 out 引數,第二和第三個是 in 引數,接受型別為 INTEGER 的值。

  • Java

  • XML

以下示例展示瞭如何在 Java 中處理引數:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

以下示例展示瞭如何在 XML 中處理引數:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了引數宣告之外,我們需要指定一個 PreparedStatementSetter 實現,用於設定呼叫的引數值。這與上面 JdbcCursorItemReader 的工作方式相同。在附加屬性中列出的所有附加屬性也適用於 StoredProcedureItemReader

基於分頁的 ItemReader 實現

使用資料庫遊標的替代方案是執行多個查詢,每個查詢獲取結果的一部分。我們將這部分稱為一頁。每個查詢必須指定起始行號和我們希望在該頁中返回的行數。

JdbcPagingItemReader

JdbcPagingItemReader 是基於分頁的 ItemReader 的一個實現。JdbcPagingItemReader 需要一個 PagingQueryProvider,負責提供用於檢索構成頁的行的 SQL 查詢。由於每個資料庫都有自己的提供分頁支援的策略,我們需要為每種支援的資料庫型別使用不同的 PagingQueryProvider。還有一個 SqlPagingQueryProviderFactoryBean,它可以自動檢測正在使用的資料庫並確定合適的 PagingQueryProvider 實現。這簡化了配置,是推薦的最佳實踐。

SqlPagingQueryProviderFactoryBean 要求您指定一個 select 子句和一個 from 子句。您還可以提供一個可選的 where 子句。這些子句和必需的 sortKey 用於構建一個 SQL 語句。

sortKey 上具有唯一鍵約束很重要,以保證在執行之間不會丟失資料。

讀取器開啟後,它會像任何其他 ItemReader 一樣,每次呼叫 read 都返回一個 Item。當需要更多行時,分頁在後臺發生。

  • Java

  • XML

以下 Java 示例配置使用與之前展示的基於遊標的 ItemReaders 類似的“客戶信用”示例:

Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

以下 XML 示例配置使用與之前展示的基於遊標的 ItemReaders 類似的“客戶信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

此配置的 ItemReader 使用 RowMapper 返回 CustomerCredit 物件,RowMapper 必須指定。pageSize 屬性確定在每次查詢執行時從資料庫讀取的實體數量。

parameterValues 屬性可用於指定引數值的 Map。如果在 where 子句中使用命名引數,則每個條目的鍵應該與命名引數的名稱匹配。如果使用傳統的“?”佔位符,那麼每個條目的鍵應該是指佔位符的序號,從 1 開始。

JpaPagingItemReader

JpaPagingItemReader 是基於分頁的 ItemReader 的另一個實現。JPA 沒有類似於 Hibernate StatelessSession 的概念,所以我們必須使用 JPA 規範提供的其他特性。由於 JPA 支援分頁,因此在使用 JPA 進行 Batch 處理時,這是一個自然的選擇。讀取每一頁後,實體變為遊離態,持久化上下文被清除,允許實體在頁面處理完畢後被垃圾回收。

JpaPagingItemReader 允許您宣告一個 JPQL 語句,並傳入一個 EntityManagerFactory。然後它會像任何其他 ItemReader 一樣,每次呼叫 read 都返回一個 Item。當需要更多實體時,分頁在後臺發生。

  • Java

  • XML

以下 Java 示例配置使用與之前展示的 JDBC 讀取器相同的“客戶信用”示例:

Java 配置
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

以下 XML 示例配置使用與之前展示的 JDBC 讀取器相同的“客戶信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

此配置的 ItemReader 以與上面 JdbcPagingItemReader 描述的完全相同的方式返回 CustomerCredit 物件,假設 CustomerCredit 物件具有正確的 JPA 註解或 ORM 對映檔案。pageSize 屬性確定在每次查詢執行時從資料庫讀取的實體數量。

資料庫 ItemWriter

雖然平面檔案和 XML 檔案都有特定的 ItemWriter 例項,但在資料庫領域沒有完全等價的。這是因為事務提供了所有需要的功能。檔案需要 ItemWriter 實現,因為它們必須表現得像事務一樣,跟蹤已寫入的 Item 並在適當的時間執行 flush 或 clear。資料庫不需要此功能,因為寫入操作已經包含在事務中。使用者可以建立自己的實現 ItemWriter 介面的 DAO,或者使用來自自定義 ItemWriter 的一個,該自定義 ItemWriter 是為通用處理關注點編寫的。無論哪種方式,它們都應該正常工作。需要注意的一點是批次寫入輸出提供的效能和錯誤處理能力。這在使用 hibernate 作為 ItemWriter 時最常見,但在使用 JDBC batch 模式時也可能出現相同問題。批次寫入資料庫輸出本身沒有缺陷,前提是我們小心地執行 flush 並且資料中沒有錯誤。然而,寫入時出現的任何錯誤都可能導致困惑,因為無法知道是哪個單獨的 Item 導致了異常,甚至無法知道是否有任何單獨的 Item 負責,如下圖所示:

Error On Flush
圖 2. Flush 錯誤

如果在寫入前緩衝 Item,直到緩衝區在提交之前被 flush 之前不會丟擲任何錯誤。例如,假設每個 chunk 寫入 20 個 Item,第 15 個 Item 丟擲了 DataIntegrityViolationException。就 Step 而言,所有 20 個 Item 都成功寫入,因為直到它們實際寫入時才知道發生了錯誤。一旦呼叫 Session#flush(),緩衝區被清空,異常被觸發。此時,Step 無能為力。事務必須回滾。通常,此異常可能會導致 Item 被跳過(取決於跳過/重試策略),然後它不會再次寫入。然而,在批次處理場景中,無法知道是哪個 Item 導致了問題。發生故障時,整個緩衝區正在寫入。解決此問題的唯一方法是在每個 Item 之後進行 flush,如下圖所示:

Error On Write
圖 3. 寫入錯誤

這是一個常見的用例,尤其是在使用 Hibernate 時,ItemWriter 實現的簡單指導原則是在每次呼叫 write() 時執行 flush。這樣做可以可靠地跳過 Item,由 Spring Batch 在內部處理發生錯誤後對 ItemWriter 的呼叫的粒度。