
Commit f22493e

Merge pull request #272 from broadinstitute/development
Release 1.21.0
2 parents 70c35b5 + 71fc658 commit f22493e

9 files changed: +529 −46 lines changed

Dockerfile

Lines changed: 4 additions & 4 deletions
@@ -16,11 +16,11 @@ FROM marketplace.gcr.io/google/ubuntu1804:latest

 # Install Python 3.7
 RUN apt-get -y update && \
-    apt -y install software-properties-common && \
+    apt-get -y install software-properties-common && \
     add-apt-repository ppa:deadsnakes/ppa && \
-    apt -y install python3-pip && \
-    apt -y install python3.7 && \
-    apt -y install python3.7-dev
+    apt-get -y install python3-pip && \
+    apt-get -y install python3.7 && \
+    apt-get -y install python3.7-dev

 RUN python3.7 -m pip install --upgrade pip

ingest/mongo_connection.py

Lines changed: 24 additions & 17 deletions
@@ -1,3 +1,4 @@
+from __future__ import annotations
 import os
 import functools
 import time
@@ -76,40 +77,46 @@ def wrapper(*args, **kwargs):
                 else:
                     raise e
             except BulkWriteError as bwe:
-
                 if attempt < MAX_ATTEMPTS - 1:
                     dev_logger.warning(
                         "Batch ops error occurred. Reinsert attempt %s.", str(attempt)
                     )
-                    error_docs = bwe.details["writeErrors"]
+                    error = bwe.details["writeErrors"]
                     # Check error code to see if any failures are due to violating a unique index (error code 11000)
                     # and discard those documents before retrying
-                    filtered_docs = discard_inserted_documents(error_docs, args[0])
+                    filtered_docs = discard_inserted_documents(error, args[0])
                     if len(filtered_docs) > 0:
                         args[0] = filtered_docs
                         retry(attempt)
                     else:
                         return args[0]
                 else:
-                    dev_logger.debug(str(bwe.details))
+                    log_error_without_values(bwe)
                     raise bwe

     return wrapper

+def log_error_without_values(error):
+    """Remove 'values' array from log messages as this is usually very long and makes logs difficult to read

-def discard_inserted_documents(error_documents, original_documents):
-    """Discard any documents that have already been inserted which are violating index constraints
-    such documents will have an error code of 11000 for a DuplicateKey error
-    from https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml#L467
+    :param error: BulkWriteError from failed transaction
+    """
+    for entry in error.details["writeErrors"]:
+        if 'values' in entry['op']:
+            entry['op']['values'] = '<filtered>'
+    dev_logger.debug(str(error.details))

-    Parameters:
-        error_documents (List[Dict]): list of documents that failed to insert in original transaction
-        original_documents (List[Dict]): list of documents from original transaction that failed
-        error_code (Int): error status code to filter on
+def discard_inserted_documents(errors, original_documents) -> list[dict]:
+    """Discard any documents that have already been inserted which are violating index constraints
+    such documents will have an error code of 11000 for a DuplicateKey error
+    from https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml#L467

-    Returns:
-        List[Dict]: list of documents with matching error code entries removed
+    :param errors: (list[dict]) Documents that failed to insert from transaction
+    :param original_documents: (list[dict]) list of documents from original transaction that failed
+    :returns list[dict]: list of documents with existing entries removed
     """
-    # doc['op'] returns the actual document from the previous transaction
-    errors = list(doc['op'] for doc in error_documents if doc['code'] == 11000)
-    return list(doc for doc in original_documents if doc not in errors)
+    error_docs = []
+    for doc in errors:
+        if doc['code'] == 11000:
+            error_docs.append({'name': doc['op']['name'], 'array_index': doc['op']['array_index']})
+    return list(d for d in original_documents if {'name': d['name'], 'array_index': d['array_index']} not in error_docs)
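For illustration, here is a minimal, self-contained sketch of the new retry filtering. The data below is hypothetical; it only assumes that each entry in bwe.details["writeErrors"] carries the failed document under 'op' and uses code 11000 for duplicate-key errors, as the diff above does:

# Hypothetical data shaped like BulkWriteError.details["writeErrors"]: the failed
# document sits under 'op', and code 11000 indicates a duplicate-key (already inserted) error.
write_errors = [
    {'code': 11000, 'op': {'name': 'GeneA', 'array_index': 0, 'values': [1.2, 0.0]}},
    {'code': 121, 'op': {'name': 'GeneB', 'array_index': 0, 'values': [0.3]}},
]
original_documents = [
    {'name': 'GeneA', 'array_index': 0, 'values': [1.2, 0.0]},
    {'name': 'GeneB', 'array_index': 0, 'values': [0.3]},
    {'name': 'GeneC', 'array_index': 1, 'values': []},
]

# Collect the (name, array_index) pairs that hit a duplicate-key error, then keep only
# documents that did not already land; these are the ones safe to reinsert on retry.
duplicates = [
    {'name': e['op']['name'], 'array_index': e['op']['array_index']}
    for e in write_errors
    if e['code'] == 11000
]
retry_docs = [
    d for d in original_documents
    if {'name': d['name'], 'array_index': d['array_index']} not in duplicates
]
print(retry_docs)  # only GeneB and GeneC remain

Matching on name and array_index rather than the whole document means the comparison still works even when other fields (such as the large 'values' array) differ between the error payload and the original documents.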
expression_writer.py

Lines changed: 213 additions & 0 deletions
@@ -0,0 +1,213 @@

"""expression_writer.py
extract gene-level expression data from dense/sparse matrix files and process in the
context of a cluster file's cells.
this mimics the `expression` data array in visualization API responses

EXAMPLES

dense matrix:
python3 expression_writer.py --matrix-file ../../tests/data/dense_expression_matrix.txt \
    --cluster-file ../../tests/data/cluster_example.txt \
    --cluster-name 'Dense Example'

sparse matrix:
python3 expression_writer.py --matrix-file ../../tests/data/mtx/matrix_with_header.mtx \
    --genes-file ../../tests/data/mtx/sampled_genes.tsv \
    --barcodes-file ../../tests/data/mtx/barcodes.tsv \
    --cluster-file ../../tests/data/mtx/cluster_mtx_barcodes.tsv \
    --cluster-name 'Sparse Example'
"""

import os
import re
import argparse
import time
import multiprocessing
import sys
from functools import partial
from writer_functions import round_exp, open_file, make_data_dir, get_matrix_size, get_cluster_cells,\
    load_entities_as_list, read_sparse_matrix_slice, process_sparse_fragment, write_gene_scores,\
    process_dense_line, COMMA_OR_TAB

class ExpressionWriter:
    denominator = 2 if re.match('darwin', sys.platform) else 1
    num_cores = int(multiprocessing.cpu_count() / denominator) - 1
    print(f"num_cores: {num_cores}")

    def __init__(
        self, matrix_file_path, cluster_file_path, cluster_name, genes_file, barcodes_file
    ):
        self.matrix_file_path = matrix_file_path
        self.cluster_file_path = cluster_file_path
        self.cluster_name = cluster_name
        self.genes_file = genes_file
        self.barcodes_file = barcodes_file

    def get_file_seek_points(self):
        """
        Determine start/stop points in a matrix to process in parallel
        Will read in chunks and return a list of start/stop points
        Ensures breaks on newlines

        :return: (List<List>)
        """
        file_size = get_matrix_size(self.matrix_file_path)
        chunk_size = int(file_size / self.num_cores)
        print(f"determining seek points for {self.matrix_file_path} with chunk size {chunk_size}")
        with open_file(self.matrix_file_path) as matrix_file:
            # fast-forward through any comments if this is a sparse matrix
            first_char = matrix_file.read(1)
            header_lines = 3 if first_char == '%' else 1
            for i in range(header_lines):
                matrix_file.readline()
            current_pos = matrix_file.tell()
            current_seek = [current_pos]
            seek_points = []
            for index in range(self.num_cores):
                seek_point = current_pos + chunk_size
                matrix_file.seek(seek_point)
                current_byte = matrix_file.read(1)
                if current_byte == '':  # eof
                    current_seek.append(file_size)
                    seek_points.append(current_seek)
                    break
                while current_byte != "\n":
                    current_byte = matrix_file.read(1)
                    seek_point += 1
                current_seek.append(seek_point)
                seek_points.append(current_seek)
                current_pos = seek_point + 1
                current_seek = [current_pos]
            return seek_points

    def divide_sparse_matrix(self, genes, data_dir):
        """
        Slice a sparse matrix into 1GB chunks and write out individual
        gene-level files in parallel

        :param genes: (List) gene names from features file
        :param data_dir: (String) name out output dir
        """
        print(f"loading sparse data from {self.matrix_file_path}")
        slice_indexes = self.get_file_seek_points()
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(read_sparse_matrix_slice,
                            matrix_file_path=self.matrix_file_path, genes=genes, data_dir=data_dir)
        pool.map(processor, slice_indexes)

    def process_sparse_data_fragments(self, barcodes, cluster_cells, cluster_name, data_dir):
        """
        Find and process all generated single-gene sparse data fragments

        :param barcodes: (List) list of cell barcodes
        :param cluster_cells: (List) list of cells from cluster file
        :param cluster_name: (String) name of cluster object
        :param data_dir: (String) name out output dir
        """
        fragments = os.listdir(f"{data_dir}/gene_entries")
        print(f"subdivision complete, processing {len(fragments)} fragments")
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(process_sparse_fragment,
                            barcodes=barcodes, cluster_cells=cluster_cells,
                            cluster_name=cluster_name, data_dir=data_dir)
        pool.map(processor, fragments)

    def write_empty_sparse_genes(self, genes, num_cluster_cells, cluster_name, data_dir):
        """
        Write out empty arrays of expression values for genes with no significant expression in a sparse matrix

        :param genes: (List) gene names from features file
        :param num_cluster_cells: (Integer) number of cells from cluster file
        :param cluster_name: (String) name of cluster object
        :param data_dir: (String) name out output dir
        """
        gene_fragments = filter(lambda file: file[0] != '.', os.listdir(f"{data_dir}/gene_entries"))
        significant_genes = set([gene.split('__')[0] for gene in gene_fragments])
        missing_genes = [gene for gene in genes if gene not in significant_genes]
        empty_expression = [0] * num_cluster_cells
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(write_gene_scores,
                            cluster_name=cluster_name, exp_values=empty_expression, data_dir=data_dir)
        pool.map(processor, missing_genes)

    def process_dense_data(self, cluster_cells, cluster_name, data_dir):
        """
        Main handler to read dense matrix data and process entries at the gene level

        :param cluster_cells: (List) cell names from cluster file
        :param cluster_name: (String) name of cluster object
        :param data_dir: (String) name out output dir
        """
        pool = multiprocessing.Pool(self.num_cores)
        slice_indexes = self.get_file_seek_points()
        matrix_file = open_file(self.matrix_file_path)
        header = matrix_file.readline().rstrip()
        values = re.split(COMMA_OR_TAB, header)
        cells = values[1:]
        processor = partial(
            self.read_dense_matrix_slice,
            matrix_cells=cells, cluster_cells=cluster_cells, cluster_name=cluster_name, data_dir=data_dir
        )
        pool.map(processor, slice_indexes)

    def read_dense_matrix_slice(self, indexes, matrix_cells, cluster_cells, cluster_name, data_dir):
        """
        Read a dense matrix using start/stop indexes and create to individual gene-level files
        :param indexes: (List) start/stop index points to read from/to
        :param matrix_cells: (List) cell names from matrix file
        :param cluster_cells: (List) cell names from cluster file
        :param cluster_name: (String) name of cluster object
        :param data_dir: (String) name out output dir
        """
        start_pos, end_pos = indexes
        print(f"reading {self.matrix_file_path} at index {start_pos}:{end_pos}")
        with open_file(self.matrix_file_path) as matrix_file:
            current_pos = start_pos
            matrix_file.seek(current_pos)
            while current_pos < end_pos:
                line = matrix_file.readline()
                process_dense_line(line, matrix_cells, cluster_cells, cluster_name, data_dir)
                current_pos += len(line)

    def main(self):
        """
        Main handler, determines type of processing to execute (dense vs. sparse)
        """
        cluster_file_path = args.cluster_file
        expression_file_path = args.matrix_file
        sanitized_cluster_name = re.sub(r'\W', '_', args.cluster_name)
        data_dir = make_data_dir(sanitized_cluster_name)
        cluster_cells = get_cluster_cells(cluster_file_path)
        if self.genes_file is not None and self.barcodes_file is not None:
            print(f"reading {expression_file_path} as sparse matrix")
            genes_file = open_file(args.genes_file)
            genes = load_entities_as_list(genes_file)
            barcodes_file = open_file(args.barcodes_file)
            barcodes = load_entities_as_list(barcodes_file)
            self.divide_sparse_matrix(genes, data_dir)
            self.write_empty_sparse_genes(genes, len(cluster_cells), sanitized_cluster_name, data_dir)
            self.process_sparse_data_fragments(barcodes, cluster_cells, sanitized_cluster_name, data_dir)
        else:
            print(f"reading {expression_file_path} as dense matrix")
            self.process_dense_data(cluster_cells, sanitized_cluster_name, data_dir)

if __name__ == '__main__':
    start_time = time.time()
    parser = argparse.ArgumentParser()
    parser.add_argument('--cluster-file', help='path to cluster file', required=True)
    parser.add_argument('--cluster-name', help='name of cluster object', required=True)
    parser.add_argument('--matrix-file', help='path to matrix file', required=True)
    parser.add_argument('--genes-file', help='path to genes file (None for dense matrix files)')
    parser.add_argument('--barcodes-file', help='path to barcodes file (None for dense matrix files)')
    args = parser.parse_args()
    expression_file = args.matrix_file
    cluster_file = args.cluster_file
    writer = ExpressionWriter(
        args.matrix_file, args.cluster_file, args.cluster_name, args.genes_file, args.barcodes_file
    )
    writer.main()
    end_time = time.time()
    total_time = end_time - start_time
    time_in_min = round_exp(total_time, 3) / 60
    print(f"completed, total runtime in minutes: {time_in_min}")
