檔案分流器
FileSplitter 在 4.1.2 版本中新增,其名稱空間支援在 4.2 版本中新增。FileSplitter 基於 BufferedReader.readLine() 將文字檔案拆分為單獨的行。預設情況下,拆分器使用 Iterator 在從檔案讀取時一次發射一行檔案。將 iterator 屬性設定為 false 會使其在發射訊息之前將所有行讀入記憶體。這樣做的一個用例可能是您想在傳送任何包含行的訊息之前檢測檔案上的 I/O 錯誤。然而,這隻適用於相對較短的檔案。
入站有效載荷可以是 File、String(檔案路徑)、InputStream 或 Reader。其他有效載荷型別不變地發射。
以下列表顯示了配置 FileSplitter 的可能方式
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
| 1 | 拆分器的 bean 名稱。 |
| 2 | 設定為 true(預設值)以使用迭代器,或設定為 false 以在傳送行之前將檔案載入到記憶體中。 |
| 3 | 設定為 true 可在檔案資料之前和之後發射檔案開始和檔案結束標記訊息。標記是帶有 FileSplitter.FileMarker 有效載荷(mark 屬性中帶有 START 和 END 值)的訊息。當在下游流中順序處理檔案且某些行被過濾時,您可能會使用標記。它們使下游處理能夠知道檔案何時已完全處理。此外,這些訊息會新增一個包含 START 或 END 的 file_marker 標頭。END 標記包含行數。如果檔案為空,則只發射 START 和 END 標記,其中 lineCount 為 0。預設值為 false。當為 true 時,apply-sequence 預設為 false。另請參見 markers-json(下一個屬性)。 |
| 4 | 當 markers 為 true 時,將其設定為 true 以將 FileMarker 物件轉換為 JSON 字串。(底層使用 SimpleJsonSerializer)。 |
| 5 | 設定為 false 以停用在訊息中包含 sequenceSize 和 sequenceNumber 標頭。預設值為 true,除非 markers 為 true。當為 true 且 markers 為 true 時,標記包含在排序中。當為 true 且 iterator 為 true 時,sequenceSize 標頭設定為 0,因為大小未知。 |
| 6 | 設定為 true 可在檔案中沒有行時丟擲 RequiresReplyException。預設值為 false。 |
| 7 | 設定將文字資料讀入 String 有效載荷時使用的字元集名稱。預設值為平臺字元集。 |
| 8 | 第一行的標頭名稱,作為剩餘行發射的訊息中的標頭。從版本 5.0 開始。 |
| 9 | 設定用於向拆分器傳送訊息的輸入通道。 |
| 10 | 設定訊息傳送到的輸出通道。 |
| 11 | 設定傳送超時。僅當 output-channel 可以阻塞時(例如已滿的 QueueChannel)才適用。 |
| 12 | 設定為 false 以在上下文重新整理時停用自動啟動拆分器。預設值為 true。 |
| 13 | 如果 input-channel 是 <publish-subscribe-channel/>,則設定此端點的順序。 |
| 14 | 設定拆分器的啟動階段(當 auto-startup 為 true 時使用)。 |
FileSplitter 還將任何基於文字的 InputStream 拆分為行。從版本 4.3 開始,當與 FTP 或 SFTP 流式入站通道介面卡或使用 stream 選項檢索檔案的 FTP 或 SFTP 出站閘道器結合使用時,當檔案完全消耗完畢後,拆分器會自動關閉支援流的會話。有關這些功能的更多資訊,請參見 FTP 流式入站通道介面卡 和 SFTP 流式入站通道介面卡 以及 FTP 出站閘道器 和 SFTP 出站閘道器。
當使用 Java 配置時,可以使用額外的建構函式,示例如下
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
當 markersJson 為 true 時,標記表示為 JSON 字串(使用 SimpleJsonSerializer)。
版本 5.0 引入了 firstLineAsHeader 選項,用於指定內容的第一行是標頭(例如 CSV 檔案中的列名)。傳遞給此屬性的引數是標頭名稱,第一行作為該標頭名稱下的標頭包含在為剩餘行發射的訊息中。此行不包含在序列標頭中(如果 applySequence 為 true),也不包含在與 FileMarker.END 關聯的 lineCount 中。注意:從版本 5.5 開始,lineCount 也作為 FileHeaders.LINE_COUNT 包含在 FileMarker.END 訊息的標頭中,因為 FileMarker 可以序列化為 JSON。如果檔案只包含標頭行,則該檔案被視為空,因此,在拆分過程中只發射 FileMarker 例項(如果啟用了標記 — 否則不發射任何訊息)。預設情況下(如果沒有設定標頭名稱),第一行被視為資料併成為第一個發射訊息的有效載荷。
如果您需要更復雜的邏輯來從檔案內容中提取標頭(不是第一行,不是整行內容,不是某個特定標頭等),請考慮在 FileSplitter 之前使用 標頭豐富器。請注意,已移動到標頭的行可能會在正常內容處理的下游被過濾。
冪等下游處理拆分檔案
當 apply-sequence 為 true 時,拆分器在 SEQUENCE_NUMBER 標頭中新增行號(當 markers 為 true 時,標記也算作行)。行號可與 冪等接收器 結合使用,以避免在重新啟動後重新處理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}