Skip to content

Commit 275ae16

Browse files
authored
Merge pull request #255 from broadinstitute/development
Release 1.18.0
2 parents 1ac85e0 + 1a63a27 commit 275ae16

File tree

10 files changed

+611
-46
lines changed

10 files changed

+611
-46
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
command: |
4848
. venv/bin/activate
4949
cd tests
50-
pytest -k 'not test_genomes and not test_make_toy' --cov-report=xml --cov=../ingest/
50+
pytest -k 'not test_genomes and not test_make_toy and not test_delocalize_file' --cov-report=xml --cov=../ingest/
5151
5252
- codecov/upload:
5353
file: tests/coverage.xml

ingest/de.py

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from email.headerregistry import Group
21
import logging
32
import numpy as np
43
import pandas as pd
54
import scanpy as sc
65
import re
6+
import glob
77

88
try:
99
from monitor import setup_logger, log_exception
@@ -124,9 +124,11 @@ def process_annots(metadata_file_path, allowed_file_types, headers, dtypes):
124124
"""
125125
annot_redux = IngestFiles(metadata_file_path, allowed_file_types)
126126
annot_file_type = annot_redux.get_file_type(metadata_file_path)[0]
127-
annot_file_handle = annot_redux.open_file(metadata_file_path)[1]
127+
annot_file_handle, local_file_path = IngestFiles.resolve_path(
128+
annot_redux, metadata_file_path
129+
)
128130
annots = annot_redux.open_pandas(
129-
metadata_file_path,
131+
local_file_path,
130132
annot_file_type,
131133
open_file_object=annot_file_handle,
132134
names=headers,
@@ -225,17 +227,24 @@ def get_genes(genes_path):
225227
If two columns present, check if there are duplicates in 2nd col
226228
If no duplicates, use as var_names, else use 1st column
227229
"""
228-
genes_df = pd.read_csv(genes_path, sep="\t", header=None)
230+
genes_object = IngestFiles(genes_path, None)
231+
local_genes_path = genes_object.resolve_path(genes_path)[1]
232+
233+
genes_df = pd.read_csv(local_genes_path, sep="\t", header=None)
229234
if len(genes_df.columns) > 1:
230235
# unclear if falling back to gene_id is useful (SCP-4283)
231236
# print so we're aware of dups during dev testing
232-
if genes_df[1].count() == genes_df[1].nunique():
233-
msg = "dev_info: Features file contains duplicate identifiers (col 2)"
237+
if genes_df[1].count() != genes_df[1].nunique():
238+
msg = (
239+
"dev_info: Features file contains duplicate identifiers in column 2"
240+
)
234241
print(msg)
235242
return genes_df[1].tolist()
236243
else:
237-
if genes_df[0].count() == genes_df[0].nunique():
238-
msg = "dev_info: Features file contains duplicate identifiers (col 1)"
244+
if genes_df[0].count() != genes_df[0].nunique():
245+
msg = (
246+
"dev_info: Features file contains duplicate identifiers in column 1"
247+
)
239248
print(msg)
240249
return genes_df[0].tolist()
241250

@@ -252,13 +261,29 @@ def get_barcodes(barcodes_path):
252261
def adata_from_mtx(matrix_file_path, genes_path, barcodes_path):
253262
""" reconstitute AnnData object from matrix, genes, barcodes files
254263
"""
255-
adata = sc.read_mtx(matrix_file_path)
264+
# process smaller files before reading larger matrix file
265+
barcodes = DifferentialExpression.get_barcodes(barcodes_path)
266+
features = DifferentialExpression.get_genes(genes_path)
267+
matrix_object = IngestFiles(matrix_file_path, None)
268+
local_file_path = matrix_object.resolve_path(matrix_file_path)[1]
269+
adata = sc.read_mtx(local_file_path)
256270
# For AnnData, obs are cells and vars are genes
257271
# BUT transpose needed for both dense and sparse
258272
# so transpose step is after this data object composition step
259273
# therefore the assignments below are the reverse of expected
260-
adata.var_names = DifferentialExpression.get_barcodes(barcodes_path)
261-
adata.obs_names = DifferentialExpression.get_genes(genes_path)
274+
adata.var_names = barcodes
275+
adata.obs_names = features
276+
return adata
277+
278+
@staticmethod
279+
def remove_single_sample_data(adata, annotation):
280+
""" identify and remove cells that would constitute an annotation label
281+
that has data with only a single sample
282+
"""
283+
counts = adata.obs[annotation].value_counts(dropna=False)
284+
for label, count in counts.iteritems():
285+
if count == 1:
286+
adata = adata[adata.obs[annotation] != label]
262287
return adata
263288

264289
@staticmethod
@@ -285,7 +310,9 @@ def run_scanpy_de(
285310

286311
if matrix_file_type == "dense":
287312
# will need try/except (SCP-4205)
288-
adata = sc.read(matrix_file_path)
313+
matrix_object = IngestFiles(matrix_file_path, None)
314+
local_file_path = matrix_object.resolve_path(matrix_file_path)[1]
315+
adata = sc.read(local_file_path)
289316
else:
290317
# MTX reconstitution UNTESTED (SCP-4203)
291318
# will want try/except here to catch failed data object composition
@@ -300,6 +327,8 @@ def run_scanpy_de(
300327
# will need try/except (SCP-4205)
301328
adata.obs = DifferentialExpression.order_annots(de_annots, adata.obs_names)
302329

330+
adata = DifferentialExpression.remove_single_sample_data(adata, annotation)
331+
303332
sc.pp.normalize_total(adata, target_sum=1e4)
304333
sc.pp.log1p(adata)
305334
DifferentialExpression.de_logger.info("calculating DE")
@@ -348,3 +377,24 @@ def run_scanpy_de(
348377

349378
DifferentialExpression.de_logger.info("DE processing complete")
350379

380+
@staticmethod
381+
def string_for_output_match(arguments):
382+
cleaned_cluster_name = re.sub(r'\W+', '_', arguments["cluster_name"])
383+
cleaned_annotation_name = re.sub(r'\W+', '_', arguments["annotation_name"])
384+
files_to_match = f"{cleaned_cluster_name}--{cleaned_annotation_name}*.tsv"
385+
return files_to_match
386+
387+
@staticmethod
388+
def delocalize_de_files(file_path, study_file_id, files_to_match):
389+
""" Copy DE output files to study bucket
390+
"""
391+
392+
files = glob.glob(files_to_match)
393+
for file in files:
394+
IngestFiles.delocalize_file(
395+
study_file_id,
396+
None,
397+
file_path,
398+
file,
399+
f"_scp_internal/differential_expression/{file}",
400+
)

ingest/ingest_files.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import re
1212
from dataclasses import dataclass
1313
from typing import Dict, Generator, List, Tuple, Union # noqa: F401
14+
import warnings
15+
1416

1517
import pandas as pd # NOqa: F821
1618
from google.cloud import storage
@@ -73,6 +75,10 @@ class IngestFiles:
7375
# General logger for class
7476
# Logger provides more details
7577
dev_logger = setup_logger(__name__, "log.txt", format="support_configs")
78+
# Filter out warnings about using end user credentials when running ingest_pipeline as dev
79+
warnings.filterwarnings(
80+
"ignore", "Your application has authenticated using end user credentials"
81+
)
7682

7783
def __init__(self, file_path, allowed_file_types):
7884
self.file_path = file_path

ingest/ingest_pipeline.py

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,20 @@
3434
3535
# Ingest mtx files
3636
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_expression --taxon-name 'Homo sapiens' --taxon-common-name human --matrix-file ../tests/data/mtx/matrix.mtx --matrix-file-type mtx --gene-file ../tests/data/genes.tsv --barcode-file ../tests/data/barcodes.tsv
37+
38+
# Differential Expression analysis (dense matrix)
39+
python ingest_pipeline.py --study-id addedfeed000000000000000 --study-file-id dec0dedfeed1111111111111 differential_expression --annotation-name cell_type__ontology_label --annotation-type group --annotation-scope study --matrix-file-path ../tests/data/differential_expression/de_integration.tsv --matrix-file-type dense --annotation-file ../tests/data/differential_expression/de_integration_unordered_metadata.tsv --cluster-file ../tests/data/differential_expression/de_integration_cluster.tsv --cluster-name de_integration --study-accession SCPdev --differential-expression
40+
41+
# Differential Expression analysis (sparse matrix)
42+
python ingest_pipeline.py --study-id addedfeed000000000000000 --study-file-id dec0dedfeed1111111111111 differential_expression --annotation-name cell_type__ontology_label --annotation-type group --annotation-scope study --matrix-file-path ../tests/data/differential_expression/sparse/sparsemini_matrix.mtx --gene-file ../tests/data/differential_expression/sparse/sparsemini_features.tsv --barcode-file ../tests/data/differential_expression/sparse/sparsemini_barcodes.tsv --matrix-file-type mtx --cell-metadata-file ../tests/data/differential_expression/sparse/sparsemini_metadata.txt --cluster-file ../tests/data/differential_expression/sparse/sparsemini_cluster.txt --cluster-name de_sparse_integration --study-accession SCPsparsemini --differential-expression
43+
3744
"""
3845
import json
3946
import logging
4047
import os
4148
import re
4249
import sys
50+
import re
4351
from contextlib import nullcontext
4452
from typing import Dict, Generator, List, Tuple, Union
4553
from wsgiref.simple_server import WSGIRequestHandler # noqa: F401
@@ -519,44 +527,66 @@ def run_ingest(ingest, arguments, parsed_args):
519527
config.set_parent_event_name("ingest-pipeline:differential-expression")
520528
status_de = ingest.calculate_de()
521529
status.append(status_de)
530+
print(f'STATUS post-DE {status}')
522531

523532
return status, status_cell_metadata
524533

525534

535+
def get_delocalization_info(arguments):
536+
""" extract info on study file for delocalization decision-making
537+
"""
538+
for argument in list(arguments.keys()):
539+
captured_argument = re.match("(\w*file)$", argument)
540+
if captured_argument is not None:
541+
study_file_id = arguments["study_file_id"]
542+
matched_argument = captured_argument.groups()[0]
543+
file_path = arguments[matched_argument]
544+
545+
# Need 1 argument that has a path to identify google bucket
546+
# Break after first argument
547+
break
548+
return file_path, study_file_id
549+
550+
526551
def exit_pipeline(ingest, status, status_cell_metadata, arguments):
527552
"""Logs any errors, then exits Ingest Pipeline with standard OS code
528553
"""
529554
if len(status) > 0:
530-
if all(i < 1 for i in status):
555+
# for successful DE jobs, need to delocalize results
556+
if "differential_expression" in arguments and all(i < 1 for i in status):
557+
file_path, study_file_id = get_delocalization_info(arguments)
558+
# append status?
559+
if IngestFiles.is_remote_file(file_path):
560+
files_to_match = DifferentialExpression.string_for_output_match(
561+
arguments
562+
)
563+
DifferentialExpression.delocalize_de_files(
564+
file_path, study_file_id, files_to_match
565+
)
566+
# all non-DE ingest jobs can exit on success
567+
elif all(i < 1 for i in status):
531568
sys.exit(os.EX_OK)
532569
else:
533-
# delocalize errors file
534-
for argument in list(arguments.keys()):
535-
captured_argument = re.match("(\w*file)$", argument)
536-
if captured_argument is not None:
537-
study_file_id = arguments["study_file_id"]
538-
matched_argument = captured_argument.groups()[0]
539-
file_path = arguments[matched_argument]
540-
if IngestFiles.is_remote_file(file_path):
541-
# Delocalize support log
542-
IngestFiles.delocalize_file(
543-
study_file_id,
544-
arguments["study_id"],
545-
file_path,
546-
"log.txt",
547-
f"parse_logs/{study_file_id}/log.txt",
548-
)
549-
# Delocalize user log
550-
IngestFiles.delocalize_file(
551-
study_file_id,
552-
arguments["study_id"],
553-
file_path,
554-
"user_log.txt",
555-
f"parse_logs/{study_file_id}/user_log.txt",
556-
)
557-
# Need 1 argument that has a path to identify google bucket
558-
# Break after first argument
559-
break
570+
file_path, study_file_id = get_delocalization_info(arguments)
571+
if IngestFiles.is_remote_file(file_path):
572+
if "differential_expression" in arguments:
573+
log_path = (
574+
f"parse_logs/differential_expression/{study_file_id}/log.txt"
575+
)
576+
else:
577+
log_path = f"parse_logs/{study_file_id}/log.txt"
578+
# Delocalize support log
579+
IngestFiles.delocalize_file(
580+
study_file_id, arguments["study_id"], file_path, "log.txt", log_path
581+
)
582+
# Delocalize user log
583+
IngestFiles.delocalize_file(
584+
study_file_id,
585+
arguments["study_id"],
586+
file_path,
587+
"user_log.txt",
588+
log_path,
589+
)
560590
if status_cell_metadata is not None:
561591
if status_cell_metadata > 0 and ingest.cell_metadata.is_remote_file:
562592
# PAPI jobs failing metadata validation against convention report

0 commit comments

Comments (0)