讀取檔案
FileReadingMessageSource 可用於從檔案系統消費檔案。它是 MessageSource 的一個實現,用於從檔案系統目錄建立訊息。以下示例演示如何配置 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:directory="${input.directory}"/>
為了防止為某些檔案建立訊息,您可以提供一個 FileListFilter。預設情況下,我們使用以下過濾器
-
IgnoreHiddenFileListFilter -
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter 確保隱藏檔案不會被處理。請注意,隱藏檔案的確切定義是系統相關的。例如,在基於 UNIX 的系統中,以句點字元開頭的檔案被認為是隱藏檔案。另一方面,Microsoft Windows 有一個專用的檔案屬性來指示隱藏檔案。
|
版本 4.2 引入了 |
AcceptOnceFileListFilter 確保檔案只從目錄中拾取一次。
|
自版本 4.0 以來,此過濾器需要一個 自版本 4.1.5 以來,此過濾器有一個新屬性( |
持久檔案列表過濾器現在有一個布林屬性 forRecursion。將此屬性設定為 true,還會設定 alwaysAcceptDirectories,這意味著出站閘道器上的遞迴操作(ls 和 mget)現在每次都會遍歷完整的目錄樹。這是為了解決目錄樹深處的變化未被檢測到的問題。此外,forRecursion=true 會導致檔案的完整路徑用作元資料儲存鍵;這解決了如果具有相同名稱的檔案在不同目錄中多次出現時過濾器無法正常工作的問題。重要提示:這意味著在頂層目錄下的檔案中,持久元資料儲存中的現有鍵將找不到。因此,此屬性預設為 false;這可能會在未來的版本中更改。
以下示例配置了一個帶過濾器的 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
讀取檔案的一個常見問題是,檔案可能在準備好之前就被檢測到(也就是說,其他程序可能仍在寫入檔案)。預設的 AcceptOnceFileListFilter 並不能阻止這種情況。在大多數情況下,如果檔案寫入程序在檔案準備好讀取後立即重新命名每個檔案,則可以防止這種情況。一個只接受已準備好檔案(可能基於已知字尾)的 filename-pattern 或 filename-regex 過濾器,與預設的 AcceptOnceFileListFilter 組合使用,可以處理這種情況。CompositeFileListFilter 啟用這種組合,如下例所示
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果無法使用臨時名稱建立檔案並重命名為最終名稱,Spring Integration 提供另一種替代方案。版本 4.2 添加了 LastModifiedFileListFilter。此過濾器可以配置一個 age 屬性,以便只有早於此值的檔案才能透過過濾器。年齡預設為 60 秒,但您應該選擇一個足夠大的年齡,以避免過早地拾取檔案(例如,由於網路故障)。以下示例演示如何配置 LastModifiedFileListFilter
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
從版本 4.3.7 開始,引入了一個 ChainFileListFilter(CompositeFileListFilter 的擴充套件),以允許後續過濾器只檢視上一個過濾器結果的場景。(使用 CompositeFileListFilter,所有過濾器都會看到所有檔案,但它只通過所有過濾器都透過的檔案)。新行為所需的示例是 LastModifiedFileListFilter 和 AcceptOnceFileListFilter 的組合,當我們不希望在經過一段時間後才接受檔案時。使用 CompositeFileListFilter,由於 AcceptOnceFileListFilter 在第一次傳遞時看到所有檔案,因此當其他過濾器傳遞時,它不會在以後傳遞。當模式過濾器與查詢輔助檔案以指示檔案傳輸完成的自定義過濾器組合時,CompositeFileListFilter 方法很有用。模式過濾器可能只傳遞主檔案(例如 something.txt),但“完成”過濾器需要檢視(例如)something.done 是否存在。
假設我們有檔案 a.txt、a.done 和 b.txt。
模式過濾器只通過 a.txt 和 b.txt,而“完成”過濾器看到所有三個檔案,只通過 a.txt。組合過濾器的最終結果是隻釋放 a.txt。
使用 ChainFileListFilter,如果鏈中的任何過濾器返回空列表,則不呼叫其餘過濾器。 |
版本 5.0 引入了 ExpressionFileListFilter,用於針對檔案作為上下文評估根物件執行 SpEL 表示式。為此,所有用於檔案處理(本地和遠端)的 XML 元件,以及現有的 filter 屬性,都提供了 filter-expression 選項,如下例所示
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了 DiscardAwareFileListFilter 實現,它們對被拒絕的檔案感興趣。為此,此類過濾器實現應透過 addDiscardCallback(Consumer<File>) 提供回撥。在框架中,此功能用於 FileReadingMessageSource.WatchServiceDirectoryScanner,結合 LastModifiedFileListFilter。與常規的 DirectoryScanner 不同,WatchService 根據目標檔案系統上的事件提供檔案進行處理。在輪詢這些檔案的內部佇列時,LastModifiedFileListFilter 可能會因為它們相對於其配置的 age 太年輕而丟棄它們。因此,我們丟失了檔案以供將來可能的考慮。丟棄回撥鉤子允許我們將檔案保留在內部佇列中,以便在後續輪詢中可以對其進行 age 檢查。CompositeFileListFilter 還實現了一個 DiscardAwareFileListFilter,並將其丟棄回撥填充到所有 DiscardAwareFileListFilter 委託中。
由於 CompositeFileListFilter 將檔案與所有委託進行匹配,因此 discardCallback 可能會為同一個檔案呼叫多次。 |
從版本 5.1 開始,FileReadingMessageSource 不再檢查目錄是否存在,也不再建立它,直到呼叫其 start()(通常透過包裝 SourcePollingChannelAdapter)。以前,當引用目錄時(例如從測試中,或當權限稍後應用時),沒有簡單的方法可以防止作業系統許可權錯誤。
與 LastModifiedFileListFilter 相反,從版本 6.5 開始引入了 RecentFileListFilter 策略。它是 AbstractRecentFileListFilter 的本地檔案系統擴充套件。預設情況下,它接受不早於 1 天的檔案。請參閱其其他實現以獲取相應的遠端檔案協議。
訊息頭
從版本 5.0 開始,FileReadingMessageSource(除了作為輪詢 File 的 payload)還會將以下標頭填充到出站 Message
-
FileHeaders.FILENAME: 要傳送的檔案的File.getName()。可用於後續的重新命名或複製邏輯。 -
FileHeaders.ORIGINAL_FILE:File物件本身。通常,此標頭由框架元件(如 拆分器 或 轉換器)在丟失原始File物件時自動填充。但是,為了與任何其他自定義用例保持一致和方便,此標頭對於訪問原始檔案可能很有用。 -
FileHeaders.RELATIVE_PATH: 引入了一個新標頭,用於表示檔案路徑中相對於掃描根目錄的部分。當需要將源目錄層次結構恢復到其他位置時,此標頭可能很有用。為此,可以配置DefaultFileNameGenerator(參見 "`生成檔名)以使用此標頭。
目錄掃描和輪詢
FileReadingMessageSource 不會立即為目錄中的檔案生成訊息。它使用內部佇列儲存由 scanner 返回的“符合條件的檔案”。scanEachPoll 選項用於確保內部佇列在每次輪詢時都使用最新的輸入目錄內容進行重新整理。預設情況下(scanEachPoll = false),FileReadingMessageSource 在再次掃描目錄之前清空其佇列。這種預設行為在減少對目錄中大量檔案的掃描時特別有用。但是,在需要自定義排序的情況下,考慮將此標誌設定為 true 的影響很重要。檔案處理的順序可能不像預期的那樣。預設情況下,佇列中的檔案按其自然(path)順序處理。即使佇列中已有檔案,透過掃描新增的新檔案也會插入到適當的位置以保持自然順序。要自定義順序,FileReadingMessageSource 可以接受 Comparator<File> 作為建構函式引數。它由內部(PriorityBlockingQueue)使用,根據業務需求重新排序其內容。因此,要按特定順序處理檔案,您應該向 FileReadingMessageSource 提供一個比較器,而不是對自定義 DirectoryScanner 生成的列表進行排序。
版本 5.0 引入了 RecursiveDirectoryScanner 以執行檔案樹遍歷。該實現基於 Files.walk(Path start, int maxDepth, FileVisitOption… options) 功能。根目錄(DirectoryScanner.listFiles(File))引數從結果中排除。所有其他子目錄的包含和排除都基於目標 FileListFilter 實現。例如,SimplePatternFileListFilter 預設情況下會過濾掉目錄。有關詳細資訊,請參見 AbstractDirectoryAwareFileListFilter 及其實現。
從版本 5.5 開始,Java DSL 的 FileInboundChannelAdapterSpec 有一個方便的 recursive(boolean) 選項,可以在目標 FileReadingMessageSource 中使用 RecursiveDirectoryScanner 而不是預設的掃描器。 |
從版本 7.0 開始,FileReadingMessageSource 可以使用 SpEL 表示式配置其 directory 屬性。每次請求新掃描時都會評估此表示式。這種邏輯在輸入目錄在掃描完成後應更改或基於時間戳輪換的場景中可能很有用。
名稱空間支援
透過使用檔案特定的名稱空間,可以簡化檔案讀取的配置。為此,請使用以下模板
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此名稱空間中,您可以將 FileReadingMessageSource 簡化並將其封裝在入站通道介面卡中,如下所示
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一個通道介面卡示例依賴於預設的 FileListFilter 實現
-
IgnoreHiddenFileListFilter(不處理隱藏檔案) -
AcceptOnceFileListFilter(防止重複)
因此,您也可以省略 prevent-duplicates 和 ignore-hidden 屬性,因為它們預設為 true。
|
Spring Integration 4.2 引入了 |
第二個通道介面卡示例使用自定義過濾器,第三個使用 filename-pattern 屬性新增基於 AntPathMatcher 的過濾器,第四個使用 filename-regex 屬性向 FileReadingMessageSource 新增基於正則表示式模式的過濾器。filename-pattern 和 filename-regex 屬性與常規的 filter 引用屬性互斥。但是,您可以使用 filter 屬性引用 CompositeFileListFilter 例項,該例項結合了任意數量的過濾器,包括一個或多個基於模式的過濾器,以滿足您的特定需求。
當多個程序從同一個目錄讀取時,您可能需要鎖定檔案以防止它們併發地被拾取。為此,您可以使用 FileLocker。有一個基於 java.nio 的實現可用,但也可以實現自己的鎖定方案。nio 鎖定器可以按如下方式注入
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定義鎖定器
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
當檔案入站介面卡配置了鎖定器時,它負責在檔案被允許接收之前獲取鎖定。它不承擔解鎖檔案的責任。如果您已經處理了檔案並讓鎖定掛起,則會出現記憶體洩漏。如果這是一個問題,您應該在適當的時候自己呼叫 FileLocker.unlock(File file)。 |
當過濾和鎖定檔案不足以滿足需求時,您可能需要完全控制檔案列表的方式。要實現此類需求,您可以使用 DirectoryScanner 的實現。此掃描器允許您精確確定每次輪詢中列出的檔案。這也是 Spring Integration 內部用於將 FileListFilter 例項和 FileLocker 連線到 FileReadingMessageSource 的介面。您可以將自定義 DirectoryScanner 注入到 <int-file:inbound-channel-adapter/> 的 scanner 屬性上,如下例所示
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
這樣做讓您可以完全自由地選擇排序、列表和鎖定策略。
同樣重要的是要理解,過濾器(包括 patterns、regex、prevent-duplicates 等)和 locker 例項實際上是由 scanner 使用的。介面卡上設定的任何這些屬性隨後都會注入到內部 scanner 中。對於外部 scanner 的情況,FileReadingMessageSource 上禁止使用所有過濾器和鎖定器屬性。它們必須在自定義 DirectoryScanner 上指定(如果需要)。換句話說,如果您將 scanner 注入到 FileReadingMessageSource 中,您應該在該 scanner 上提供 filter 和 locker,而不是在 FileReadingMessageSource 上。
預設情況下,DefaultDirectoryScanner 使用 IgnoreHiddenFileListFilter 和 AcceptOnceFileListFilter。要阻止它們的使用,您可以配置自己的過濾器(例如 AcceptAllFileListFilter),甚至將其設定為 null。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner 在新檔案新增到目錄時依賴檔案系統事件。在初始化期間,目錄被註冊以生成事件。初始檔案列表也在初始化期間構建。在遍歷目錄樹時,遇到的任何子目錄也會被註冊以生成事件。在第一次輪詢中,返回遍歷目錄的初始檔案列表。在後續輪詢中,返回來自新建立事件的檔案。如果添加了新的子目錄,其建立事件將用於遍歷新的子樹以查詢現有檔案並註冊任何新發現的子目錄。
當程式的內部事件 queue 沒有像目錄修改事件發生那樣快速地被排出時,WatchKey 會出現一個問題。如果佇列大小超過,會發出 StandardWatchEventKinds.OVERFLOW 以指示可能丟失了一些檔案系統事件。在這種情況下,根目錄會被完全重新掃描。為了避免重複,請考慮使用適當的 FileListFilter(例如 AcceptOnceFileListFilter)或在處理完成後刪除檔案。 |
可以透過 FileReadingMessageSource.use-watch-service 選項啟用 WatchServiceDirectoryScanner,該選項與 scanner 選項互斥。將為提供的 directory 填充一個內部 FileReadingMessageSource.WatchServiceDirectoryScanner 例項。
此外,現在 WatchService 輪詢邏輯可以跟蹤 StandardWatchEventKinds.ENTRY_MODIFY 和 StandardWatchEventKinds.ENTRY_DELETE。
如果您需要跟蹤現有檔案以及新檔案的修改,則應在 FileListFilter 中實現 ENTRY_MODIFY 事件邏輯。否則,來自這些事件的檔案將以相同的方式處理。
ResettableFileListFilter 實現會捕獲 ENTRY_DELETE 事件。因此,它們的檔案將用於 remove() 操作。當啟用此事件時,諸如 AcceptOnceFileListFilter 等過濾器將刪除檔案。因此,如果出現同名檔案,它會透過過濾器並作為訊息傳送。
為此,引入了 watch-events 屬性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents))。(WatchEventType 是 FileReadingMessageSource 中的一個公共內部列舉。) 透過此選項,我們可以對新檔案使用一個下游流邏輯,對修改過的檔案使用另一個邏輯。以下示例演示如何配置不同的邏輯以在同一目錄中建立和修改事件
值得一提的是,ENTRY_DELETE 事件與被監視目錄的子目錄的重新命名操作有關。更具體地說,與舊目錄名相關的 ENTRY_DELETE 事件先於通知新(重新命名)目錄的 ENTRY_CREATE 事件。在某些作業系統(如 Windows)上,必須註冊 ENTRY_DELETE 事件才能處理這種情況。否則,在檔案資源管理器中重新命名被監視的子目錄可能會導致在該子目錄中檢測不到新檔案。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
從版本 6.1 開始,FileReadingMessageSource 暴露了兩個新的 WatchService 相關選項
-
watchMaxDepth-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)API 的引數; -
watchDirPredicate- 一個Predicate<Path>,用於測試掃描樹中的目錄是否應被遍歷並註冊到WatchService和配置的監視事件型別。
限制記憶體消耗
您可以使用 HeadDirectoryScanner 限制記憶體中保留的檔案數量。這在掃描大型目錄時非常有用。透過 XML 配置,可以透過在入站通道介面卡上設定 queue-size 屬性來啟用此功能。
在版本 4.2 之前,此設定與任何其他過濾器的使用不相容。任何其他過濾器(包括 prevent-duplicates="true")都會覆蓋用於限制大小的過濾器。
|
使用 通常,在這種情況下,您應該刪除已處理的檔案,而不是使用 |
使用 Java 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置配置出站介面卡的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 進行配置
以下 Spring Boot 應用程式演示瞭如何使用 Java DSL 配置出站介面卡
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
“跟蹤”檔案
另一個流行的用例是從檔案的末尾(或尾部)獲取“行”,並在新增新行時捕獲它們。提供了兩種實現。第一種是 OSDelegatingFileTailingMessageProducer,它使用本機 tail 命令(在具有該命令的作業系統上)。這通常是這些平臺上最有效的實現。對於沒有 tail 命令的作業系統,第二種實現 ApacheCommonsFileTailingMessageProducer 使用 Apache commons-io 的 Tailer 類。
在這兩種情況下,檔案系統事件,例如檔案不可用和其他事件,都透過正常的 Spring 事件釋出機制釋出為 ApplicationEvent 例項。此類事件的示例包括以下內容
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在檔案輪換時,可能會發生前面示例中所示的事件序列。
從版本 5.0 開始,當檔案在 idleEventInterval 期間沒有資料時,會發出 FileTailingIdleEvent。以下示例展示了此類事件的樣子
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
並非所有支援 tail 命令的平臺都提供這些狀態訊息。 |
從這些端點發出的訊息具有以下標頭
-
FileHeaders.ORIGINAL_FILE:File物件 -
FileHeaders.FILENAME:檔名(File.getName())
在 5.0 版本之前,FileHeaders.FILENAME 標頭包含檔案絕對路徑的字串表示。您現在可以透過呼叫原始檔案標頭上的 getAbsolutePath() 來獲取該字串表示。 |
以下示例建立了一個具有預設選項('-F -n 0',表示從當前末尾跟蹤檔名)的本機介面卡。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例建立了一個帶有 '-F -n +0' 選項(表示跟蹤檔名,發出所有現有行)的本機介面卡。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail 命令失敗(在某些平臺上,檔案缺失會導致 tail 失敗,即使指定了 -F),該命令將每 10 秒重試一次。
預設情況下,本機介面卡從標準輸出捕獲內容並將其作為訊息傳送。它們還從標準錯誤捕獲內容以引發事件。從版本 4.3.6 開始,您可以透過將 enable-status-reader 設定為 false 來丟棄標準錯誤事件,如下例所示
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval 設定為 5000,這意味著,如果五秒鐘內沒有寫入行,則每五秒觸發一次 FileTailingIdleEvent
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
這在需要停止介面卡時很有用。
以下示例建立了一個 Apache commons-io Tailer 介面卡,該介面卡每兩秒檢查一次檔案中的新行,並每十秒檢查一次缺失檔案的存在
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
| 1 | 檔案從開頭(end="false")而不是結尾(這是預設設定)進行跟蹤。 |
| 2 | 檔案為每個塊重新開啟(預設是保持檔案開啟)。 |
指定 delay、end 或 reopen 屬性會強制使用 Apache commons-io 介面卡,並使 native-options 屬性不可用。 |