Skip to content

Commit d30c74e

Browse files
committed
add new executor for Fujitsu Technical Computing Suite (TCS)
1 parent 579873b commit d30c74e

File tree

2 files changed

+206
-1
lines changed

2 files changed

+206
-1
lines changed

modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ class ExecutorFactory {
6161
'nqsii': NqsiiExecutor,
6262
'moab': MoabExecutor,
6363
'oar': OarExecutor,
64-
'hq': HyperQueueExecutor
64+
'hq': HyperQueueExecutor,
65+
'tcs': TcsExecutor
6566
]
6667

6768
@PackageScope Map<String, Class<? extends Executor>> executorsMap
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
/*
15+
Implemented by Satoshi Ohshima <[email protected]>
16+
Based on the Groovy implementations of the other grid executors.
17+
Resource requests and other job characteristics can be controlled via the following process directives:
18+
- time
19+
- clusterOptions
20+
The queue (resource group), CPU count, node count, and other options must be specified via clusterOptions.
21+
This is because the required options differ between target systems, and all of them can be passed as "-L" arguments to the pjsub command.
22+
23+
example of nextflow.config on Genkai (Kyushu University)
24+
process {
25+
executor = 'tcs'
26+
time = '00:30:00'
27+
clusterOptions = '-L rscgrp=a-batch -L vnode-core=4'
28+
}
29+
30+
example of nextflow.config on Flow (Nagoya University)
31+
process {
32+
executor = 'tcs'
33+
time = '00:30:00'
34+
clusterOptions = '-L rscunit=cx -L rscgrp=cx-share -L gpu=1'
35+
}
36+
37+
*/
38+
39+
package nextflow.executor
40+
41+
import java.nio.file.Path
42+
import java.util.regex.Pattern
43+
44+
import groovy.transform.CompileStatic
45+
import groovy.util.logging.Slf4j
46+
import nextflow.processor.TaskArrayRun
47+
import nextflow.processor.TaskRun
48+
/*
49+
* Implements an executor for Fujitsu TCS
50+
*
51+
* See https://software.fujitsu.com/jp/manual/manualindex/p21000154.html (in Japanese)
52+
*/
53+
@Slf4j
54+
@CompileStatic
55+
class TcsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
56+
57+
// Matches the confirmation line of a job submission; capture group 1 is the numeric job id.
// NOTE(review): the trailing '.' is unescaped, so it matches any single character.
static private Pattern SUBMIT_REGEX = ~/\[INFO\] PJM 0000 pjsub Job (\d+) submitted./
58+
/**
 * Adapts a job name for TCS on Fugaku by removing every round bracket from it.
 *
 * @param name1 The job name to clean up
 * @return The job name without any {@code (} or {@code )} character
 */
static String modName(String name1){
    // a single character class strips both kinds of bracket in one pass
    return name1.replaceAll('[()]', '')
}
64+
65+
/**
 * Collects the scheduler directives used to submit the given task.
 *
 * The list receives, in order: the bulk range (array tasks only), the job name,
 * the output file, the value-less {@code -j} and {@code -S} flags, the elapse
 * limit (only when a {@code time} is configured) and finally whatever
 * {@code addClusterOptionsDirective} contributes.
 *
 * @param task A {@link TaskRun} to be submitted
 * @param result The {@link List} instance to which the directives are appended
 * @return The same {@code result} list, holding all directive tokens and values
 */
protected List<String> getDirectives( TaskRun task, List<String> result ) {
    assert result != null

    // an array task is submitted as a bulk job spanning indexes 0..size-1
    if( task instanceof TaskArrayRun ) {
        final lastIndex = task.getArraySize() - 1
        result << '--bulk --sparam' << "0-${lastIndex}".toString()
    }

    final jobName = modName(getJobNameFor(task))
    result << '-N' << jobName

    // array tasks discard the scheduler output, single tasks write it to the task log file
    final logTarget = task.isArray() ? '/dev/null' : quote(task.workDir.resolve(TaskRun.CMD_LOG))
    result << '-o' << logTarget
    result << '-j' << ''
    result << '-S' << ''

    // max task duration
    if( task.config.getTime() ) {
        final elapse = task.config.getTime().format('HH:mm:ss')
        result << "-L" << "elapse=${elapse}".toString()
    }

    // -- the user-provided cluster options go last
    addClusterOptionsDirective(task.config, result)

    return result
}
97+
98+
/**
 * Builds the {@code pjsub} command line that submits the job.
 *
 * @param task The {@link TaskRun} instance to submit for execution to the cluster
 * @param scriptFile The file containing the job launcher script
 * @return The command tokens: {@code pjsub -N <job name> <script file name>}
 */
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile ) {
    // the job name goes through the same bracket clean-up used for the '-N' directive
    final jobName = modName(getJobNameFor(task))
    return [ 'pjsub', '-N', jobName, scriptFile.getName() ]
}
108+
109+
/**
 * @return The token that introduces a scheduler directive line, i.e. {@code #PJM}
 */
protected String getHeaderToken() {
    return '#PJM'
}
110+
111+
/**
 * Parse the string returned by the {@code pjsub} command and extract the job ID string
 *
 * @param text The string returned when submitting the job
 * @return The actual job ID string, i.e. the digits captured by {@code SUBMIT_REGEX}
 * @throws IllegalArgumentException when no line of {@code text} matches {@code SUBMIT_REGEX},
 *      including the case of a {@code null} or empty response
 */
@Override
def parseJobId( String text ) {
    // a null/empty response falls through to the same error as an unrecognised one
    if( text ) {
        for( String line : text.readLines() ) {
            // submit output is ordinary information, so it is traced at debug level, not warned
            log.debug("TCS submit output: {}", line)
            final m = SUBMIT_REGEX.matcher(line)
            if( m.find() ) {
                return m.group(1).toString()
            }
        }
    }
    throw new IllegalArgumentException("Invalid TCS submit response:\n$text\n\n")
}
128+
129+
/**
 * @return The command used to cancel a job, i.e. {@code pjdel}
 */
@Override
protected List<String> getKillCommand() {
    return ['pjdel']
}
131+
132+
/**
 * Provides the command line used to query the status of the submitted jobs.
 *
 * @param queue The queue name; it is not used, the command is the same for any queue
 * @return The single-token command {@code pjstat}
 */
@Override
protected List<String> queueStatusCommand(Object queue) {
    return ['pjstat']
}
140+
141+
/**
 * Translates the job state codes reported by the scheduler into a {@link QueueStatus}.
 * A code that is not listed here yields {@code null} on lookup.
 */
static private final Map<String,QueueStatus> DECODE_STATUS = [
    'ACC': QueueStatus.PENDING,   // submission accepted
    'QUE': QueueStatus.PENDING,   // waiting to be executed
    'RNA': QueueStatus.PENDING,   // acquiring the resources for the execution
    'RUN': QueueStatus.RUNNING,   // executing
    'RNO': QueueStatus.RUNNING,   // waiting for the termination processing to complete
    'EXT': QueueStatus.DONE,      // execution has ended, so the job is no longer running
    'CCL': QueueStatus.DONE,      // cancelled
    'HLD': QueueStatus.HOLD,      // held
    'ERR': QueueStatus.ERROR      // held because of an error
]
152+
153+
/**
 * Looks up the {@link QueueStatus} registered in {@code DECODE_STATUS} for a state code.
 *
 * @param status The state code to translate
 * @return The matching status, or {@code null} when the code is not in the table
 */
protected QueueStatus decode(String status) {
    return DECODE_STATUS.get(status)
}
156+
157+
/**
 * Parses the output of the queue status command into a map of job id to status.
 *
 * Two layouts are recognised:
 * <ul>
 * <li>a {@code Job Id:} line followed by a {@code job_state =} line (the layout this
 *     parser originally handled); the job is reported as {@code UNKNOWN} until its
 *     state line is found;</li>
 * <li>a tabular row whose first column is the job id (digits, optionally followed by
 *     {@code [n]}) and that holds a known state code in one of the following columns.</li>
 * </ul>
 * Any other line (headers, summaries, blank lines) is ignored, so no entry is ever
 * created without a job id.
 *
 * NOTE(review): the tabular layout assumes {@code pjstat} prints one row per job that
 * starts with the job id and carries the state code as a separate column - confirm
 * against the output of the target system.
 *
 * @param text The text printed by the queue status command; may be {@code null} or empty
 * @return The status of each job found, in order of appearance; empty when nothing is recognised
 */
@Override
protected Map<String, QueueStatus> parseQueueStatus(String text) {

    final JOB_ID = 'Job Id:'
    final JOB_STATUS = 'job_state ='
    final result = new LinkedHashMap<String, QueueStatus>()
    if( !text )
        return result

    String id = null
    text.eachLine { String line ->
        if( line.startsWith(JOB_ID) ) {
            id = fetchValue(JOB_ID, line)
            // register the job right away, without inheriting the state of the previous one
            if( id )
                result.put(id, QueueStatus.UNKNOWN)
            return
        }
        if( id && line.contains(JOB_STATUS) ) {
            result.put(id, decode(fetchValue(JOB_STATUS, line)) ?: QueueStatus.UNKNOWN)
            return
        }

        // tabular row: the job id comes first, the state is the first column holding a known code
        final cols = line.trim().split(/\s+/)
        if( cols.length < 2 || !(cols[0] ==~ /\d+(\[\d+\])?/) )
            return
        for( int i=1; i<cols.length; i++ ) {
            final state = decode(cols[i])
            if( state != null ) {
                result.put(cols[0], state)
                break
            }
        }
    }

    return result
}
178+
179+
/**
 * Extracts the text that follows a prefix in a line.
 *
 * @param prefix The marker to look for
 * @param line The line to search
 * @return The trimmed remainder of the line after the first occurrence of {@code prefix},
 *      or {@code null} when the line does not contain it
 */
static String fetchValue( String prefix, String line ) {
    final int start = line.indexOf(prefix)
    if( start == -1 )
        return null
    return line.substring(start + prefix.length()).trim()
}
183+
184+
/**
 * Checks whether a string contains a match for {@code SUBMIT_REGEX}.
 *
 * @param value The string to inspect; may be {@code null} or empty
 * @return {@code true} when a match is found, {@code false} otherwise
 *      (including for a {@code null} or empty value)
 */
static protected boolean matchOptions(String value) {
    if( !value )
        return false
    return SUBMIT_REGEX.matcher(value).find()
}
187+
188+
/**
 * @return The name of the environment variable holding the index of the running
 *      array (bulk) sub-job; for TCS bulk jobs this is {@code PJM_BULKNUM}
 */
@Override
String getArrayIndexName() {
    return 'PJM_BULKNUM'
}
192+
193+
/**
 * @return The index of the first element of a job array, which is {@code 0}
 */
@Override
int getArrayIndexStart() { 0 }
197+
198+
/**
 * Builds the id of a single element of a job array (bulk job).
 *
 * The id returned by {@code parseJobId} is made of digits only, so the index is
 * appended to it as {@code <jobId>[<index>]}. An id that carries the {@code []}
 * placeholder is still supported: the index is put inside the brackets.
 *
 * @param jobId The id of the array job; must not be {@code null} or empty
 * @param index The index of the array element
 * @return The id of the array element, e.g. {@code 1234[3]}
 */
@Override
String getArrayTaskId(String jobId, int index) {
    assert jobId, "Missing 'jobId' argument"
    final suffix = '[' + index + ']'
    // without this fallback every element of the array would get the very same id
    return jobId.contains('[]') ? jobId.replace('[]', suffix) : jobId + suffix
}
203+
204+
}

0 commit comments

Comments
 (0)