檔案分割器
FileSplitter
在 4.1.2 版本中新增,其名稱空間支援在 4.2 版本中新增。FileSplitter
根據 BufferedReader.readLine()
將文字檔案分割成單獨的行。預設情況下,分割器使用 Iterator
在從檔案中讀取行時逐行發出。將 iterator
屬性設定為 false
會使其在發出訊息之前將所有行讀入記憶體。這樣做的一種用例是,您可能想在傳送任何包含行的訊息之前檢測檔案上的 I/O 錯誤。然而,這隻適用於相對較短的檔案。
入站載荷可以是 File
、String
(檔案路徑)、InputStream
或 Reader
。其他載荷型別將原樣發出。
以下列表顯示了配置 FileSplitter
的可能方式
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 分割器的 bean 名稱。 |
2 | 設定為 true (預設值)以使用迭代器,或者設定為 false 以在傳送行之前將檔案載入到記憶體中。 |
3 | 設定為 true 以在檔案資料之前和之後發出檔案開始和檔案結束標記訊息。標記是載荷為 FileSplitter.FileMarker 的訊息(其 mark 屬性包含 START 和 END 值)。當您在下游流中按順序處理檔案,並且某些行被過濾掉時,可以使用標記。它們使下游處理能夠知道檔案何時已完全處理完畢。此外,一個包含 START 或 END 的 file_marker 頭會被新增到這些訊息中。END 標記包含行計數。如果檔案為空,則只發出 START 和 END 標記,其 lineCount 為 0 。預設值為 false 。當設定為 true 時,apply-sequence 預設為 false 。另請參閱 markers-json (下一個屬性)。 |
4 | 當 markers 為 true 時,將其設定為 true 可將 FileMarker 物件轉換為 JSON 字串。(底層使用 SimpleJsonSerializer )。 |
5 | 設定為 false 可停用在訊息中包含 sequenceSize 和 sequenceNumber 頭。預設值為 true ,除非 markers 為 true 。當為 true 且 markers 為 true 時,標記會包含在序列中。當為 true 且 iterator 為 true 時,sequenceSize 頭會被設定為 0 ,因為大小未知。 |
6 | 設定為 true 可在檔案沒有行時丟擲 RequiresReplyException 。預設值為 false 。 |
7 | 設定讀取文字資料到 String 載荷時使用的字元集名稱。預設值是平臺字元集。 |
8 | 第一行作為頭資訊包含在為剩餘行發出的訊息中的頭名稱。自 5.0 版本起。 |
9 | 設定用於向分割器傳送訊息的輸入通道。 |
10 | 設定訊息傳送到的輸出通道。 |
11 | 設定傳送超時。僅當 output-channel 可能阻塞時適用,例如滿的 QueueChannel 。 |
12 | 設定為 false 可停用在上下文重新整理時自動啟動分割器。預設值為 true 。 |
13 | 如果 input-channel 是 <publish-subscribe-channel/> ,則設定此端點的順序。 |
14 | 設定分割器的啟動階段(當 auto-startup 為 true 時使用)。 |
FileSplitter
還可以將任何基於文字的 InputStream
分割成行。從 4.3 版本開始,當與 FTP 或 SFTP 流式入站通道介面卡,或使用 stream
選項檢索檔案的 FTP 或 SFTP 出站閘道器結合使用時,當檔案完全被消費時,分割器會自動關閉支援該流的會話。有關這些功能的更多資訊,請參見 FTP 流式入站通道介面卡 和 SFTP 流式入站通道介面卡,以及 FTP 出站閘道器 和 SFTP 出站閘道器。
使用 Java 配置時,還有一個額外的建構函式可用,如下例所示
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
當 markersJson
為 true 時,標記表示為 JSON 字串(使用 SimpleJsonSerializer
)。
5.0 版本引入了 firstLineAsHeader
選項,用於指定內容的第一行是頭(例如 CSV 檔案中的列名)。傳遞給此屬性的引數是頭名稱,在該名稱下,第一行作為頭資訊包含在為剩餘行發出的訊息中。此行不包含在序列頭中(如果 applySequence
為 true),也不包含在與 FileMarker.END
關聯的 lineCount
中。注意:從 5.5 版本開始,lineCount
也作為 FileHeaders.LINE_COUNT
包含在 FileMarker.END
訊息的頭中,因為 FileMarker
可以被序列化為 JSON。如果檔案只包含頭行,則該檔案被視為空檔案,因此在分割期間只發出 FileMarker
例項(如果啟用了標記 — 否則,不發出任何訊息)。預設情況下(如果未設定頭名稱),第一行被視為資料,併成為第一個發出的訊息的載荷。
如果您需要更復雜的邏輯來從檔案內容中提取頭(不是第一行,不是行的全部內容,不是某個特定的頭等等),可以考慮在 FileSplitter
之前使用 頭增強器。請注意,已移至頭的行可能會在下游被過濾掉,而不參與正常的內容處理。
分割檔案後的冪等下游處理
當 apply-sequence
為 true 時,分割器會在 SEQUENCE_NUMBER
頭中新增行號(當 markers
為 true 時,標記被計為行)。行號可以與 冪等接收器 一起使用,以避免在重新啟動後重新處理行。
例如
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}