diff --git a/docs/executor.md b/docs/executor.md
index 8595b4917f..264cf83edd 100644
--- a/docs/executor.md
+++ b/docs/executor.md
@@ -473,3 +473,42 @@ Nextflow does not provide direct support for SLURM multi-clusters. If you need t
 :::{versionadded} 23.07.0-edge
 Some SLURM clusters require memory allocations to be specified with `--mem-per-cpu` instead of `--mem`. You can specify `executor.perCpuMemAllocation = true` in the Nextflow configuration to enable this behavior. Nextflow will automatically compute the memory per CPU for each task (by default 1 CPU is used).
 :::
+
+(tcs-executor)=
+## TCS
+
+The `tcs` executor allows you to run your pipeline script using the [Fujitsu Technical Computing Suite (TCS)](https://software.fujitsu.com/jp/manual/manualindex/p21000155e.html).
+
+Nextflow manages each process as a separate job that is submitted to the cluster using the `pjsub` command.
+
+The pipeline must be launched from a node where the `pjsub` command is available, which is typically the login node.
+
+To enable the TCS executor, set `process.executor = 'tcs'` in the `nextflow.config` file.
+
+Resource requests and other job characteristics can be controlled via the following process directives:
+
+- {ref}`process-clusterOptions`
+- {ref}`process-time`
+
+:::{note}
+Other options, such as the queue (resource group), CPUs, and nodes, should be specified with the `clusterOptions` directive.
+This is because they depend on the target system (the required options differ between systems) and can be provided as `-L` options in the arguments of the `pjsub` command.
+
+For example, this is a `nextflow.config` for the Genkai supercomputer (Kyushu University):
+```
+process {
+    executor = 'tcs'
+    time = '00:30:00'
+    clusterOptions = '-L rscgrp=a-batch -L vnode-core=4'
+}
+```
+
+And this is a `nextflow.config` for the Flow supercomputer (Nagoya University):
+```
+process {
+    executor = 'tcs'
+    time = '00:30:00'
+    clusterOptions = '-L rscunit=cx -L rscgrp=cx-share -L gpu=1'
+}
+```
+:::
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy
index 57a1535b6c..c09ac6cd35 100644
--- a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy
@@ -61,7 +61,8 @@ class ExecutorFactory {
             'nqsii': NqsiiExecutor,
             'moab': MoabExecutor,
             'oar': OarExecutor,
-            'hq': HyperQueueExecutor
+            'hq': HyperQueueExecutor,
+            'tcs': TcsExecutor
     ]
 
     @PackageScope Map<String,Class<? extends Executor>> executorsMap
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/TcsExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/TcsExecutor.groovy
new file mode 100644
index 0000000000..966ed93e5f
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/executor/TcsExecutor.groovy
@@ -0,0 +1,176 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.executor
+
+import java.nio.file.Path
+import java.util.regex.Pattern
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.processor.TaskArrayRun
+import nextflow.processor.TaskRun
+
+/**
+ * Executor for the Fujitsu Technical Computing Suite (TCS) batch scheduler.
+ *
+ * Jobs are submitted with the {@code pjsub} command, monitored with
+ * {@code pjstat} and cancelled with {@code pjdel}.
+ */
+@Slf4j
+@CompileStatic
+class TcsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
+
+    /** Matches the job id in the pjsub response, eg. `[INFO] PJM 0000 pjsub Job 123456 submitted.` */
+    static private Pattern SUBMIT_REGEX = ~/\[INFO\] PJM 0000 pjsub Job (\d+) submitted./
+
+    /**
+     * Sanitise a job name for TCS (eg. on Fugaku) by stripping round brackets,
+     * which are not accepted by `pjsub -N`.
+     *
+     * @param name1 The job name to sanitise
+     * @return The name with all `(` and `)` characters removed
+     */
+    static String modName(String name1){
+        return name1.replaceAll(/[()]/, '')
+    }
+
+    /**
+     * Gets the directives to submit the specified task to the cluster for execution
+     *
+     * @param task A {@link TaskRun} to be submitted
+     * @param result The {@link List} instance to which add the job directives
+     * @return A {@link List} containing all directive tokens and values.
+     */
+    @Override
+    protected List<String> getDirectives( TaskRun task, List<String> result ) {
+        assert result != null
+
+        if( task instanceof TaskArrayRun ) {
+            // submit as a bulk job with one sub-job for each task in the array
+            final arraySize = task.getArraySize()
+            result << '--bulk --sparam' << "0-${arraySize - 1}".toString()
+        }
+
+        result << '-N' << modName(getJobNameFor(task))
+        // a bulk job produces one output file per sub-job, so the parent log is discarded
+        result << '-o' << (task.isArray() ? '/dev/null' : quote(task.workDir.resolve(TaskRun.CMD_LOG)))
+        result << '-j' << ''   // merge stderr into stdout
+        result << '-S' << ''   // NOTE(review): presumably requests job statistics in the output — confirm against the pjsub manual
+
+        // max task duration
+        if( task.config.getTime() ) {
+            final duration = task.config.getTime()
+            result << '-L' << "elapse=${duration.format('HH:mm:ss')}".toString()
+        }
+
+        // -- at the end append the command script wrapped file name
+        addClusterOptionsDirective(task.config, result)
+
+        return result
+    }
+
+    /**
+     * The command line to submit this job
+     *
+     * @param task The {@link TaskRun} instance to submit for execution to the cluster
+     * @param scriptFile The file containing the job launcher script
+     * @return A list representing the submit command line
+     */
+    @Override
+    List<String> getSubmitCommandLine(TaskRun task, Path scriptFile ) {
+        [ 'pjsub', '-N', modName(getJobNameFor(task)), scriptFile.getName() ]
+    }
+
+    /** The token that introduces a scheduler directive in the wrapper script */
+    @Override
+    protected String getHeaderToken() { '#PJM' }
+
+    /**
+     * Parse the string returned by the {@code pjsub} command and extract the job ID string
+     *
+     * @param text The string returned when submitting the job
+     * @return The actual job ID string
+     */
+    @Override
+    def parseJobId( String text ) {
+        for( String line : text.readLines() ) {
+            // the raw submit response is only useful for debugging — do not log it at warn level
+            log.trace "pjsub submit response: $line"
+            def m = SUBMIT_REGEX.matcher(line)
+            if( m.find() ) {
+                return m.group(1).toString()
+            }
+        }
+        throw new IllegalArgumentException("Invalid TCS submit response:\n$text\n\n")
+    }
+
+    /** The command to cancel a pending/running job */
+    @Override
+    protected List<String> getKillCommand() { ['pjdel'] }
+
+    /**
+     * The command used to list the status of the queued/running jobs.
+     * The `queue` argument is ignored since `pjstat` reports all jobs.
+     */
+    @Override
+    protected List<String> queueStatusCommand(Object queue) {
+        // NOTE(review): parseQueueStatus expects `Job Id:`/`job_state =` records —
+        // confirm that plain `pjstat` (without options) actually emits that format
+        return ['pjstat']
+    }
+
+    /** Maps TCS job states to Nextflow queue states */
+    static private Map<String,QueueStatus> DECODE_STATUS = [
+            'ACC': QueueStatus.PENDING,
+            'QUE': QueueStatus.PENDING,
+            'RNA': QueueStatus.PENDING,
+            'RUN': QueueStatus.RUNNING,
+            'RNO': QueueStatus.RUNNING,
+            'EXT': QueueStatus.RUNNING,
+            'CCL': QueueStatus.DONE,
+            'HLD': QueueStatus.HOLD,
+            'ERR': QueueStatus.ERROR
+    ]
+
+    /** Decode a TCS status token, or null when the token is unknown */
+    protected QueueStatus decode(String status) {
+        DECODE_STATUS.get(status)
+    }
+
+    /**
+     * Parse the output of the queue status command into a map of job id to status.
+     *
+     * @param text The status command stdout
+     * @return A map from job id to {@link QueueStatus}
+     */
+    @Override
+    protected Map<String,QueueStatus> parseQueueStatus(String text) {
+
+        final JOB_ID = 'Job Id:'
+        final JOB_STATUS = 'job_state ='
+        final result = new LinkedHashMap<String,QueueStatus>()
+
+        String id = null
+        text.eachLine { String line ->
+            if( line.startsWith(JOB_ID) ) {
+                id = fetchValue(JOB_ID, line)
+                // report the job as UNKNOWN until a status line is found
+                result.put(id, QueueStatus.UNKNOWN)
+            }
+            else if( id && line.contains(JOB_STATUS) ) {
+                // only status lines may update the entry — previously any other line
+                // following the job id reset an already-decoded status to UNKNOWN
+                final status = fetchValue(JOB_STATUS, line)
+                result.put(id, decode(status) ?: QueueStatus.UNKNOWN)
+            }
+        }
+
+        return result
+    }
+
+    /** Return the text following `prefix` in `line`, or null when the prefix does not occur */
+    static String fetchValue( String prefix, String line ) {
+        final p = line.indexOf(prefix)
+        return p!=-1 ? line.substring(p+prefix.size()).trim() : null
+    }
+
+    /** Whether the given text matches a pjsub submit response */
+    static protected boolean matchOptions(String value) {
+        // must return `false` (not null) for a null/empty value: the method returns
+        // a primitive boolean and unboxing null throws a NullPointerException
+        value ? SUBMIT_REGEX.matcher(value).find() : false
+    }
+
+    /** Environment variable holding the bulk sub-job index (TCS sets `PJM_BULKNUM`) */
+    @Override
+    String getArrayIndexName() {
+        return 'PJM_BULKNUM'
+    }
+
+    /** Bulk sub-job numbers start at zero */
+    @Override
+    int getArrayIndexStart() {
+        return 0
+    }
+
+    /**
+     * Compute the job id of an array (bulk) sub-job.
+     *
+     * NOTE(review): SUBMIT_REGEX captures digits only, so the parsed job id never
+     * contains `[]` and this replace is a no-op — confirm the id format of bulk jobs.
+     */
+    @Override
+    String getArrayTaskId(String jobId, int index) {
+        assert jobId, "Missing 'jobId' argument"
+        return jobId.replace('[]', "[$index]")
+    }
+
+}