FTP入站通道介面卡
FTP入站通道介面卡是一種特殊的監聽器,它連線到FTP伺服器並監聽遠端目錄事件(例如,建立了新檔案),並在此時啟動檔案傳輸。以下示例展示瞭如何配置一個inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如上所示配置,您可以使用inbound-channel-adapter
元素配置FTP入站通道介面卡,同時提供各種屬性的值,例如local-directory
、filename-pattern
(基於簡單的模式匹配,而非正則表示式),以及對session-factory
的引用。
預設情況下,傳輸的檔案與原始檔案同名。如果您想覆蓋此行為,可以設定local-filename-generator-expression
屬性,該屬性允許您提供一個SpEL表示式來生成本地檔案的名稱。與出站閘道器和介面卡不同,後者SpEL評估上下文的根物件是一個Message
,此入站介面卡在評估時還沒有訊息,因為最終它會使用傳輸的檔案作為payload生成訊息。因此,SpEL評估上下文的根物件是遠端檔案的原始名稱(一個String
)。
入站通道介面卡首先檢索本地目錄的File
物件,然後根據輪詢器配置發射每個檔案。從版本5.0開始,您現在可以限制在需要檢索新檔案時從FTP伺服器獲取的檔案數量。當目標檔案非常大或在帶有持久化檔案列表過濾器(稍後討論)的集群系統中執行時,這可能非常有益。為此目的使用max-fetch-size
。負值(預設值)表示無限制,將檢索所有匹配的檔案。更多資訊請參閱入站通道介面卡:控制遠端檔案獲取。自版本5.0以來,您還可以透過設定scanner
屬性為inbound-channel-adapter
提供自定義的DirectoryScanner
實現。
從Spring Integration 3.0開始,您可以指定preserve-timestamp
屬性(預設為false
)。當設定為true
時,本地檔案的修改時間戳會被設定為從伺服器獲取的值。否則,它會被設定為當前時間。
從版本4.2開始,您可以指定remote-directory-expression
代替remote-directory
,允許您在每次輪詢時動態確定目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"
。
從版本4.3開始,您可以省略remote-directory
和remote-directory-expression
屬性。它們預設為null
。在這種情況下,根據FTP協議,客戶端工作目錄將用作預設的遠端目錄。
有時,基於filename-pattern
屬性指定的簡單模式的檔案過濾可能不夠。在這種情況下,您可以使用filename-regex
屬性來指定正則表示式(例如filename-regex=".*\.test$"
)。此外,如果您需要完全控制,可以使用filter
屬性並提供對o.s.i.file.filters.FileListFilter
的任何自定義實現的引用,這是一個用於過濾檔案列表的策略介面。此過濾器確定要檢索哪些遠端檔案。您還可以透過使用CompositeFileListFilter
將基於模式的過濾器與其他過濾器(例如AcceptOnceFileListFilter
,以避免同步以前已獲取的檔案)組合使用。
AcceptOnceFileListFilter
將其狀態儲存在記憶體中。如果您希望狀態在系統重啟後仍然存在,請考慮使用FtpPersistentAcceptOnceFileListFilter
。此過濾器將接受的檔名儲存在MetadataStore
策略的例項中(參見元資料儲存)。此過濾器根據檔名和遠端修改時間進行匹配。
自版本4.0起,此過濾器需要ConcurrentMetadataStore
。當與共享資料儲存(例如帶有RedisMetadataStore
的Redis)一起使用時,它允許在多個應用程式或伺服器例項之間共享過濾器鍵。
從版本5.0開始,帶有記憶體中SimpleMetadataStore
的FtpPersistentAcceptOnceFileListFilter
預設應用於FtpInboundFileSynchronizer
。在XML配置中,此過濾器也適用於regex
或pattern
選項,以及Java DSL中的FtpInboundChannelAdapterSpec
。任何其他用例可以透過CompositeFileListFilter
(或ChainFileListFilter
)進行管理。
前面的討論是指在檢索檔案之前進行過濾。一旦檔案被檢索到,系統會向檔案系統上的檔案應用一個額外的過濾器。預設情況下,這是一個AcceptOnceFileListFilter
,如前所述,它在記憶體中保留狀態,並且不考慮檔案的修改時間。除非您的應用程式在處理後刪除檔案,否則介面卡在應用程式重啟後會預設重新處理磁碟上的檔案。
此外,如果您將過濾器配置為使用FtpPersistentAcceptOnceFileListFilter
,並且遠端檔案的時間戳發生變化(導致它被重新獲取),預設的本地過濾器將不允許處理這個新檔案。
有關此過濾器及其使用方法的更多資訊,請參閱遠端持久化檔案列表過濾器。
您可以使用local-filter
屬性配置本地檔案系統過濾器的行為。從版本4.3.8開始,預設配置了FileSystemPersistentAcceptOnceFileListFilter
。此過濾器將接受的檔名和修改時間戳儲存在MetadataStore
策略的例項中(參見元資料儲存),並檢測本地檔案修改時間的變化。預設的MetadataStore
是SimpleMetadataStore
,它將狀態儲存在記憶體中。
自版本4.1.5以來,這些過濾器有一個新屬性(flushOnUpdate
),該屬性使得它們在每次更新時重新整理元資料儲存(如果儲存實現了Flushable
)。
此外,如果您使用分散式MetadataStore (例如Redis),您可以擁有同一介面卡或應用程式的多個例項,並確保每個檔案只處理一次。 |
實際的本地過濾器是一個CompositeFileListFilter
,它包含提供的過濾器和一個模式過濾器,後者用於防止處理正在下載的檔案(基於temporary-file-suffix
)。檔案會以這個字尾(預設為.writing
)下載,並在傳輸完成後重新命名為最終名稱,使其對過濾器“可見”。
remote-file-separator
屬性允許您配置檔案分隔符,以便在預設的'/'不適用於您的特定環境時使用。
有關這些屬性的更多詳細資訊,請參閱schema。
您還應該瞭解,FTP入站通道介面卡是一個輪詢消費者。因此,您必須配置一個輪詢器(透過使用全域性預設配置或本地子元素)。檔案傳輸完成後,將生成一個以java.io.File
作為payload的訊息,併發送到由channel
屬性標識的通道。
從版本6.2開始,您可以使用FtpLastModifiedFileListFilter
基於最後修改策略過濾FTP檔案。可以為該過濾器配置一個age
屬性,以便只有比該值舊的檔案才能透過過濾器。預設的age
為60秒,但您應該選擇一個足夠大的值,以避免過早地獲取檔案(例如,由於網路故障)。請查閱其Javadoc瞭解更多資訊。
更多關於檔案過濾和不完整檔案
有時剛出現在受監控的(遠端)目錄中的檔案是不完整的。通常,這樣的檔案會以臨時副檔名(例如somefile.txt.writing
)寫入,然後在寫入過程完成後重新命名。在大多數情況下,您只對完整的檔案感興趣,並希望只過濾完整的檔案。為了處理這些場景,您可以使用filename-pattern
、filename-regex
和filter
屬性提供的過濾支援。以下示例使用了自定義過濾器實現
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站FTP介面卡的輪詢器配置說明
入站FTP介面卡的任務包括兩個
-
與遠端伺服器通訊,以便將檔案從遠端目錄傳輸到本地目錄。
-
對於每個傳輸的檔案,生成一個以該檔案為payload的訊息,並將其傳送到由“channel”屬性標識的通道。這就是為什麼它們被稱為“通道介面卡”,而不是僅僅是“介面卡”。這種介面卡的主要工作是生成要傳送到訊息通道的訊息。本質上,第二個任務優先執行,如果您的本地目錄中已經有一個或多個檔案,它會首先從這些檔案生成訊息。只有在所有本地檔案都被處理完畢後,它才會啟動遠端通訊來檢索更多檔案。
此外,在配置輪詢器上的觸發器時,您應該密切注意max-messages-per-poll
屬性。對於所有SourcePollingChannelAdapter
例項(包括FTP),其預設值為1
。這意味著,只要處理完一個檔案,它就會等待由您的觸發器配置確定的下一次執行時間。如果您在local-directory
中有意存放了一個或多個檔案,它會在啟動與遠端FTP伺服器的通訊之前處理這些檔案。此外,如果max-messages-per-poll
設定為1
(預設值),它會按照觸發器定義的間隔一次只處理一個檔案,本質上工作方式是“一次輪詢 === 一個檔案”。
對於典型的檔案傳輸用例,您最有可能想要相反的行為:在每次輪詢時處理儘可能多的檔案,然後才等待下一次輪詢。如果是這種情況,請將max-messages-per-poll
設定為-1
。然後,在每次輪詢時,介面卡會嘗試生成儘可能多的訊息。換句話說,它會處理本地目錄中的所有內容,然後連線到遠端目錄,將那裡所有可供本地處理的檔案都傳輸過來。只有這樣,輪詢操作才算完成,並且輪詢器才會等待下一次執行時間。
或者,您可以將max-messages-per-poll
的值設定為一個正數,表示每次輪詢從檔案建立的訊息數量的上限。例如,值為10
表示在每次輪詢時,它嘗試處理的檔案數量不超過十個。
從故障中恢復
理解介面卡的架構非常重要。有一個檔案同步器負責獲取檔案,還有一個FileReadingMessageSource
負責為每個同步的檔案傳送訊息。如前所述,涉及到兩個過濾器。filter
屬性(和模式)指代遠端(FTP)檔案列表,以避免獲取已經獲取過的檔案。local-filter
由FileReadingMessageSource
使用,以確定哪些檔案將被作為訊息傳送。
同步器列出遠端檔案並諮詢其過濾器。然後檔案被傳輸。如果在檔案傳輸過程中發生IO錯誤,任何已經新增到過濾器的檔案都會被移除,以便在下一次輪詢時可以重新獲取。這隻適用於過濾器實現了ReversibleFileListFilter
(例如AcceptOnceFileListFilter
)的情況。
如果在同步檔案後,下游流程處理檔案時發生錯誤,過濾器不會自動回滾,因此預設情況下失敗的檔案不會被重新處理。
如果您希望在發生故障後重新處理這些檔案,可以使用類似於以下的配置來方便地從過濾器中移除失敗的檔案
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置適用於任何ResettableFileListFilter
。
從版本5.0開始,入站通道介面卡可以在本地構建與生成的本地檔名對應的子目錄。這也可以是遠端子路徑。為了能夠根據層次結構支援遞迴讀取本地目錄的修改,您現在可以為內部的FileReadingMessageSource
提供一個新的基於Files.walk()
演算法的RecursiveDirectoryScanner
。更多資訊請參閱AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您現在可以使用setUseWatchService()
選項將AbstractInboundFileSynchronizingMessageSource
切換到基於WatchService
的DirectoryScanner
。它也配置了所有WatchEventType
例項,以對本地目錄中的任何修改作出反應。前面展示的重新處理示例基於FileReadingMessageSource.WatchServiceDirectoryScanner
的內建功能,即當檔案從本地目錄中刪除(StandardWatchEventKinds.ENTRY_DELETE
)時執行ResettableFileListFilter.remove()
。更多資訊請參閱WatchServiceDirectoryScanner
。
使用Java配置
以下Spring Boot應用程式展示瞭如何使用Java配置配置入站介面卡
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用Java DSL配置
以下Spring Boot應用程式展示瞭如何使用Java DSL配置入站介面卡
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}