-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_and_check_output.py
276 lines (227 loc) · 9.14 KB
/
parse_and_check_output.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# -*- coding: utf-8 -*-
"""
parse_and_check_output.py
More advanced version of parse_output.py based on aspsmatrix's matrix
Uses kernel rank to check the output, verify if the flux modes are EFMs
"""
import numpy as np
import scipy.linalg
import numpy as np
import numpy.linalg
import pandas
import re, sys
from aspsmatrix import AspSmatrix, metatool_file, sbml_file, file_collection
from datetime import timedelta
from copy import deepcopy
from functools import reduce
from scipy.optimize import linprog
from argparse import ArgumentParser
from mparser import parsehelper as ph
import warnings
import pickle
SBML = 1
MTTL = -1
PICKLE = 0
def enumerate_list(lst):
"""
Simple method enumerating a list and printing it to sysout
Params:
lst: Python list
"""
for i, elem in enumerate(lst):
print(i+1, ":", elem)
def is_kernel_dimension_one(matrix):
"""
Checks if the kernel of a matrix is of dimension one
Uses SVD to calculate the rank of the matrix
Params:
matrix: stoichiometric matrix
Returns:
boolean: True if the kernel is of dimension one, else False
"""
# utilise SVD à la place de Gaussian Elimination (Klamt, 2005)
rank = numpy.linalg.matrix_rank(matrix)
# print("forme matrice", matrix.shape, "rang", rank)
if (rank == matrix.shape[1] - 1): # nb columns aka nb reactions
return True
return False
def support_as_boolean(support, reacidx):
"""
Converts support to a boolean table matching the network file matrix
Params:
support: set of string names, support of the reaction, from clingoLP output
reacidx: hash map associating reaction names to indices, from network file
Returns:
booltable: table of booleans:
True if the reaction is in the support, else False
"""
booltable = []
itersup = support.copy()
for val in reacidx: # for each reaction name
match = val in itersup # boolean
booltable.append(match)
if match:
itersup.remove(val)
if itersup: # if support set is not empty
raise ValueError("Reactions do not match between clingoLP and network file")
return booltable
def get_efms_from_solutions(sols, matrix, reacidx):
"""
Gets all EFMs from a list of solutions
Uses the stoichiometric matrix
Tests for each solution if the kernel of the submatrix is dimension one
The submatrix is the stoichiometric matrix minus the inactive reactions columns
Params:
sols: Python List of solutions
matrix: NumPy Array, stoichiometric matrix
reacidx: Python Dict of names corresponding to matrix indices
Returns:
efms: Python List of EFMs
"""
efms = []
for support in sols:
if is_efm(support, matrix):
efms.append(support)
return efms
def is_efm(support, matrix):
"""
Checks if a support reaction names set is an elementary flux mode
Params:
fluxv: a flux vector returned by clingo[LP], as pandas dataframe
matrix: original matrix, to check flux vectors minimality
AspSmatrix format
Returns:
status: boolean, true if it is an efm else false
"""
boolsupp = support_as_boolean(support, matrix.reactions_index())
submatrix = matrix.matrix[:,boolsupp]
return is_kernel_dimension_one(submatrix)
def str_is_efm(fluxv, matrix):
"""
Checks if a flux vector is an elementary flux mode
Params:
fluxv: a flux vector returned by clingo[LP], as pandas dataframe
matrix: original matrix, to check flux vectors minimality
AspSmatrix format (optional)
Returns:
status: "unknown" if matrix is None,
"true" if fluxv is an elementary mode, else "false"
"""
if fluxv.empty:
raise ValueError("Empty flux vector dataframe")
if matrix is None:
return "unknown"
else:
support = list(map(ph.smart_remove_quotes, fluxv.index.values))
support = set(list(map(ph.smart_remove_revs, support)))
return str(is_efm(support, matrix)).lower()
def get_flux_vectors(fname):
"""
Reads the Clingo[LP] output and gets the flux vector for each solution
TODO: Not use this function and get answers with clingo API instead
Params:
fname: input file name, output from Clingo[LP]
Returns:
fluxes: list of flux vectors
names: list of reaction or enzyme subset names
"""
f = open(fname)
lines = f.readlines()
fluxes = []
names = []
# uses the fact that the min function is 0 and always returns 0
# after beginning brace index
bb_idx = len('(0.0, {')
# this filtering is a bit too specific to be reused...
lines = filter(lambda w: w.startswith("(0.0,"), lines)
spl = None
length = -1
for i, l in enumerate(lines):
# end brace index
eb_idx = l.rindex('})')
# remove beginning and ending braces
l = l[bb_idx:eb_idx]
spl = l.split(",")
try:
ls = list(map(lambda w: float(w.split(":")[1]), spl))
assert(length == -1 or length == len(spl))
fluxes.append(np.array(ls))
except (IndexError, AssertionError):
warnings.warn(f'formatting defect on answer {i+1}')
length = len(spl)
if spl is None:
raise ValueError() # Clingo LP output format error
names = spl
def get_word_in_quotes(e):
res = re.compile(r'flux\(("?.*)"?\)').search(e)
return "None" if res is None else res.group(1)
names = list(map(get_word_in_quotes, names))
f.close()
return fluxes, names
def write_flux_vectors(inname, outname, checkfile, pklfile, mcfmfile, format, jsonfile, aspfile):
"""
Writes the flux vectors retrieved from a clingo[LP] output
Params:
inname: input file name, output from Clingo[LP]
outname: file containing all elementary flux modes in text format
checkfile: original network file, to check flux vectors minimality (optional)
pklfile: additional file to store flux modes in pickle format (optional)
mcfmfile: additional file to store non elementary flux modes indices in pickle format (optional)
format: constant indicating if network file is in SBML format, METATOOL format or other
jsonfile: additional file to store the last elementary flux mode in json format (optional)
aspfile: additional file to store answer sets as an Answer Set Programming program
"""
fluxes, names = get_flux_vectors(inname)
fvs = np.vstack(fluxes)
pfvs = pandas.DataFrame(data=fvs, columns=names)
pfvs = pfvs.reindex(sorted(pfvs.columns), axis=1)
matrix = None
if checkfile is not None:
try:
#format_file = sbml_file if sbml else metatool_file
if format == SBML:
matrix = AspSmatrix(sbml_file(checkfile))
elif format == MTTL:
matrix = AspSmatrix(metatool_file(checkfile))
else:
with open(checkfile, 'rb') as f:
data = pickle.load(f)
matrix = AspSmatrix(data, from_files=False)
except IOError:
print('File not found')
if pklfile is not None:
with open(pklfile, 'wb') as pkl:
pickle.dump(pfvs, pkl)
is_efm_ss = []
fluxv = None
with open(outname, "w") as f:
for i in range(len(fvs)):
fluxv = pfvs.loc[i, pfvs.loc[i]>0]
f.write(str(fluxv))
is_efm_str = str_is_efm(fluxv, matrix)
is_efm_ss.append(is_efm_str)
f.write(', elementary mode: ' + is_efm_str)
f.write('\n\n')
if mcfmfile is not None:
is_efm_false = [i for i, x in enumerate(is_efm_ss) if x == 'false']
with open(mcfmfile, 'wb') as pkl:
pickle.dump(is_efm_false, pkl)
if jsonfile is not None:
with open(jsonfile, 'w') as j:
fluxv.to_json(j)
if aspfile is not None:
raise NotImplementedError
if __name__== "__main__":
parser = ArgumentParser()
parser.add_argument('infile', metavar='clingoLP.file', help='Input file name')
parser.add_argument('outfile', metavar='output.file', help='Output file name')
parser.add_argument('--check', metavar='network.file', help='Check flux modes')
parser.add_argument('--pickle', metavar='pickle.file', help='Store flux modes with pickle')
parser.add_argument('--mcfm', metavar='mcfm.file', help='Store non elementary modes with pickle')
parser.add_argument('--sbml', action='store_true', help='If network file is in SBML format')
parser.add_argument('--metatool', action='store_true', help='If network file is in Metatool format')
parser.add_argument('--json', metavar='json.file', help='Store flux modes in json')
parser.add_argument('--solutions', metavar='asp.file', help='Store answer sets as an ASP program')
opts = parser.parse_args()
format = SBML if opts.sbml else MTTL if opts.metatool else PICKLE
write_flux_vectors(opts.infile, opts.outfile, opts.check, opts.pickle, opts.mcfm, format, opts.json, opts.solutions)