SFTP 入站通道介面卡
SFTP 入站通道介面卡是一個特殊的監聽器,它連線到伺服器並監聽遠端目錄事件(例如建立新檔案),此時它會啟動檔案傳輸。以下示例展示瞭如何配置 SFTP 入站通道介面卡
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例展示瞭如何為各種屬性提供值,包括以下內容
-
local-directory
:檔案將要傳輸到的位置 -
remote-directory
:檔案將要從中傳輸的遠端源目錄 -
session-factory
:對我們之前配置的 bean 的引用
預設情況下,傳輸的檔案與原始檔案同名。如果要覆蓋此行為,可以設定 local-filename-generator-expression
屬性,它允許您提供一個 SpEL 表示式來生成本地檔案的名稱。與出站閘道器和介面卡不同,在這些情況下 SpEL 評估上下文的根物件是 Message
,而此入站介面卡在評估時還沒有訊息,因為這是它最終用傳輸的檔案作為其負載生成的內容。因此,SpEL 評估上下文的根物件是遠端檔案的原始名稱(一個 String
)。
入站通道介面卡首先將檔案檢索到本地目錄,然後根據輪詢器配置發出每個檔案。從 5.0 版本開始,您可以在需要檢索新檔案時限制從 SFTP 伺服器獲取的檔案數量。當目標檔案較大或在帶有持久化檔案列表過濾器的集群系統中執行時,這會很有益,本節後面會討論此過濾器。為此目的使用 max-fetch-size
。負值(預設值)表示沒有限制,並且會檢索所有匹配的檔案。有關更多資訊,請參閱 入站通道介面卡:控制遠端檔案獲取。自 5.0 版本以來,您還可以透過設定 scanner
屬性向 inbound-channel-adapter
提供自定義的 DirectoryScanner
實現。
從 Spring Integration 3.0 開始,您可以指定 preserve-timestamp
屬性(預設為 false
)。當設定為 true
時,本地檔案的修改時間戳會設定為從伺服器檢索到的值。否則,它會設定為當前時間。
從 4.2 版本開始,您可以指定 remote-directory-expression
代替 remote-directory
,它允許您在每次輪詢時動態確定目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"
。
有時,基於透過 filename-pattern
屬性指定的簡單模式進行的檔案過濾可能不夠用。如果出現這種情況,您可以使用 filename-regex
屬性來指定正則表示式(例如,filename-regex=".*\.test$"
)。如果您需要完全控制,可以使用 filter
屬性提供對 org.springframework.integration.file.filters.FileListFilter
自定義實現的引用,這是一個用於過濾檔案列表的策略介面。此過濾器確定哪些遠端檔案會被檢索。您還可以透過使用 CompositeFileListFilter
將基於模式的過濾器與其他過濾器(例如 AcceptOnceFileListFilter
,以避免同步之前已獲取的檔案)結合使用。
AcceptOnceFileListFilter
將其狀態儲存在記憶體中。如果您希望狀態在系統重啟後仍然存在,請考慮使用 SftpPersistentAcceptOnceFileListFilter
。此過濾器會將接受的檔名儲存在 MetadataStore
策略的例項中(請參閱 元資料儲存)。此過濾器根據檔名和遠端修改時間進行匹配。
自 4.0 版本以來,此過濾器需要一個 ConcurrentMetadataStore
。當與共享資料儲存(例如使用 RedisMetadataStore
的 Redis
)一起使用時,這允許在多個應用程式或伺服器例項之間共享過濾鍵。
從 5.0 版本開始,預設情況下,對於 SftpInboundFileSynchronizer
會應用帶有記憶體中的 SimpleMetadataStore
的 SftpPersistentAcceptOnceFileListFilter
。此過濾器也會與 XML 配置中的 regex
或 pattern
選項以及 Java DSL 中的 SftpInboundChannelAdapterSpec
一起應用。您可以透過使用 CompositeFileListFilter
(或 ChainFileListFilter
)處理任何其他用例。
上面的討論是指在檢索檔案之前對檔案進行過濾。檔案檢索後,會對檔案系統上的檔案應用額外的過濾器。預設情況下,這是一個 AcceptOnceFileListFilter
,如本節所述,它將狀態保留在記憶體中,並且不考慮檔案的修改時間。除非您的應用程式在處理後刪除檔案,否則介面卡在應用程式重啟後預設會重新處理磁碟上的檔案。
此外,如果您將 filter
配置為使用 SftpPersistentAcceptOnceFileListFilter
並且遠端檔案時間戳發生變化(導致其被重新獲取),則預設的本地過濾器不允許處理此新檔案。
有關此過濾器的更多資訊以及如何使用它,請參閱 遠端持久化檔案列表過濾器。
您可以使用 local-filter
屬性來配置本地檔案系統過濾器的行為。從 4.3.8 版本開始,預設配置了一個 FileSystemPersistentAcceptOnceFileListFilter
。此過濾器將接受的檔名和修改時間戳儲存在 MetadataStore
策略的例項中(請參閱 元資料儲存),並檢測本地檔案修改時間的變化。預設的 MetadataStore
是一個 SimpleMetadataStore
,它將狀態儲存在記憶體中。
自 4.1.5 版本以來,這些過濾器新增了一個名為 flushOnUpdate
的屬性,它會導致它們在每次更新時重新整理元資料儲存(如果儲存實現了 Flushable
)。
此外,如果您使用分散式 MetadataStore (例如 Redis 元資料儲存),您可以擁有同一介面卡或應用程式的多個例項,並確保只有一個例項處理檔案。 |
實際的本地過濾器是一個 CompositeFileListFilter
,它包含提供的過濾器和一個模式過濾器,用於阻止處理正在下載中的檔案(基於 temporary-file-suffix
)。檔案會以該字尾(預設為 .writing
)下載,並在傳輸完成後重新命名為最終名稱,使其對過濾器“可見”。
有關這些屬性的更多詳細資訊,請參閱 schema。
SFTP 入站通道介面卡是一個輪詢消費者。因此,您必須配置一個輪詢器(可以是全域性預設的或本地元素)。檔案傳輸到本地目錄後,會生成一個以 java.io.File
作為負載型別的訊息,併發送到由 channel
屬性標識的通道。
從 6.2 版本開始,您可以使用 SftpLastModifiedFileListFilter
基於最後修改策略來過濾 SFTP 檔案。此過濾器可以使用 age
屬性進行配置,以便只有早於此值的檔案才透過過濾器。預設年齡為 60 秒,但您應選擇足夠大的年齡以避免過早獲取檔案(例如由於網路故障)。有關更多資訊,請檢視其 Javadoc。
更多關於檔案過濾和大型檔案的資訊
有時,剛出現在被監控(遠端)目錄中的檔案可能不完整。通常,此類檔案會寫入帶有臨時副檔名(例如檔名為 something.txt.writing
的檔案上的 .writing
),然後在寫入過程完成後重新命名。在大多數情況下,開發人員只對完整的檔案感興趣,並希望僅過濾這些檔案。為了處理這些情況,您可以使用 filename-pattern
、filename-regex
和 filter
屬性提供的過濾支援。如果您需要自定義過濾器實現,可以透過設定 filter
屬性在您的介面卡中包含一個引用。以下示例展示瞭如何實現
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
從故障中恢復
您應該瞭解介面卡的架構。檔案同步器獲取檔案,而 FileReadingMessageSource
為每個同步檔案發出一條訊息。如前所述,涉及兩個過濾器。filter
屬性(和模式)指代遠端 (SFTP) 檔案列表,以避免獲取已經獲取的檔案。FileReadingMessageSource
使用 local-filter
來確定哪些檔案將作為訊息傳送。
同步器列出遠端檔案並諮詢其過濾器。然後傳輸檔案。如果在檔案傳輸過程中發生 IO 錯誤,任何已新增到過濾器的檔案都會被移除,以便它們在下一次輪詢時有資格被重新獲取。這僅適用於過濾器實現了 ReversibleFileListFilter
(例如 AcceptOnceFileListFilter
)的情況。
如果在同步檔案後,下游流處理檔案時發生錯誤,則不會自動回滾過濾器,因此預設情況下失敗的檔案不會被重新處理。
如果您希望在故障後重新處理此類檔案,可以使用類似於以下的配置來方便地從過濾器中移除失敗的檔案
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前面的配置適用於任何 ResettableFileListFilter
。
從 5.0 版本開始,入站通道介面卡可以根據生成的本地檔名在本地構建子目錄。這也可以是遠端子路徑。為了能夠根據層次結構支援遞迴讀取本地目錄進行修改,您現在可以使用基於 Files.walk()
演算法的新 RecursiveDirectoryScanner
為內部的 FileReadingMessageSource
提供支援。有關更多資訊,請參閱 AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您現在可以透過使用 setUseWatchService()
選項將 AbstractInboundFileSynchronizingMessageSource
切換到基於 WatchService
的 DirectoryScanner
。它也配置為所有 WatchEventType
例項對本地目錄中的任何修改做出反應。前面展示的重新處理示例基於 FileReadingMessageSource.WatchServiceDirectoryScanner
的內建功能,該功能在檔案從本地目錄中刪除時(StandardWatchEventKinds.ENTRY_DELETE
)使用 ResettableFileListFilter.remove()
。有關更多資訊,請參閱 WatchServiceDirectoryScanner
。
使用 Java 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java DSL 配置入站介面卡的示例
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}