FTP 流式入站通道介面卡
版本 4.3 引入了流式入站通道介面卡。此介面卡生成的Payload型別為 InputStream
的訊息,允許在不寫入本地檔案系統的情況下獲取檔案。由於會話保持開啟狀態,消費應用程式負責在檔案被消費後關閉會話。會話在 closeableResource
訊息頭 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。標準框架元件,例如 FileSplitter
和 StreamTransformer
,會自動關閉會話。有關這些元件的更多資訊,請參閱 檔案分割器 和 流轉換器。以下示例展示瞭如何配置 inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只能指定 filename-pattern
, filename-regex
, filter
或 filter-expression
中的一個。
從版本 5.0 開始,預設情況下,FtpStreamingMessageSource 介面卡使用基於記憶體 SimpleMetadataStore 的 FtpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也適用於檔名模式(或正則表示式)。如果需要允許重複,可以使用 AcceptAllFileListFilter 。任何其他用例可以透過 CompositeFileListFilter (或 ChainFileListFilter )處理。Java 配置(文件後面部分)展示了一種在處理後刪除遠端檔案以避免重複的技術。 |
有關 FtpPersistentAcceptOnceFileListFilter
及其使用方式的更多資訊,請參閱 遠端持久檔案列表過濾器。
使用 max-fetch-size
屬性來限制每次輪詢時(需要獲取檔案時)獲取的檔案數量。在叢集環境中執行時,將其設定為 1
並使用持久過濾器。有關更多資訊,請參閱 入站通道介面卡:控制遠端檔案獲取。
介面卡分別將遠端目錄和檔名放在 FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
訊息頭中。從版本 5.0 開始,FileHeaders.REMOTE_FILE_INFO
訊息頭提供了額外的遠端檔案資訊(預設以 JSON 格式表示)。如果將 FtpStreamingMessageSource
上的 fileInfoJson
屬性設定為 false
,則訊息頭包含一個 FtpFileInfo
物件。可以透過底層 Apache Net 庫提供的 FTPFile
物件,使用 FtpFileInfo.getFileInfo()
方法進行訪問。在使用 XML 配置時,fileInfoJson
屬性不可用,但可以透過將 FtpStreamingMessageSource
注入到配置類中來設定它。另請參閱 遠端檔案資訊。
從版本 5.1 開始,comparator
的泛型型別是 FTPFile
。之前是 AbstractFileInfo<FTPFile>
。這是因為排序現在在處理過程中的更早階段執行,即在過濾和應用 maxFetch
之前。
使用 Java 配置進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置來配置入站介面卡的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
注意,在此示例中,轉換器下游的訊息處理器包含一個 advice
,用於在處理後刪除遠端檔案。