
Commit f1ba08e

Merge pull request #273 from broadinstitute/jb-exp-writer-integration

Integrating `ExpressionWriter` into `IngestPipeline` (SCP-4648)

2 parents: 71fc658 + 545c4a6

20 files changed: +954 -434 lines

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -11,6 +11,7 @@ jobs:
   build:
     docker:
       - image: circleci/python:3.7.5-stretch
+    resource_class: large

     working_directory: ~/scp-ingest-pipeline

```

README.md

Lines changed: 41 additions & 1 deletion
````diff
@@ -128,9 +128,49 @@ pytest test_ingest.py
 # Run all tests, show code coverage metrics
 pytest --cov=../ingest/
 ```
-
 For more, see <https://docs.pytest.org/en/stable/usage.html>.

+## Testing in Docker
+If you have difficulties installing and configuring `scp-ingest-pipeline` due to hardware issues (e.g. Mac M1 chips),
+you can alternatively test locally by building the Docker image and running commands inside the container.
+There are some extra steps required, but this sidesteps the need to install packages locally.
+
+### 1. Build the image
+Run the following command to build the testing Docker image locally (make sure Docker is running first):
+```
+docker build -t gcr.io/broad-singlecellportal-staging/ingest-pipeline:test-candidate .
+```
+### 2. Set up environment variables
+Run the following to pull database-specific secrets out of vault (passing in the path to your vault token):
+```
+source scripts/setup_mongo_dev.sh ~/.your-vault-token
+```
+Now run `env` to make sure you've set the following values:
+```
+MONGODB_USERNAME=single_cell
+DATABASE_NAME=single_cell_portal_development
+MONGODB_PASSWORD=<password>
+DATABASE_HOST=<ip address>
+```
+### 3. Export your service account keyfile
+Run the following to export your default service account JSON keyfile to a local file:
+```
+vault read -format=json secret/kdux/scp/development/$(whoami)/scp_service_account.json | jq .data > /tmp/keyfile.json
+```
+### 4. Start the Docker container
+Run the container, passing in the proper environment variables:
+```
+docker run --name scp-ingest-test -e MONGODB_USERNAME="$MONGODB_USERNAME" -e DATABASE_NAME="$DATABASE_NAME" \
+  -e MONGODB_PASSWORD="$MONGODB_PASSWORD" -e DATABASE_HOST="$DATABASE_HOST" \
+  -e GOOGLE_APPLICATION_CREDENTIALS=/tmp/keyfile.json --rm -it \
+  gcr.io/broad-singlecellportal-staging/ingest-pipeline:test-candidate bash
+```
+### 5. Copy the keyfile to the running container
+In a separate terminal window, copy the JSON keyfile from above to the expected location:
+```
+docker cp /tmp/keyfile.json scp-ingest-test:/tmp
+```
+You can now run any `ingest_pipeline.py` command you wish inside the container.
 # Use

 Run this every time you start a new terminal to work on this project:
````
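As a concrete illustration of step 5 above, here is the dense-matrix `render_expression_arrays` invocation taken from the `expression_writer.py` docstring (see below). The study and file IDs are that docstring's sample values, and the relative paths assume your working directory is the repo's `ingest/` folder inside the container, so substitute your own:

```
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
  render_expression_arrays --matrix-file-path ../tests/data/dense_expression_matrix.txt \
  --matrix-file-type dense \
  --cluster-file ../tests/data/cluster_example.txt \
  --cluster-name 'Dense Example' --render-expression-arrays
```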

ingest/cli_parser.py

Lines changed: 28 additions & 0 deletions
```diff
@@ -347,6 +347,34 @@ def create_parser():
         "--anndata-file", required=True, help="Path to AnnData file"
     )

+    parser_expression_writer = subparsers.add_parser(
+        "render_expression_arrays",
+        help="Indicates preprocessing of cluster/expression files for image pipeline"
+    )
+
+    parser_expression_writer.add_argument(
+        '--render-expression-arrays', action="store_true", help='Invoke expression_writer.py', required=True
+    )
+
+    parser_expression_writer.add_argument(
+        '--cluster-file', help='path to cluster file', required=True
+    )
+    parser_expression_writer.add_argument(
+        '--cluster-name', help='name of cluster object', required=True
+    )
+    parser_expression_writer.add_argument(
+        '--matrix-file-path', help='path to matrix file', required=True
+    )
+    parser_expression_writer.add_argument(
+        '--matrix-file-type', help='type of matrix file (dense or mtx)', required=True, choices=['dense', 'mtx']
+    )
+    parser_expression_writer.add_argument(
+        '--gene-file', help='path to gene file (omit for dense matrix files)'
+    )
+    parser_expression_writer.add_argument(
+        '--barcode-file', help='path to barcode file (omit for dense matrix files)'
+    )
+
     return parser


```
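The new subparser only declares the flags; the dispatch itself happens in `ingest_pipeline.py`, which is not shown in this excerpt. Below is a rough, hypothetical sketch of how the parsed arguments line up with `ExpressionWriter`'s constructor (note that the `--cluster-file` flag maps onto the `cluster_file_path` parameter):

```python
# Hypothetical wiring sketch only; the real integration lives in ingest_pipeline.py.
from cli_parser import create_parser
from expression_writer import ExpressionWriter

# parse a render_expression_arrays invocation (see the docstring examples in expression_writer.py below)
args = create_parser().parse_args()
writer = ExpressionWriter(
    matrix_file_path=args.matrix_file_path,
    matrix_file_type=args.matrix_file_type,
    cluster_file_path=args.cluster_file,  # CLI flag is --cluster-file
    cluster_name=args.cluster_name,
    gene_file=args.gene_file,        # None for dense matrices
    barcode_file=args.barcode_file,  # None for dense matrices
)
writer.render_artifacts()  # writes per-gene expression arrays and delocalizes them to the study bucket
```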

ingest/expression_writer.py

Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@

```python
"""expression_writer.py

Extract gene-level expression data from dense/sparse matrix files and process it in the
context of a cluster file's cells.
This mimics the `expression` data array in visualization API responses.

EXAMPLES (must be invoked via ingest_pipeline.py)

dense matrix:
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
    render_expression_arrays --matrix-file-path ../tests/data/dense_expression_matrix.txt \
    --matrix-file-type dense \
    --cluster-file ../tests/data/cluster_example.txt \
    --cluster-name 'Dense Example' --render-expression-arrays

sparse matrix:
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
    render_expression_arrays --matrix-file-path ../tests/data/mtx/matrix_with_header.mtx \
    --matrix-file-type mtx \
    --gene-file ../tests/data/mtx/sampled_genes.tsv \
    --barcode-file ../tests/data/mtx/barcodes.tsv \
    --cluster-file ../tests/data/mtx/cluster_mtx_barcodes.tsv \
    --cluster-name 'Sparse Example' --render-expression-arrays
"""
from __future__ import annotations

import os
import re
import multiprocessing
import sys
import datetime
from dateutil.relativedelta import relativedelta
from functools import partial

try:
    from writer_functions import round_exp, encode_cluster_name, open_file, make_data_dir, get_matrix_size, \
        get_cluster_cells, load_entities_as_list, process_sparse_fragment, write_gene_scores, process_dense_line, \
        COMMA_OR_TAB
    from monitor import setup_logger
    from ingest_files import IngestFiles
except ImportError:
    from .writer_functions import round_exp, encode_cluster_name, open_file, make_data_dir, get_matrix_size, \
        get_cluster_cells, load_entities_as_list, process_sparse_fragment, write_gene_scores, process_dense_line, \
        COMMA_OR_TAB
    from .monitor import setup_logger
    from .ingest_files import IngestFiles


class ExpressionWriter:
    DELOCALIZE_FOLDER = "_scp_internal/cache/expression_scatter/data"
    # use half the available cores on macOS ('darwin'), all of them elsewhere, reserving one for the parent process
    denominator = 2 if re.match('darwin', sys.platform) else 1
    num_cores = int(multiprocessing.cpu_count() / denominator) - 1

    def __init__(
        self, matrix_file_path, matrix_file_type, cluster_file_path, cluster_name, gene_file, barcode_file, **kwargs
    ):
        self.matrix_file_path = matrix_file_path
        self.matrix_file_type = matrix_file_type
        self.cluster_file_path = cluster_file_path
        self.cluster_name = cluster_name
        self.gene_file = gene_file
        self.barcode_file = barcode_file

        # localize main files, if needed
        self.local_matrix_path = open_file(self.matrix_file_path)[1]
        self.local_cluster_path = open_file(self.cluster_file_path)[1]

        # set storage bucket name, if needed
        self.bucket = self.get_storage_bucket_name()

        timestamp = datetime.datetime.now().isoformat(sep="T", timespec="seconds")
        url_safe_timestamp = re.sub(':', '', timestamp)
        log_name = f"expression_scatter_data_{url_safe_timestamp}_log.txt"
        self.dev_logger = setup_logger(__name__, log_name, format="support_configs")

    def get_storage_bucket_name(self):
        """
        Get GCS storage bucket name, if available
        """
        path_header = self.matrix_file_path[:5]
        if path_header == 'gs://':
            path_segments = self.matrix_file_path[5:].split("/")
            bucket_name = path_segments[0]
            return f"{path_header}{bucket_name}"

    def get_file_seek_points(self) -> list[list]:
        """
        Determine start/stop points in a matrix to process in parallel.
        Reads in chunks and returns a list of [start, stop] byte offsets,
        ensuring each chunk breaks on a newline.

        :returns: list[list]
        """
        file_size = get_matrix_size(self.local_matrix_path)
        chunk_size = int(file_size / self.num_cores)
        self.dev_logger.info(
            f" determining seek points for {self.local_matrix_path} with chunk size {chunk_size}"
        )
        with open_file(self.local_matrix_path)[0] as matrix_file:
            # fast-forward through any comments if this is a sparse matrix
            first_char = matrix_file.read(1)
            header_lines = 3 if first_char == '%' else 1
            for i in range(header_lines):
                matrix_file.readline()
            current_pos = matrix_file.tell()
            current_seek = [current_pos]
            seek_points = []
            for index in range(self.num_cores):
                seek_point = current_pos + chunk_size
                matrix_file.seek(seek_point)
                current_byte = matrix_file.read(1)
                if current_byte == '':  # eof
                    current_seek.append(file_size)
                    seek_points.append(current_seek)
                    break
                while current_byte != "\n":
                    current_byte = matrix_file.read(1)
                    seek_point += 1
                current_seek.append(seek_point)
                seek_points.append(current_seek)
                current_pos = seek_point + 1
                current_seek = [current_pos]
            return seek_points

    def divide_sparse_matrix(self, genes, data_dir):
        """
        Slice a sparse matrix into 1GB chunks and write out individual
        gene-level files in parallel

        :param genes: (list) gene names from features file
        :param data_dir: (str) name of output dir
        """
        self.dev_logger.info(f" loading sparse data from {self.local_matrix_path}")
        slice_indexes = self.get_file_seek_points()
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(self.read_sparse_matrix_slice, genes=genes, data_dir=data_dir)
        pool.map(processor, slice_indexes)

    def read_sparse_matrix_slice(self, indexes, genes, data_dir):
        """
        Read a sparse matrix using start/stop indexes and append to individual gene-level files

        :param indexes: (list) start/stop index points to read from/to
        :param genes: (list) gene names from features file
        :param data_dir: (str) name of output dir
        """
        start_pos, end_pos = indexes
        self.dev_logger.info(f"reading {self.local_matrix_path} at index {start_pos}:{end_pos}")
        with open_file(self.local_matrix_path)[0] as matrix_file:
            current_pos = start_pos
            matrix_file.seek(current_pos)
            while current_pos < end_pos:
                line = matrix_file.readline()
                gene_idx = int(line.split()[0])
                gene_name = genes[gene_idx - 1]
                fragment_path = f"{data_dir}/gene_entries/{gene_name}__entries.txt"
                with open(fragment_path, 'a+') as file:
                    file.write(line)
                current_pos += len(line)

    def process_sparse_data_fragments(self, barcodes, cluster_cells, data_dir):
        """
        Find and process all generated single-gene sparse data fragments

        :param barcodes: (list) list of cell barcodes
        :param cluster_cells: (list) list of cells from cluster file
        :param data_dir: (str) name of output dir
        """
        fragments = os.listdir(f"{data_dir}/gene_entries")
        self.dev_logger.info(f" subdivision complete, processing {len(fragments)} fragments")
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(process_sparse_fragment, barcodes=barcodes, cluster_cells=cluster_cells, data_dir=data_dir)
        pool.map(processor, fragments)

    def write_empty_sparse_genes(self, genes, num_cluster_cells, data_dir):
        """
        Write out empty arrays of expression values for genes with no significant expression in a sparse matrix

        :param genes: (list) gene names from features file
        :param num_cluster_cells: (Integer) number of cells from cluster file
        :param data_dir: (str) name of output dir
        """
        gene_fragments = filter(lambda file: file[0] != '.', os.listdir(f"{data_dir}/gene_entries"))
        significant_genes = set([gene.split('__')[0] for gene in gene_fragments])
        missing_genes = [gene for gene in genes if gene not in significant_genes]
        empty_expression = [0] * num_cluster_cells
        pool = multiprocessing.Pool(self.num_cores)
        processor = partial(write_gene_scores, exp_values=empty_expression, data_dir=data_dir)
        pool.map(processor, missing_genes)

    def process_dense_data(self, cluster_cells, data_dir):
        """
        Main handler to read dense matrix data and process entries at the gene level

        :param cluster_cells: (list) cell names from cluster file
        :param data_dir: (str) name of output dir
        """
        pool = multiprocessing.Pool(self.num_cores)
        slice_indexes = self.get_file_seek_points()
        matrix_file, local_matrix_path = open_file(self.matrix_file_path)
        header = matrix_file.readline().rstrip()
        values = re.split(COMMA_OR_TAB, header)
        cells = values[1:]
        processor = partial(
            self.read_dense_matrix_slice, matrix_cells=cells, cluster_cells=cluster_cells, data_dir=data_dir
        )
        pool.map(processor, slice_indexes)

    def read_dense_matrix_slice(self, indexes, matrix_cells, cluster_cells, data_dir):
        """
        Read a dense matrix using start/stop indexes and write out individual gene-level files

        :param indexes: (list) start/stop index points to read from/to
        :param matrix_cells: (list) cell names from matrix file
        :param cluster_cells: (list) cell names from cluster file
        :param data_dir: (str) name of output dir
        """
        start_pos, end_pos = indexes
        self.dev_logger.info(f" reading {self.local_matrix_path} at index {start_pos}:{end_pos}")
        with open_file(self.local_matrix_path)[0] as matrix_file:
            current_pos = start_pos
            matrix_file.seek(current_pos)
            while current_pos < end_pos:
                line = matrix_file.readline()
                process_dense_line(line, matrix_cells, cluster_cells, data_dir)
                current_pos += len(line)

    def render_artifacts(self):
        """
        Main handler, determines type of processing to execute (dense vs. sparse)
        """
        start_time = datetime.datetime.now()
        sanitized_cluster_name = encode_cluster_name(self.cluster_name)
        self.dev_logger.info(f" creating data directory at {sanitized_cluster_name}")
        make_data_dir(sanitized_cluster_name)
        cluster_cells = get_cluster_cells(self.local_cluster_path)
        if self.matrix_file_type == 'mtx' and self.gene_file is not None and self.barcode_file is not None:
            self.dev_logger.info(f" reading {self.matrix_file_path} as sparse matrix")
            self.dev_logger.info(f" reading entities from {self.gene_file}")
            genes_file = open_file(self.gene_file)[0]
            genes = load_entities_as_list(genes_file)
            self.dev_logger.info(f" reading entities from {self.barcode_file}")
            barcodes_file = open_file(self.barcode_file)[0]
            barcodes = load_entities_as_list(barcodes_file)
            self.divide_sparse_matrix(genes, sanitized_cluster_name)
            self.write_empty_sparse_genes(genes, len(cluster_cells), sanitized_cluster_name)
            self.process_sparse_data_fragments(barcodes, cluster_cells, sanitized_cluster_name)
        elif self.matrix_file_type == 'dense':
            self.dev_logger.info(f" reading {self.matrix_file_path} as dense matrix")
            self.process_dense_data(cluster_cells, sanitized_cluster_name)
        end_time = datetime.datetime.now()
        time_diff = relativedelta(end_time, start_time)
        self.dev_logger.info(
            f" completed, total runtime: {time_diff.hours}h, {time_diff.minutes}m, {time_diff.seconds}s"
        )
        self.delocalize_outputs(sanitized_cluster_name)

    def delocalize_outputs(self, cluster_name):
        """
        Copy all output files to study bucket in parallel using gsutil (since there are usually ~25-30K files)

        :param cluster_name: (str) encoded name of cluster
        """
        if self.bucket is not None:
            bucket_path = f"{self.DELOCALIZE_FOLDER}/{cluster_name}"
            self.dev_logger.info(f" pushing all output files to {bucket_path}")
            dir_files = os.listdir(cluster_name)
            files_to_push = list(file for file in dir_files if 'gene_entries' not in file)
            for file in files_to_push:
                local_path = f"{cluster_name}/{file}"
                IngestFiles.delocalize_file(None, None, self.matrix_file_path, local_path, f"{bucket_path}/{file}")
            self.dev_logger.info(" push completed")
            handler = self.dev_logger.handlers[0]
            log_filename = handler.baseFilename.split("/").pop()
            IngestFiles.delocalize_file(None, None, self.matrix_file_path, log_filename, f"parse_logs/{log_filename}")
```
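The newline-aligned chunking in `get_file_seek_points` is what lets both the sparse and dense paths fan work out across cores. Here is a minimal standalone sketch of the same idea, assuming a plain local text file that ends with a newline (the path and worker count are illustrative, and this simplification reads in binary mode rather than via `open_file`):

```python
from __future__ import annotations

import os


def seek_points(path: str, num_workers: int) -> list[list[int]]:
    """Split a text file into one [start, stop] byte range per worker,
    nudging each stop forward to the next newline so no line is split."""
    file_size = os.path.getsize(path)
    chunk_size = file_size // num_workers
    points = []
    with open(path, "rb") as fh:
        start = 0
        for _ in range(num_workers):
            stop = start + chunk_size
            fh.seek(stop)
            byte = fh.read(1)
            if byte == b"":  # ran past end of file: last chunk ends at the file end
                points.append([start, file_size])
                break
            while byte != b"\n":  # advance to the end of the current line
                byte = fh.read(1)
                stop += 1
            points.append([start, stop])
            start = stop + 1
    return points

# each worker then seeks to its start offset and reads lines until it passes stop,
# mirroring read_sparse_matrix_slice / read_dense_matrix_slice above
```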
