Skip to content

Commit ae99818

Browse files
authored
Merge pull request #276 from broadinstitute/jlc_ingest_h5ad_cluster
Extract clustering from h5ad and delocalize intermediate files to study bucket (SCP-4771)
2 parents 3249e71 + 2e07bee commit ae99818

File tree

3 files changed

+119
-10
lines changed

3 files changed

+119
-10
lines changed

ingest/anndata_.py

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import pandas as pd # NOqa: F821
2+
13
try:
24
from ingest_files import IngestFiles
35
from monitor import log_exception
@@ -14,13 +16,20 @@ def __init__(self, file_path, study_file_id, study_id, **kwargs):
1416
IngestFiles.__init__(
1517
self, file_path, allowed_file_types=self.ALLOWED_FILE_TYPES
1618
)
17-
pass
19+
# If performing cluster extraction, set obsm_keys
20+
extract_cluster = kwargs.get("extract_cluster")
21+
if extract_cluster:
22+
self.obsm_keys = kwargs["obsm_keys"]
23+
else:
24+
pass
1825

1926
def obtain_adata(self):
    """
    Open the file at self.file_path and return its contents.

    The value returned is the first element of whatever self.open_file()
    yields (presumably the AnnData object -- confirm against open_file).
    A summary of it is printed and written to the dev logger.

    Returns:
        The first element returned by self.open_file(self.file_path).

    Raises:
        ValueError: if opening the file raises ValueError; the original
            error is attached as the cause.
    """
    try:
        adata = self.open_file(self.file_path)[0]
        # for faster dev, print adata info to screen, may want to remove in future
        print(adata)
        IngestFiles.dev_logger.info(str(adata))
        return adata
    except ValueError as e:
        # Chain explicitly so the underlying traceback is kept as __cause__
        raise ValueError(e) from e
2635

@@ -35,3 +44,69 @@ def validate(self):
3544
except ValueError:
3645
return False
3746

47+
@staticmethod
48+
def generate_cluster_header(adata, clustering_name):
    """
    Based on clustering dimensions, write clustering NAME line to file

    The number of coordinate columns is read from
    adata.obsm[clustering_name].shape[1]. Two columns give the header
    NAME, X, Y; three columns give NAME, X, Y, Z. The header is written
    (overwriting any existing content) to
    "<clustering_name>.cluster.anndata_segment.tsv" in the current
    working directory.

    Args:
        adata: object exposing an ``obsm`` mapping of array-likes
        clustering_name: key in ``adata.obsm`` to write a header for

    Raises:
        ValueError: if the obsm entry does not have 2 or 3 columns.
            No file is written in that case.
    """
    clustering_dimension = adata.obsm[clustering_name].shape[1]
    if clustering_dimension > 3:
        msg = f"Too many dimensions for visualization in obsm \"{clustering_name}\", found {clustering_dimension}, expected 2 or 3."
        raise ValueError(msg)
    if clustering_dimension < 2:
        msg = f"Too few dimensions for visualization in obsm \"{clustering_name}\", found {clustering_dimension}, expected 2 or 3."
        raise ValueError(msg)

    headers = ['NAME', 'X', 'Y']
    if clustering_dimension == 3:
        # list.append returns None, so mutate in place rather than
        # binding its result
        headers.append('Z')

    with open(f"{clustering_name}.cluster.anndata_segment.tsv", "w") as f:
        f.write('\t'.join(headers) + '\n')
66+
67+
@staticmethod
68+
def generate_cluster_type_declaration(adata, clustering_name):
    """
    Append the TYPE line for a clustering to its cluster file.

    The line is "TYPE" followed by one "numeric" entry per column of
    adata.obsm[clustering_name], tab separated, and is appended to
    "<clustering_name>.cluster.anndata_segment.tsv" in the current
    working directory.
    """
    column_count = adata.obsm[clustering_name].shape[1]
    type_line = "TYPE" + "\tnumeric" * column_count + "\n"
    segment_path = f"{clustering_name}.cluster.anndata_segment.tsv"
    with open(segment_path, "a") as segment_file:
        segment_file.write(type_line)
76+
77+
@staticmethod
78+
def generate_cluster_body(adata, clustering_name):
    """
    Append the per-cell rows for a clustering to its cluster file.

    Each row is a value from adata.obs_names followed by the matching
    row of adata.obsm[clustering_name], tab separated, with no header
    and no index column. Rows are appended to
    "<clustering_name>.cluster.anndata_segment.tsv" in the current
    working directory.
    """
    segment_path = f"{clustering_name}.cluster.anndata_segment.tsv"
    names_frame = pd.DataFrame(adata.obs_names)
    coordinates_frame = pd.DataFrame(adata.obsm[clustering_name])
    # Side-by-side join: names in the first column, coordinates after
    rows = pd.concat([names_frame, coordinates_frame], axis=1)
    rows.to_csv(segment_path, sep="\t", mode="a", header=None, index=False)
93+
94+
@staticmethod
95+
def files_to_delocalize(arguments):
    """
    List the local cluster files produced for the requested obsm keys.

    File names follow the "<key>.cluster.anndata_segment.tsv" pattern
    used when the cluster header, type and body lines are written, so
    the returned names refer to files that actually exist on disk.

    Args:
        arguments: mapping with an "obsm_keys" entry (list of obsm key
            names, or None when no keys were supplied)

    Returns:
        list of file names, empty when "obsm_keys" is None or empty.
    """
    # ToDo - check if names using obsm_keys need sanitization
    obsm_keys = arguments["obsm_keys"] or []
    return [f"{name}.cluster.anndata_segment.tsv" for name in obsm_keys]
99+
100+
@staticmethod
101+
def delocalize_cluster_files(file_path, study_file_id, files_to_delocalize):
    """
    Copy cluster files to study bucket

    Each name in files_to_delocalize is handed to
    IngestFiles.delocalize_file together with study_file_id and
    file_path, with "_scp_internal/anndata_ingest/<name>" as the
    destination argument.
    """
    for cluster_file in files_to_delocalize:
        destination = f"_scp_internal/anndata_ingest/{cluster_file}"
        IngestFiles.delocalize_file(
            study_file_id, None, file_path, cluster_file, destination
        )

ingest/cli_parser.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,28 @@ def create_parser():
347347
"--anndata-file", required=True, help="Path to AnnData file"
348348
)
349349

350+
parser_anndata.add_argument(
351+
"--obsm-keys",
352+
type=ast.literal_eval,
353+
help="Array of obsm key(s) to extract as cluster files",
354+
)
355+
356+
parser_anndata.add_argument(
357+
"--extract-cluster",
358+
action="store_true",
359+
help="Indicates clustering data should be extracted",
360+
)
361+
350362
parser_expression_writer = subparsers.add_parser(
351363
"render_expression_arrays",
352-
help="Indicates preprocessing of cluster/expression files for image pipeline"
364+
help="Indicates preprocessing of cluster/expression files for image pipeline",
353365
)
354366

355367
parser_expression_writer.add_argument(
356-
'--render-expression-arrays', action="store_true", help='Invoke expression_writer.py', required=True
368+
'--render-expression-arrays',
369+
action="store_true",
370+
help='Invoke expression_writer.py',
371+
required=True,
357372
)
358373

359374
parser_expression_writer.add_argument(
@@ -366,7 +381,10 @@ def create_parser():
366381
'--matrix-file-path', help='path to matrix file', required=True
367382
)
368383
parser_expression_writer.add_argument(
369-
'--matrix-file-type', help='type to matrix file (dense or mtx)', required=True, choices=['dense', 'mtx']
384+
'--matrix-file-type',
385+
help='type to matrix file (dense or mtx)',
386+
required=True,
387+
choices=['dense', 'mtx'],
370388
)
371389
parser_expression_writer.add_argument(
372390
'--gene-file', help='path to gene file (omit for dense matrix files)'

ingest/ingest_pipeline.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_expression --taxon-name 'Homo sapiens' --taxon-common-name human --ncbi-taxid 9606 --matrix-file ../tests/data/dense_matrix_19_genes_1000_cells.txt --matrix-file-type dense
2828
2929
# Ingest AnnData file
30-
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --anndata-file ../tests/data/anndata/test.h5ad
30+
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --ingest-anndata --anndata-file ../tests/data/anndata/test.h5ad
3131
3232
# Subsample cluster and metadata file
3333
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_subsample --cluster-file ../tests/data/test_1k_cluster_Data.csv --name cluster1 --cell-metadata-file ../tests/data/test_1k_metadata_Data.csv --subsample
@@ -103,7 +103,7 @@
103103
from .clusters import Clusters
104104
from .expression_files.dense_ingestor import DenseIngestor
105105
from .expression_files.mtx import MTXIngestor
106-
from .anndata import AnnDataIngestor
106+
from .anndata_ import AnnDataIngestor
107107
from .cli_parser import create_parser, validate_arguments
108108
from .de import DifferentialExpression
109109
from .expression_writer import ExpressionWriter
@@ -487,6 +487,13 @@ def ingest_anndata(self):
487487
)
488488
if self.anndata.validate():
489489
self.report_validation("success")
490+
if self.kwargs["extract_cluster"]:
491+
for key in self.kwargs["obsm_keys"]:
492+
AnnDataIngestor.generate_cluster_header(self.anndata.adata, key)
493+
AnnDataIngestor.generate_cluster_type_declaration(
494+
self.anndata.adata, key
495+
)
496+
AnnDataIngestor.generate_cluster_body(self.anndata.adata, key)
490497
return 0
491498
# scanpy unable to open AnnData file
492499
else:
@@ -605,7 +612,16 @@ def exit_pipeline(ingest, status, status_cell_metadata, arguments):
605612
DifferentialExpression.delocalize_de_files(
606613
file_path, study_file_id, files_to_match
607614
)
608-
# all non-DE ingest jobs can exit on success
615+
# for successful anndata jobs, need to delocalize intermediate ingest files
616+
elif "extract_cluster" in arguments and all(i < 1 for i in status):
617+
file_path, study_file_id = get_delocalization_info(arguments)
618+
# append status?
619+
if IngestFiles.is_remote_file(file_path):
620+
files_to_delocalize = AnnDataIngestor.files_to_delocalize(arguments)
621+
AnnDataIngestor.delocalize_cluster_files(
622+
file_path, study_file_id, files_to_delocalize
623+
)
624+
# all non-DE, non-anndata ingest jobs can exit on success
609625
elif all(i < 1 for i in status):
610626
sys.exit(os.EX_OK)
611627
else:

0 commit comments

Comments
 (0)