-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPMSJob.py
118 lines (92 loc) · 3.16 KB
/
PMSJob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import json
from enum import Enum
class IOType(Enum):
local = 0
xrootd = 1
gfal = 2
class PMSJob:
def __init__(self):
self.job = {
"exe_args": [],
"input": {
"files": []
},
"output": None,
"tags": [],
}
# sets the user
def SetUser(self, user):
self.job["user"] = user
# set the executable
def SetExecutable(self, exe):
self.job["executable"] = exe
# add a setenv script
def AddSetenvScript(self, setenv):
if not "env" in self.job:
self.job["env"] = {}
self.job["env"]["type"] = "script"
self.job["env"]["file"] = setenv
def AddInputTransfer(self, protocol, filename, source):
self.job["input"]["files"].append({
"protocol": protocol.name,
"file": filename,
"source": source
})
def AddOutputTransferWithTag(self, protocol, filename, destination, tag):
if self.job["output"] != None and not isinstance(self.job["output"], list):
print("Error: non-tagged file transfer already set up for this job")
exit
if self.job["output"] == None:
self.job["output"] = []
matched_items = [item for item in self.job["output"] if item["tag"] == tag]
if len(matched_items) > 1:
raise RuntimeError("Two output file transfers with the same tag. Should not happen.")
elif len(matched_items) == 0:
self.job["output"].append({
"tag": tag,
"files": [{
"protocol": protocol.name,
"file": filename,
"destination": destination
}]
})
# we have exactly one match
else:
matched_items[0]["files"].append({
"protocol": protocol.name,
"file": filename,
"destination": destination
})
def AddOutputTransfer(self, protocol, filename, destination):
if self.job["output"] == None:
self.job["output"] = {
"files": []
}
self.job["output"]["files"].append({
"protocol": protocol.name,
"file": filename,
"destination": destination
})
# set files for stdout and stderr
def SetJobIO(self, name):
self.job["stdin"] = ""
self.job["stdout"] = f"{name}.out.log"
self.job["stderr"] = f"{name}.err.log"
def SetJobName(self, jobname):
self.job["jobName"] = jobname
# add a flag to the executable
def AddFlag(self, flag):
if not "exe_args" in self.job:
self.job["exe_args"] = []
self.job["exe_args"].append(flag)
def AddTags(self, *args):
self.job["tags"] += [tag for tag in args]
# add a generic key to the job description
# NB: use only if you really need it
def AddGenericKey(self, key, value):
self.job[key] = value
def AsDict(self):
return self.job
def AsJson(self):
return json.dumps(self.job, indent=2)