SFTP 流式入站通道介面卡
4.3 版本引入了流式入站通道介面卡。此介面卡生成負載型別為 InputStream
的訊息,允許您在不寫入本地檔案系統的情況下抓取檔案。由於 session 保持開啟狀態,因此使用方應用程式負責在檔案消費後關閉 session。session 會在 closeableResource
頭部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。標準框架元件,如 FileSplitter
和 StreamTransformer
,會自動關閉 session。有關這些元件的更多資訊,請參見檔案分割器和流轉換器。以下示例展示瞭如何配置 SFTP 流式入站通道介面卡。
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用 filename-pattern
、filename-regex
、filter
或 filter-expression
中的一個。
從 5.0 版本開始,SftpStreamingMessageSource 介面卡預設使用基於記憶體 SimpleMetadataStore 的 SftpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也與檔名模式(或正則表示式)一起應用。如果需要允許重複,可以使用 AcceptAllFileListFilter 。您可以使用 CompositeFileListFilter (或 ChainFileListFilter )來處理任何其他用例。稍後展示的 Java 配置展示了一種在處理後刪除遠端檔案以避免重複的技術。 |
有關 SftpPersistentAcceptOnceFileListFilter
的更多資訊及其使用方法,請參見遠端持久化檔案列表過濾器。
您可以使用 max-fetch-size
屬性來限制每次輪詢時需要抓取的檔案數量。在叢集環境中執行時,將其設定為 1
並使用持久化過濾器。有關更多資訊,請參見入站通道介面卡:控制遠端檔案抓取。
介面卡會將遠端目錄和檔名分別放入頭部(FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
)。從 5.0 版本開始,FileHeaders.REMOTE_FILE_INFO
頭部提供了額外的遠端檔案資訊(JSON 格式)。如果您將 SftpStreamingMessageSource
上的 fileInfoJson
屬性設定為 false
,則頭部包含一個 SftpFileInfo
物件。您可以透過使用 SftpFileInfo.getFileInfo()
方法訪問底層 SftpClient
提供的 SftpClient.DirEntry
物件。在使用 XML 配置時,fileInfoJson
屬性不可用,但您可以透過將 SftpStreamingMessageSource
注入到您的配置類中來設定它。另請參見遠端檔案資訊。
使用 Java 配置進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
注意,在此示例中,轉換器下游的訊息處理器有一個 `advice`,用於在處理後刪除遠端檔案。