2 changes: 1 addition & 1 deletion config.yml
@@ -1,2 +1,2 @@
 sbx_kraken:
-  kraken_db_fp: ''
+  kraken_db_fp: ''
77 changes: 0 additions & 77 deletions envs/sbx_kraken_env.linux-64.pin.txt

This file was deleted.

1 change: 1 addition & 0 deletions envs/sbx_kraken_env.yml
@@ -3,4 +3,5 @@ channels:
   - bioconda
   - conda-forge
 dependencies:
+  - ete3
   - kraken2
219 changes: 121 additions & 98 deletions scripts/summarize_kraken2_reports.py
@@ -1,109 +1,132 @@
+import csv
+import os
+import sys
+import tempfile
+from ete3 import NCBITaxa
 from pathlib import Path
-from typing import Iterator, TextIO


-allowed_ranks = {"D": "k", "P": "p", "C": "c", "O": "o", "F": "f", "G": "g", "S": "s"}
-rank_order = {"D": 1, "P": 2, "C": 3, "O": 4, "F": 5, "G": 6, "S": 7}
-
-
-def parse_kraken2_tsv_report(
-    file_handler: TextIO,
-) -> Iterator[dict[str, float | int | str]]:
-    for line in file_handler:
-        data = line.strip().split("\t")
-        if len(data) == 6:
+allowed_ranks = {
+    "D": "k__",
+    "P": "p__",
+    "C": "c__",
+    "O": "o__",
+    "F": "f__",
+    "G": "g__",
+    "S": "s__",
+}
+ncbi_rank_to_prefix = {
+    "domain": "k__",
+    "phylum": "p__",
+    "class": "c__",
+    "order": "o__",
+    "family": "f__",
+    "genus": "g__",
+    "species": "s__",
+}
+
+
+def parse_kraken(fp: Path) -> dict[str, str]:
+    results = {}
+    with open(fp, "r") as f:
+        reader = csv.reader(f, delimiter="\t")
+        for row in reader:
             (
                 percentage,
                 fragments_covered,
                 fragments_assigned,
                 rank,
                 taxon_id,
                 scientific_name,
-            ) = data
-            if rank in allowed_ranks.keys():
-                yield {
-                    "percentage": float(percentage),
-                    "fragments_covered": int(fragments_covered),
-                    "fragments_assigned": int(fragments_assigned),
-                    "rank": rank,
-                    "taxon_id": int(taxon_id),
-                    "scientific_name": scientific_name,
-                }
-
-
-def consensus_lineage_str(rank_stack: list[str]) -> str:
-    missing_ranks = [k for k, v in rank_order.items() if v > len(rank_stack)]
-    rank_stack += [f"{allowed_ranks[r]}__" for r in missing_ranks]
-    return "; ".join(rank_stack)
-
-
-def create_kraken2_tsv_report(
-    reports: list[Iterator[dict[str, float | int | str]]], report_names: list[str]
-) -> tuple[dict[str, dict[int, int]], dict[int, str]]:
-    consensus_lineages = {}
-    report_counts = {}
-
-    for report, report_name in zip(reports, report_names):
-        rank_stack = []
-        counts = {}
-
-        for line in report:
-            if line["rank"] in allowed_ranks.keys():
-                # Update fragments assigned count
-                counts[line["taxon_id"]] = line["fragments_assigned"]
-
-                # Update rank stack
-                if len(rank_stack) < rank_order[line["rank"]]:
-                    rank_stack.append(
-                        f"{allowed_ranks[line['rank']]}__{line['scientific_name'].lstrip()}"
-                    )
-                elif len(rank_stack) == rank_order[line["rank"]]:
-                    rank_stack[-1] = (
-                        f"{allowed_ranks[line['rank']]}__{line['scientific_name'].lstrip()}"
-                    )
-                else:
-                    rank_stack = rank_stack[: rank_order[line["rank"]]]
-                    rank_stack[-1] = (
-                        f"{allowed_ranks[line['rank']]}__{line['scientific_name'].lstrip()}"
-                    )
-
-                # Update consensus lineages
-                if line["taxon_id"] not in consensus_lineages:
-                    consensus_lineages[line["taxon_id"]] = consensus_lineage_str(
-                        rank_stack
-                    )
-
-        # Update report counts
-        report_counts[report_name] = counts
-
-    return report_counts, consensus_lineages
-
-
-def write_kraken2_tsv_summary(
-    report_counts: dict[str, dict[int, int]],
-    consensus_lineages: dict[int, str],
-    file_handler: TextIO,
+            ) = row
+            if rank in allowed_ranks:
+                results[taxon_id] = fragments_assigned
+
+    return results
+
+
+def get_consensus_lineage(taxon_id: str, ncbi: NCBITaxa) -> str:
+    lineage = ncbi.get_lineage(taxon_id)
+    if not lineage:
+        raise ValueError(f"Taxon ID {taxon_id} not found in NCBI taxonomy database.")
+
+    names = ncbi.get_taxid_translator(lineage)
+    lineage_ranks = ncbi.get_rank(lineage)
+
+    lineage_parts = []
+    for taxid in lineage:
+        rank = lineage_ranks.get(taxid)
+        if rank in ncbi_rank_to_prefix:
+            prefix = ncbi_rank_to_prefix[rank]
+            name = names.get(taxid, "unknown")
+            lineage_parts.append(f"{prefix}{name}")
+
+    return "; ".join(lineage_parts)
+
+
+def update_ncbi_taxonomy() -> NCBITaxa:
+    return NCBITaxa()
+
+
+def write_kraken_summary(
+    report_counts: dict[str, dict[str, str]],
+    fp: Path,
+    ncbi: NCBITaxa,
 ) -> None:
-    # Write header
-    header = (
-        "#OTU ID\t"
-        + "\t".join([k for k, _ in report_counts.items()])
-        + "\tConsensus Lineage\n"
-    )
-    file_handler.write(header)
-
-    # Loop through consensus lineages
-    for taxon_id, lineage in consensus_lineages.items():
-        output_line = f"{taxon_id}\t"
-        for report_name in [k for k, _ in report_counts.items()]:
-            output_line += f"{report_counts[report_name].get(taxon_id, 0)}\t"
-        file_handler.write(output_line + f"{lineage}\n")
-
-
-report_names = [Path(x).stem for x in snakemake.input.reports]
-report_counts, consensus_lineages = create_kraken2_tsv_report(
-    [parse_kraken2_tsv_report(open(x)) for x in snakemake.input.reports], report_names
-)
-write_kraken2_tsv_summary(
-    report_counts, consensus_lineages, open(snakemake.output.summary, "w")
-)
+    with open(fp, "w") as f:
+        writer = csv.writer(f, delimiter="\t")
+        header = ["#OTU ID"] + list(report_counts.keys()) + ["Consensus Lineage"]
+        writer.writerow(header)
+
+        all_ids = []
+        for _, v in report_counts.items():
+            all_ids.extend(v.keys())
+
+        # Remove duplicates while preserving order
+        all_ids = list(dict.fromkeys(all_ids))
+
+        for taxon_id in all_ids:
+            row = [taxon_id]
+            for report_name in report_counts.keys():
+                row.append(report_counts[report_name].get(taxon_id, "0"))
+            row.append(get_consensus_lineage(taxon_id, ncbi))
+            writer.writerow(row)
+
+
+def main(reports: list[Path], summary: Path) -> None:
+    report_counts = {
+        report.stem.replace("-taxa", ""): parse_kraken(report) for report in reports
+    }
+    with tempfile.TemporaryDirectory() as tmpdir:
+        fp = Path(tmpdir)
+        os.environ["ETE_CACHE_DIR"] = fp.as_posix()
+        ncbi = update_ncbi_taxonomy()
+        write_kraken_summary(report_counts, summary, ncbi)
+
+
+if "snakemake" in globals():
+    log = snakemake.log[0]  # type: ignore
+    reports = [Path(fp) for fp in snakemake.input.reports]  # type: ignore
+    summary = Path(snakemake.output.summary)  # type: ignore
+
+    with open(log, "w") as log_f:
+        try:
+            sys.stdout = log_f
+            sys.stderr = log_f
+            main(reports, summary)
+        except Exception as e:
+            log_f.write(f"Error: {e}\n")
+            raise
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 3:
+        print(
+            "Usage: python summarize_kraken2_reports.py <output_summary.tsv> <report1.tsv> [<report2.tsv> ...]"
+        )
+        sys.exit(1)
+
+    output_summary = Path(sys.argv[1])
+    report_files = [Path(fp) for fp in sys.argv[2:]]
+
+    main(report_files, output_summary)
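
Note for reviewers: the rewrite replaces the hand-rolled rank-stack lineage tracking with lookups against ete3's local copy of the NCBI taxonomy. A minimal sketch of the three ete3 calls the new get_consensus_lineage relies on (taxon ID 562, E. coli, is purely illustrative; the first NCBITaxa() call downloads and caches the taxonomy database, which can take several minutes):

    from ete3 import NCBITaxa

    ncbi = NCBITaxa()                           # builds/opens the local taxa.sqlite cache
    lineage = ncbi.get_lineage(562)             # taxon IDs ordered from root down to 562
    names = ncbi.get_taxid_translator(lineage)  # {taxid: scientific name}
    ranks = ncbi.get_rank(lineage)              # {taxid: rank string, e.g. "genus"}

    for taxid in lineage:
        print(ranks[taxid], names[taxid], sep="\t")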
56 changes: 5 additions & 51 deletions scripts/test_summarize_kraken2_reports.py
@@ -3,61 +3,15 @@
 import sys
 from pathlib import Path

-from scripts.summarize_kraken2_reports_f import (
-    parse_kraken2_tsv_report,
-    create_kraken2_tsv_report,
-    write_kraken2_tsv_summary,
+from .summarize_kraken2_reports import (
+    parse_kraken,
+    get_consensus_lineage,
+    write_kraken_summary,
 )


 @pytest.fixture
 def reports():
     report_fp = Path(".tests/data/kraken2-outputs/").resolve()
     reports = glob.glob(str(report_fp / "*-taxa.tsv"))
-    return reports, report_fp
-
-
-def test_write_kraken2_tsv_summary(tmpdir, reports):
-    reports, report_fp = reports
-    report_names = [Path(x).stem for x in reports]
-
-    report_counts, consensus_lineages = create_kraken2_tsv_report(
-        [parse_kraken2_tsv_report(open(x)) for x in reports], report_names
-    )
-
-    summary_fp = tmpdir / "summary.tsv"
-    summary_fp = report_fp / "summary.tsv"
-    write_kraken2_tsv_summary(report_counts, consensus_lineages, open(summary_fp, "w"))
-
-    with open(summary_fp, "r") as f:
-        summary = f.readlines()
-        summary = {x.split("\t")[0]: x.split("\t")[1:] for x in summary}
-        report_generators = [parse_kraken2_tsv_report(open(x)) for x in reports]
-
-        assert summary["#OTU ID"] == report_names + ["Consensus Lineage\n"]
-        assert summary["2"] == [
-            str(next(x)["fragments_assigned"]) for x in report_generators
-        ] + ["k__Bacteria; p__; c__; o__; f__; g__; s__\n"]
-
-        for rg in report_generators:
-            for line in rg:
-                assert str(line["fragments_assigned"]) in summary[str(line["taxon_id"])]
-                assert (
-                    str(line["scientific_name"]).lstrip()
-                    in summary[str(line["taxon_id"])][-1]
-                )
-
-
-def test_parse_kraken2_tsv_report(reports):
-    reports, _ = reports
-    report = reports[0]
-    parsed_report = parse_kraken2_tsv_report(open(report))
-
-    assert list(next(parsed_report).keys()) == [
-        "percentage",
-        "fragments_covered",
-        "fragments_assigned",
-        "rank",
-        "taxon_id",
-        "scientific_name",
-    ]
+    return reports, report_fp
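
With the generator-based tests deleted, only the fixture survives. A hypothetical replacement test for the new parse_kraken (a sketch only, not part of this PR; it assumes the fixture and the module's Path import shown above) could reuse it like so:

    def test_parse_kraken(reports):
        report_fps, _ = reports
        results = parse_kraken(Path(report_fps[0]))

        # parse_kraken keeps taxon IDs and fragments-assigned counts as strings
        assert results
        assert all(v.isdigit() for v in results.values())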