Skip to content

Commit e1ff1a9

Browse files
committed
extend observer with onFileStage
Signed-off-by: Robrecht Cannoodt <[email protected]>
1 parent 36153c6 commit e1ff1a9

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,6 +1149,18 @@ class Session implements ISession {
11491149
}
11501150
}
11511151

1152+
void notifyFileStage(Path destination, Path source=null) {
1153+
def copy = new ArrayList<TraceObserver>(observers)
1154+
for( TraceObserver observer : copy ) {
1155+
try {
1156+
observer.onFileStage(destination, source)
1157+
}
1158+
catch( Exception e ) {
1159+
log.error "Failed to invoke observer on file stage: $observer", e
1160+
}
1161+
}
1162+
}
1163+
11521164
void notifyFlowComplete() {
11531165
def copy = new ArrayList<TraceObserver>(observers)
11541166
for( TraceObserver observer : copy ) {

modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class FilePorter {
105105
}
106106

107107
protected FileTransfer createFileTransfer(Path source, Path target) {
108-
return new FileTransfer(source, target, maxRetries, semaphore)
108+
return new FileTransfer(source, target, maxRetries, semaphore, session)
109109
}
110110

111111
protected FileTransfer getOrSubmit(FileCopy copy) {
@@ -281,15 +281,17 @@ class FilePorter {
281281
volatile Future result
282282
private String message
283283
private int debugDelay
284+
final private Session session
284285

285-
FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore) {
286+
FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore, Session session) {
286287
this.semaphore = semaphore
287288
this.source = foreignPath
288289
this.target = stagePath
289290
this.maxRetries = maxRetries
290291
this.message = "Staging foreign file: ${source.toUriString()}"
291292
this.refCount = new AtomicInteger(0)
292293
this.debugDelay = System.getProperty('filePorter.debugDelay') as Integer ?: 0
294+
this.session = session
293295
}
294296

295297
@Override
@@ -325,7 +327,9 @@ class FilePorter {
325327
int count = 0
326328
while( true ) {
327329
try {
328-
return stageForeignFile0(filePath, stagePath)
330+
def output = stageForeignFile0(filePath, stagePath)
331+
this.session.notifyFileStage(stagePath, filePath)
332+
return output
329333
}
330334
catch( IOException e ) {
331335
// remove the target file that could be have partially downloaded

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,15 @@ trait TraceObserver {
150150
void onFilePublish(Path destination, Path source){
151151
onFilePublish(destination)
152152
}
153+
154+
/**
155+
* Method that is invoke when an output file is staged
156+
* into the work directory.
157+
*
158+
* @param destination
159+
* The destination path at staging folder.
160+
* @param source
161+
* The source remote source path.
162+
*/
163+
void onFileStage(Path destination, Path source){}
153164
}

0 commit comments

Comments
 (0)