Skip to content

Commit 9fbaa74

Browse files
authored
Merge pull request #306 from broadinstitute/development
Release 1.26.1
2 parents 99968dd + caf9926 commit 9fbaa74

File tree

6 files changed

+130
-211
lines changed

6 files changed

+130
-211
lines changed

ingest/anndata_.py

Lines changed: 58 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
import pandas as pd # NOqa: F821
22
import os
3-
import datetime
3+
import gzip
4+
import shutil
45
import scanpy as sc
6+
import scipy
7+
from scipy.io.mmio import MMFile
8+
9+
# scipy.io.mmwrite uses scientific notation by default
10+
# https://stackoverflow.com/questions/64748513
11+
class MMFileFixedFormat(MMFile):
12+
def _field_template(self, field, precision):
13+
# Override MMFile._field_template.
14+
return f'%.{precision}f\n'
15+
516

617
try:
718
from ingest_files import IngestFiles
@@ -35,7 +46,7 @@ def obtain_adata(self):
3546
except ValueError as e:
3647
raise ValueError(e)
3748

38-
def validate(self):
49+
def basic_validation(self):
3950
"""
4051
Currently, file passes "basic validation" if file
4152
can be opened by scanpy
@@ -91,6 +102,7 @@ def generate_cluster_body(adata, clustering_name):
91102
pd.DataFrame(cluster_body).to_csv(
92103
filename, sep="\t", mode="a", header=None, index=False
93104
)
105+
AnnDataIngestor.compress_file(filename)
94106

95107
@staticmethod
96108
def set_clustering_filename(name):
@@ -114,17 +126,58 @@ def generate_metadata_file(adata, output_name):
114126
f.write('\t'.join(headers) + '\n')
115127
f.write('\t'.join(types) + '\n')
116128
adata.obs.to_csv(output_name, sep="\t", mode="a", header=None, index=True)
129+
AnnDataIngestor.compress_file(output_name)
117130

118131
@staticmethod
119132
def clusterings_to_delocalize(arguments):
120133
# ToDo - check if names using obsm_keys need sanitization
121134
cluster_file_names = []
122135
for name in arguments["obsm_keys"]:
123-
cluster_file_names.append(AnnDataIngestor.set_clustering_filename(name))
136+
compressed_file = AnnDataIngestor.set_clustering_filename(name) + ".gz"
137+
cluster_file_names.append(compressed_file)
124138
return cluster_file_names
125139

126140
@staticmethod
127-
def delocalize_extracted_files(file_path, study_file_id, files_to_delocalize):
141+
def compress_file(filename):
142+
with open(filename, 'rb') as file_in:
143+
compressed_file = filename + '.gz'
144+
with gzip.open(compressed_file, 'wb') as file_gz:
145+
shutil.copyfileobj(file_in, file_gz)
146+
os.remove(filename)
147+
148+
@staticmethod
149+
def generate_processed_matrix(adata):
150+
"""
151+
Generate matrix files with the following file names:
152+
h5ad_frag.matrix.processed.mtx
153+
h5ad_frag.barcodes.processed.tsv
154+
h5ad_frag.features.processed.tsv
155+
Gzip files for faster delocalization
156+
"""
157+
pd.DataFrame(adata.var.index).to_csv(
158+
"h5ad_frag.features.processed.tsv.gz",
159+
sep="\t",
160+
index=False,
161+
header=False,
162+
compression="gzip",
163+
)
164+
pd.DataFrame(adata.obs.index).to_csv(
165+
"h5ad_frag.barcodes.processed.tsv.gz",
166+
sep="\t",
167+
index=False,
168+
header=False,
169+
compression="gzip",
170+
)
171+
mtx_filename = "h5ad_frag.matrix.processed.mtx"
172+
MMFileFixedFormat().write(
173+
mtx_filename, a=scipy.sparse.csr_matrix(adata.X.T), precision=3
174+
)
175+
AnnDataIngestor.compress_file(mtx_filename)
176+
177+
@staticmethod
178+
def delocalize_extracted_files(
179+
file_path, study_file_id, accession, files_to_delocalize
180+
):
128181
"""Copy extracted files to study bucket"""
129182

130183
for file in files_to_delocalize:
@@ -133,121 +186,5 @@ def delocalize_extracted_files(file_path, study_file_id, files_to_delocalize):
133186
None,
134187
file_path,
135188
file,
136-
f"_scp_internal/anndata_ingest/{study_file_id}/{file}",
137-
)
138-
139-
@staticmethod
140-
def check_valid(adata):
141-
error_messages = []
142-
143-
try:
144-
AnnDataIngestor.check_names_unique(adata.var_names, "Feature")
145-
except ValueError as v:
146-
error_messages.append(str(v))
147-
try:
148-
AnnDataIngestor.check_names_unique(adata.obs_names, "Obs")
149-
except ValueError as v:
150-
error_messages.append(str(v))
151-
if len(error_messages) > 0:
152-
raise ValueError("; ".join(error_messages))
153-
154-
return True
155-
156-
def process_matrix(self):
157-
"""Perform matrix processing"""
158-
if self.check_valid(self.adata):
159-
self.transform()
160-
161-
@staticmethod
162-
def check_names_unique(names, name_type):
163-
"""Return True if names are unique, else false
164-
Expected name_types: ["Feature", "Obs"]
165-
"""
166-
# check feature_name and obs names, feature_id logic not included
167-
# TODO (SCP-5105) non-happy path - add feature_id assessment
168-
if len(names) == len(names.unique()):
169-
return True
170-
else:
171-
dups = list_duplicates(names)
172-
features_for_msg = 2
173-
end = features_for_msg if len(dups) > features_for_msg else len(dups)
174-
dup_list = dups[:end]
175-
dup_string = " ".join(dup_list)
176-
177-
msg = (
178-
f"{name_type} names must be unique within a file. "
179-
f"{len(dups)} duplicates found, including: {dup_string}"
180-
)
181-
GeneExpression.log_for_mixpanel(
182-
"error", "content:duplicate:values-within-file", msg
183-
)
184-
raise ValueError(msg)
185-
186-
def transform(self):
187-
"""Transforms matrix into gene data model."""
188-
# initialize settings for mock data loads in tests
189-
self.test_models = None
190-
self.models_processed = 0
191-
192-
# derive file name from file path
193-
file_name = os.path.basename(self.file_path)
194-
start_time = datetime.datetime.now()
195-
GeneExpression.dev_logger.info("Starting run at " + str(start_time))
196-
num_processed = 0
197-
gene_models = []
198-
data_arrays = []
199-
for all_cell_model in GeneExpression.create_data_arrays(
200-
name=f"{file_name} Cells",
201-
array_type="cells",
202-
values=self.adata.obs.index.tolist(),
203-
linear_data_type="Study",
204-
linear_data_id=self.study_file_id,
205-
**self.data_array_kwargs,
206-
):
207-
data_arrays.append(all_cell_model)
208-
209-
# ASSUMPTION all_cell_model same for raw_count and processed_expression
210-
# TODO (SCP-5103): if raw counts is indicated check that .raw slot is populated
211-
212-
# Iterate over feature names (for happy path)
213-
for feature in self.adata.var_names.tolist():
214-
print(f"processing feature: {feature}")
215-
feature_expression_series = sc.get.obs_df(self.adata, keys=feature)
216-
if feature_expression_series.hasnans:
217-
msg = (
218-
f'Expected numeric expression score - '
219-
f'expression data has NaN values for feature "{feature}"'
220-
)
221-
GeneExpression.log_for_mixpanel(
222-
"error", "content:type:not-numeric", msg
223-
)
224-
raise ValueError(msg)
225-
# capture sparse (only non zero values and their cell IDs)
226-
# check mtx.py for all zero gene handling
227-
filtered_expression_series = feature_expression_series[
228-
feature_expression_series.values > 0
229-
]
230-
231-
exp_cells = filtered_expression_series.index.tolist()
232-
233-
untrimmed_exp_scores = filtered_expression_series.values.tolist()
234-
235-
# trim expression data to three significant digits
236-
exp_scores = [round(float(value), 3) for value in untrimmed_exp_scores]
237-
# TODO (SCP-5105) for None value below, replace with feature ID (string)
238-
data_arrays, gene_models, num_processed = self.create_models(
239-
exp_cells,
240-
exp_scores,
241-
feature,
242-
None,
243-
gene_models,
244-
data_arrays,
245-
num_processed,
246-
False,
247-
)
248-
# Load any remaining models. This is necessary because the amount of
249-
# models may be less than the batch size.
250-
if len(gene_models) > 0 or len(data_arrays) > 0:
251-
self.create_models(
252-
[], [], None, None, gene_models, data_arrays, num_processed, True
189+
f"_scp_internal/anndata_ingest/{accession}_{study_file_id}/{file}",
253190
)

ingest/ingest_files.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def open_pandas(self, file_path, file_type, **kwargs):
341341
def open_anndata(self, file_path, **kwargs):
342342
"""Opens file as AnnData object"""
343343
try:
344-
return sc.read_h5ad(file_path, backed='r')
344+
return sc.read_h5ad(file_path)
345345
except OSError as e:
346346
msg = f"Scanpy cannot read file, \"{file_path}\"."
347347
log_exception(IngestFiles.dev_logger, IngestFiles.user_logger, msg)

ingest/ingest_pipeline.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -500,14 +500,14 @@ def extract_from_anndata(self):
500500
self.anndata = AnnDataIngestor(
501501
self.anndata_file, self.study_id, self.study_file_id, **self.kwargs
502502
)
503-
if self.anndata.validate():
504-
self.report_validation("success")
505-
# process matrix data
506-
### TODO (SCP-5102, SCP-5103): how to associate "raw_count" cells to anndata file
507-
if self.kwargs.get("extract") and "processed_expression" in self.kwargs.get(
508-
"extract"
509-
):
510-
self.anndata.process_matrix()
503+
if self.anndata.basic_validation():
504+
# Get metadata extraction parameters and perform extraction
505+
if self.kwargs.get("extract") and "metadata" in self.kwargs.get("extract"):
506+
metadata_filename = "h5ad_frag.metadata.tsv"
507+
# TODO (SCP-5104): perform check for successful extraction or report failure and exit
508+
AnnDataIngestor.generate_metadata_file(
509+
self.anndata.adata, metadata_filename
510+
)
511511
# Get cluster extraction parameters and perform extraction
512512
if self.kwargs.get("extract") and "cluster" in self.kwargs.get("extract"):
513513
if not self.kwargs["obsm_keys"]:
@@ -519,13 +519,13 @@ def extract_from_anndata(self):
519519
self.anndata.adata, key
520520
)
521521
AnnDataIngestor.generate_cluster_body(self.anndata.adata, key)
522-
# Get metadata extraction parameters and perform extraction
523-
if self.kwargs.get("extract") and "metadata" in self.kwargs.get("extract"):
524-
metadata_filename = f"h5ad_frag.metadata.tsv"
525-
# TODO (SCP-5104): perform check for successful extraction or report failure and exit
526-
AnnDataIngestor.generate_metadata_file(
527-
self.anndata.adata, metadata_filename
528-
)
522+
# process matrix data
523+
### TODO (SCP-5102, SCP-5103): how to associate "raw_count" cells to anndata file
524+
if self.kwargs.get("extract") and "processed_expression" in self.kwargs.get(
525+
"extract"
526+
):
527+
self.anndata.generate_processed_matrix(self.anndata.adata)
528+
self.report_validation("success")
529529
return 0
530530
# scanpy unable to open AnnData file
531531
else:
@@ -652,10 +652,19 @@ def exit_pipeline(ingest, status, status_cell_metadata, arguments):
652652
AnnDataIngestor.clusterings_to_delocalize(arguments)
653653
)
654654
if "metadata" in arguments.get("extract"):
655-
metadata_filename = f"h5ad_frag.metadata.tsv"
655+
metadata_filename = f"h5ad_frag.metadata.tsv.gz"
656656
files_to_delocalize.append(metadata_filename)
657+
if "processed_expression" in arguments.get("extract"):
658+
mtx = "h5ad_frag.matrix.processed.mtx.gz"
659+
barcodes = "h5ad_frag.barcodes.processed.tsv.gz"
660+
features = "h5ad_frag.features.processed.tsv.gz"
661+
mtx_bundle = [mtx, barcodes, features]
662+
files_to_delocalize.extend(mtx_bundle)
657663
AnnDataIngestor.delocalize_extracted_files(
658-
file_path, study_file_id, files_to_delocalize
664+
file_path,
665+
study_file_id,
666+
arguments["study_accession"],
667+
files_to_delocalize,
659668
)
660669
# all non-DE, non-anndata ingest jobs can exit on success
661670
elif all(i < 1 for i in status):
@@ -726,6 +735,7 @@ def main() -> None:
726735
# Log Mixpanel events
727736
MetricsService.log(config.get_parent_event_name(), config.get_metric_properties())
728737
# Exit pipeline
738+
arguments["study_accession"] = metrics_dump["studyAccession"]
729739
exit_pipeline(ingest, status, status_cell_metadata, arguments)
730740

731741

0 commit comments

Comments
 (0)