讀取檔案
可以使用 FileReadingMessageSource
從檔案系統消費檔案。這是 MessageSource
的一個實現,它從檔案系統目錄建立訊息。以下示例展示瞭如何配置 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
為了防止為某些檔案建立訊息,你可以提供一個 FileListFilter
。預設情況下,我們使用以下過濾器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
確保隱藏檔案不會被處理。請注意,隱藏的確切定義取決於系統。例如,在基於 UNIX 的系統上,以點字元開頭的檔案被認為是隱藏的。另一方面,Microsoft Windows 有一個專門的檔案屬性來指示隱藏檔案。
4.2 版本引入了 |
AcceptOnceFileListFilter
確保檔案只從目錄中被選取一次。
自 4.0 版本以來,此過濾器需要 自 4.1.5 版本以來,此過濾器具有一個新屬性( |
持久檔案列表過濾器現在有一個布林屬性 forRecursion
。將此屬性設定為 true
,也會設定 alwaysAcceptDirectories
,這意味著出站閘道器(ls
和 mget
)上的遞迴操作現在每次都會遍歷完整的目錄樹。這是為了解決目錄樹深處更改未被檢測到的問題。此外,forRecursion=true
會導致檔案的完整路徑用作元資料儲存的鍵;這解決了如果具有相同名稱的檔案出現在不同目錄中多次時過濾器無法正常工作的問題。重要提示:這意味著在持久元資料儲存中,頂級目錄下的檔案將找不到現有鍵。因此,該屬性預設為 false
;這可能會在未來的版本中更改。
以下示例配置了帶有過濾器的 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
讀取檔案的一個常見問題是檔案可能在準備好之前就被檢測到(即,某些其他程序可能仍在寫入檔案)。預設的 AcceptOnceFileListFilter
不會阻止這種情況。在大多數情況下,如果檔案寫入程序在檔案準備好讀取後立即重新命名檔案,則可以防止這種情況。一個基於 filename-pattern
或 filename-regex
的過濾器(可能基於已知字尾)接受僅準備好的檔案,與預設的 AcceptOnceFileListFilter
組合使用,可以應對這種情況。CompositeFileListFilter
可以實現這種組合,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果無法使用臨時名稱建立檔案並重命名為最終名稱,Spring Integration 提供了另一種選擇。4.2 版本添加了 LastModifiedFileListFilter
。可以為該過濾器配置一個 age
屬性,以便只有早於此值的檔案才能透過過濾器。預設年齡為 60 秒,但你應該選擇一個足夠大的年齡,以避免過早地選取檔案(例如,由於網路故障)。以下示例展示瞭如何配置 LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
從 4.3.7 版本開始,引入了 ChainFileListFilter
(CompositeFileListFilter
的擴充套件),以便在後續過濾器僅看到前一個過濾器的結果時允許使用場景。(使用 CompositeFileListFilter
時,所有過濾器都會看到所有檔案,但它只傳遞已透過所有過濾器的檔案)。需要新行為的一個示例是 LastModifiedFileListFilter
和 AcceptOnceFileListFilter
的組合,在這種情況下,我們不希望在一段時間過去之前接受檔案。使用 CompositeFileListFilter
時,由於 AcceptOnceFileListFilter
在第一次掃描時看到所有檔案,當另一個過濾器透過時,它以後不會再傳遞它。CompositeFileListFilter
方法在模式過濾器與自定義過濾器(用於查詢輔助檔案以指示檔案傳輸完成)結合使用時很有用。模式過濾器可能只傳遞主檔案(例如 something.txt
),但“完成”過濾器需要檢視(例如)something.done
是否存在。
假設我們有檔案 a.txt
、a.done
和 b.txt
。
模式過濾器僅傳遞 a.txt
和 b.txt
,而“完成”過濾器看到所有三個檔案並僅傳遞 a.txt
。組合過濾器的最終結果是僅釋放 a.txt
。
使用 ChainFileListFilter 時,如果鏈中的任何過濾器返回空列表,則不會呼叫剩餘的過濾器。 |
5.0 版本引入了 ExpressionFileListFilter
,用於對檔案執行 SpEL 表示式作為上下文評估根物件。為此,所有用於檔案處理(本地和遠端)的 XML 元件,以及現有的 filter
屬性,都提供了 filter-expression
選項,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版本引入了對拒絕檔案感興趣的 DiscardAwareFileListFilter
實現。為此,應透過 addDiscardCallback(Consumer<File>)
為此類過濾器實現提供回撥。在框架中,此功能與 LastModifiedFileListFilter
結合使用,來自 FileReadingMessageSource.WatchServiceDirectoryScanner
。與常規的 DirectoryScanner
不同,WatchService
根據目標檔案系統上的事件提供檔案進行處理。在輪詢帶有這些檔案的內部佇列時,LastModifiedFileListFilter
可能會因為它們相對於其配置的 age
太年輕而丟棄它們。因此,我們丟失了未來可能考慮的檔案。丟棄回撥鉤子允許我們將檔案保留在內部佇列中,以便在後續輪詢中針對 age
進行檢查。CompositeFileListFilter
也實現了 DiscardAwareFileListFilter
,並向其所有 DiscardAwareFileListFilter
委託填充丟棄回撥。
由於 CompositeFileListFilter 將檔案與所有委託進行匹配,因此對於同一檔案,discardCallback 可能會被呼叫多次。 |
從 5.1 版本開始,FileReadingMessageSource
不再檢查目錄是否存在,也不會建立它,直到其 start()
被呼叫(通常透過包裝 SourcePollingChannelAdapter
)。以前,例如從測試中引用目錄時,或者許可權稍後應用時,沒有簡單的方法可以防止作業系統許可權錯誤。
訊息頭
從 5.0 版本開始,FileReadingMessageSource
(除了作為輪詢的 File
的 payload
)還會為出站 Message
填充以下訊息頭:
-
FileHeaders.FILENAME
:要傳送檔案的File.getName()
。可用於後續的重新命名或複製邏輯。 -
FileHeaders.ORIGINAL_FILE
:File
物件本身。通常,當丟失原始的File
物件時,此訊息頭由框架元件(例如 分割器 或 轉換器)自動填充。然而,為了與任何其他自定義用例保持一致和方便,此訊息頭對於訪問原始檔案可能很有用。 -
FileHeaders.RELATIVE_PATH
:引入了一個新的訊息頭,用於表示檔案路徑相對於掃描根目錄的部分。當需要在其他位置恢復源目錄層次結構時,此訊息頭可能很有用。為此,可以配置DefaultFileNameGenerator
(參見 生成檔名)來使用此訊息頭。
目錄掃描和輪詢
FileReadingMessageSource
不會立即為目錄中的檔案生成訊息。它使用內部佇列儲存由 scanner
返回的“合格檔案”。scanEachPoll
選項用於確保內部佇列在每次輪詢時使用最新的輸入目錄內容進行重新整理。預設情況下(scanEachPoll = false
),FileReadingMessageSource
會在再次掃描目錄之前清空其佇列。這種預設行為對於減少對目錄中大量檔案的掃描特別有用。然而,在需要自定義排序的情況下,考慮將此標誌設定為 true
的影響非常重要。檔案處理的順序可能與預期不符。預設情況下,佇列中的檔案按照其自然順序(path
)處理。掃描新增的新檔案,即使佇列中已有檔案,也會被插入到適當的位置以保持該自然順序。要自定義順序,FileReadingMessageSource
可以接受一個 Comparator<File>
作為建構函式引數。它由內部的 PriorityBlockingQueue
使用,以根據業務需求重新排序其內容。因此,要按照特定順序處理檔案,你應該為 FileReadingMessageSource
提供一個比較器,而不是對自定義 DirectoryScanner
生成的列表進行排序。
5.0 版本引入了 RecursiveDirectoryScanner
來執行檔案樹遍歷。該實現基於 Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目錄(DirectoryScanner.listFiles(File)
)引數從結果中排除。所有其他子目錄的包含和排除都基於目標 FileListFilter
實現。例如,SimplePatternFileListFilter
預設過濾掉目錄。更多資訊請參見 AbstractDirectoryAwareFileListFilter
及其實現。
從 5.5 版本開始,Java DSL 的 FileInboundChannelAdapterSpec 有一個方便的 recursive(boolean) 選項,可以在目標 FileReadingMessageSource 中使用 RecursiveDirectoryScanner 代替預設的。 |
名稱空間支援
使用檔案特定的名稱空間可以簡化檔案讀取的配置。為此,請使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在該名稱空間內,你可以簡化 FileReadingMessageSource
並將其包裝在入站通道介面卡中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一個通道介面卡示例依賴於預設的 FileListFilter
實現:
-
IgnoreHiddenFileListFilter
(不處理隱藏檔案) -
AcceptOnceFileListFilter
(防止重複)
因此,你也可以省略 prevent-duplicates
和 ignore-hidden
屬性,因為它們預設都為 true
。
Spring Integration 4.2 引入了 |
第二個通道介面卡示例使用自定義過濾器,第三個使用 filename-pattern
屬性新增一個基於 AntPathMatcher
的過濾器,第四個使用 filename-regex
屬性為 FileReadingMessageSource
新增一個基於正則表示式模式的過濾器。filename-pattern
和 filename-regex
屬性彼此與常規的 filter
引用屬性互斥。但是,你可以使用 filter
屬性引用一個 CompositeFileListFilter
例項,該例項可以組合任意數量的過濾器,包括一個或多個基於模式的過濾器,以滿足你的特定需求。
當多個程序從同一目錄讀取時,你可能希望鎖定檔案以防止它們被併發拾取。為此,可以使用 FileLocker
。有一個基於 java.nio
的實現可用,但也可以實現自己的鎖定方案。可以按如下方式注入 nio
locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
你可以按如下方式配置自定義 locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
當檔案入站介面卡配置了 locker 時,它負責在允許接收檔案之前獲取鎖。它不承擔解鎖檔案的責任。如果你已經處理了檔案並保留了懸掛的鎖,就會出現記憶體洩漏。如果這是一個問題,你應該在適當的時候自己呼叫 FileLocker.unlock(File file) 。 |
當過濾和鎖定檔案不足時,你可能需要完全控制檔案的列出方式。為了實現這種型別的需求,可以使用 DirectoryScanner
的實現。此掃描器允許你精確確定每次輪詢時列出哪些檔案。這也是 Spring Integration 在內部用於將 FileListFilter
例項和 FileLocker
連線到 FileReadingMessageSource
的介面。你可以在 <int-file:inbound-channel-adapter/>
的 scanner
屬性上注入自定義的 DirectoryScanner
,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
這樣做可以讓你完全自由地選擇排序、列出和鎖定策略。
同樣重要的是要理解過濾器(包括 patterns
、regex
、prevent-duplicates
等)和 locker
例項實際上是由 scanner
使用的。在介面卡上設定的任何這些屬性隨後都會注入到內部 scanner
中。對於外部 scanner
的情況,在 FileReadingMessageSource
上禁止所有過濾器和 locker 屬性。它們必須(如果需要)在該自定義 DirectoryScanner
上指定。換句話說,如果你將 scanner
注入到 FileReadingMessageSource
中,你應該在該 scanner
上提供 filter
和 locker
,而不是在 FileReadingMessageSource
上。
預設情況下,DefaultDirectoryScanner 使用一個 IgnoreHiddenFileListFilter 和一個 AcceptOnceFileListFilter 。為了防止它們的使用,你可以配置自己的過濾器(例如 AcceptAllFileListFilter ),甚至將其設定為 null 。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依賴於當新檔案被新增到目錄時產生的檔案系統事件。在初始化期間,目錄會被註冊以生成事件。初始檔案列表也在初始化期間構建。遍歷目錄樹時,遇到的任何子目錄也會被註冊以生成事件。在第一次輪詢時,返回遍歷目錄得到的初始檔案列表。在隨後的輪詢中,返回來自新建立事件的檔案。如果添加了新的子目錄,其建立事件將被用來遍歷新的子樹以查詢現有檔案並註冊找到的任何新子目錄。
當程式的處理速度跟不上目錄修改事件的發生速度時,WatchKey 的內部事件佇列 queue 可能會出現問題。如果佇列大小超出限制,會發出一個 StandardWatchEventKinds.OVERFLOW 事件,表明可能丟失了一些檔案系統事件。在這種情況下,根目錄會被完全重新掃描。為了避免重複,請考慮使用合適的 FileListFilter (例如 AcceptOnceFileListFilter )或在處理完成後移除檔案。 |
可以透過 FileReadingMessageSource.use-watch-service
選項啟用 WatchServiceDirectoryScanner
,該選項與 scanner
選項互斥。對於提供的 directory
,會填充一個內部的 FileReadingMessageSource.WatchServiceDirectoryScanner
例項。
此外,現在 WatchService
的輪詢邏輯可以跟蹤 StandardWatchEventKinds.ENTRY_MODIFY
和 StandardWatchEventKinds.ENTRY_DELETE
事件。
如果你需要跟蹤現有檔案的修改以及新檔案的新增,你應該在 FileListFilter
中實現 ENTRY_MODIFY
事件的邏輯。否則,來自這些事件的檔案將被以相同的方式處理。
ResettableFileListFilter
的實現會接收到 ENTRY_DELETE
事件。因此,這些檔案會被提供給 remove()
操作。當啟用此事件時,像 AcceptOnceFileListFilter
這樣的過濾器會將檔案移除。結果是,如果出現同名檔案,它會透過過濾器並作為訊息傳送。
為此,引入了 watch-events
屬性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是 FileReadingMessageSource
中的一個公共內部列舉。) 透過此選項,我們可以為新檔案使用一種下游流程邏輯,為修改的檔案使用另一種邏輯。以下示例展示瞭如何在同一目錄中為建立和修改事件配置不同的邏輯
值得一提的是,ENTRY_DELETE
事件與被監控目錄的子目錄的重新命名操作有關。更具體地說,與舊目錄名相關的 ENTRY_DELETE
事件發生在通知新(重新命名)目錄的 ENTRY_CREATE
事件之前。在某些作業系統(如 Windows)上,必須註冊 ENTRY_DELETE
事件來處理這種情況。否則,在檔案資源管理器中重新命名被監控的子目錄可能導致在該子目錄中無法檢測到新檔案。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
從 6.1 版本開始,FileReadingMessageSource
暴露了兩個新的與 WatchService
相關的選項
-
watchMaxDepth
- 作為Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的引數; -
watchDirPredicate
- 一個Predicate<Path>
,用於測試掃描樹中的目錄是否應該被遍歷並註冊到WatchService
以及配置的監視事件種類。
限制記憶體消耗
您可以使用 HeadDirectoryScanner
來限制記憶體中保留的檔案數量。這在掃描大型目錄時非常有用。透過 XML 配置,可以透過在入站通道介面卡上設定 queue-size
屬性來啟用此功能。
在 4.2 版本之前,此設定與使用任何其他過濾器不相容。任何其他過濾器(包括 prevent-duplicates="true"
)都會覆蓋用於限制大小的過濾器。
使用 通常,在這種情況下,你應該移除已處理的檔案,而不是使用 |
使用 Java Configuration 進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java Configuration 配置出站介面卡的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java DSL 配置出站介面卡的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
檔案“跟蹤”(tail)
另一個常見的用例是從檔案的末尾(或尾部)獲取“行”,並在新增新行時捕獲它們。提供了兩種實現。第一種是 OSDelegatingFileTailingMessageProducer
,它使用原生 tail
命令(在支援此命令的作業系統上)。這通常是這些平臺上最有效的實現。對於沒有 tail
命令的作業系統,第二種實現 ApacheCommonsFileTailingMessageProducer
使用 Apache commons-io
庫的 Tailer
類。
在這兩種情況下,檔案系統事件(例如檔案不可用和其他事件)都透過正常的 Spring 事件釋出機制作為 ApplicationEvent
例項釋出。此類事件的示例包括以下內容
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
上述示例中顯示的事件序列可能發生,例如,當檔案被輪轉時。
從 5.0 版本開始,當在 idleEventInterval
期間檔案沒有資料時,會發出 FileTailingIdleEvent
。以下示例顯示了此類事件的樣子
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
並非所有支援 tail 命令的平臺都提供這些狀態訊息。 |
從這些端點發出的訊息包含以下頭部資訊
-
FileHeaders.ORIGINAL_FILE
:File
物件 -
FileHeaders.FILENAME
: 檔名 (File.getName()
)
在 5.0 版本之前,FileHeaders.FILENAME 頭部包含檔案的絕對路徑的字串表示。現在可以透過對原始檔案頭部呼叫 getAbsolutePath() 來獲取該字串表示。 |
以下示例建立一個具有預設選項('-F -n 0',表示從當前檔案末尾跟蹤檔案)的原生介面卡。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例建立一個具有 '-F -n +0' 選項(表示跟蹤檔案,併發出所有現有行)的原生介面卡。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail
命令失敗(在某些平臺上,檔案丟失會導致 tail
失敗,即使指定了 `-F`),該命令會每 10 秒重試一次。
預設情況下,原生介面卡從標準輸出捕獲內容並將其作為訊息傳送。它們也從標準錯誤捕獲資訊以觸發事件。從 4.3.6 版本開始,您可以透過將 enable-status-reader
設定為 false
來丟棄標準錯誤事件,如下例所示
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
設定為 5000
,這意味著如果在五秒內沒有寫入任何行,則每五秒觸發一次 FileTailingIdleEvent
。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
這在需要停止介面卡時非常有用。
以下示例建立了一個 Apache commons-io
Tailer
介面卡,它每兩秒檢查一次檔案是否有新行,並每十秒檢查一次丟失檔案是否存在。
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 檔案從開頭 (end="false" ) 而非末尾(預設值)開始跟蹤。 |
2 | 檔案對每個塊進行重新開啟(預設是保持檔案開啟)。 |
指定 delay 、end 或 reopen 屬性會強制使用 Apache commons-io 介面卡,並使 native-options 屬性不可用。 |