|
1 | 1 | import re |
2 | | -from logging import warning |
| 2 | +from logging import log, warning |
3 | 3 | from pathlib import Path |
4 | | -from typing import Any, Dict, Literal, Union |
| 4 | +from typing import Any, Dict, List, Literal, Union |
5 | 5 |
|
6 | 6 | import numpy as np |
7 | 7 | import pandas as pd |
@@ -72,29 +72,81 @@ def create_transcript_gene_map( |
72 | 72 | def create_transcript_gene_map_from_annotation( |
73 | 73 | file_path: Union[str, Path], |
74 | 74 | source_field: Literal["transcript_id", "transcript_name"] = "transcript_id", |
75 | | - target_field: Literal["gene_id", "gene_name"] = "gene_id", |
| 75 | + target_field: Union[ |
| 76 | + Literal["gene_id", "gene_name", "gene_biotype", "transcript_name"], |
| 77 | + List[Literal["gene_id", "gene_name", "gene_biotype", "transcript_name"]], |
| 78 | + ] = "gene_id", |
| 79 | + use_transcript_name_as_replacement_id: bool = True, |
| 80 | + use_gene_name_as_replacement_id: bool = True, |
76 | 81 | chunk_size: int = 100000, |
77 | | - keep_biotype: bool = False, |
78 | 82 | **kwargs: Dict[str, Any], |
79 | 83 | ) -> pd.DataFrame: |
80 | 84 | """Create a mapping from transcript ids to gene ids using a GTF annotation file. |
81 | 85 |
|
| 86 | + Basic example: |
| 87 | +
|
| 88 | + .. code-block:: python |
| 89 | +
|
| 90 | + from pytximport.utils import create_transcript_gene_map_from_annotation |
| 91 | +
|
| 92 | + # Create a mapping from transcript ids to gene names |
| 93 | + transcript_gene_map = create_transcript_gene_map_from_annotation( |
| 94 | + "path/to/annotation.gtf", |
| 95 | + target_field="gene_name", |
| 96 | + ) |
| 97 | +
|
| 98 | + # Create a mapping from transcript ids to transcript names and include the gene biotype |
| 99 | + transcript_gene_map = create_transcript_gene_map_from_annotation( |
| 100 | + "path/to/annotation.gtf", |
| 101 | + target_field=["transcript_name", "gene_biotype"], |
| 102 | + ) |
| 103 | +
|
82 | 104 | Args: |
83 | 105 | file_path (Union[str, Path]): The path to the GTF annotation file. |
84 | | - field (Literal["gene_id", "gene_name"], optional): The identifier to get for each transcript id. |
| 106 | + source_field (Literal["transcript_id", "transcript_name"], optional): The identifier to get for each transcript |
| 107 | + id. Defaults to "transcript_id". |
| 108 | + target_field (Union[ Literal["gene_id", "gene_name", "gene_biotype"], List[Literal["gene_id", "gene_name", |
| 109 | + "gene_biotype"]], optional): The corresponding identifier(s) to get for each transcript. |
85 | 110 | Defaults to "gene_id". |
86 | 111 | chunk_size (int, optional): The number of lines to read at a time. Defaults to 100000. |
| 112 | + use_transcript_name_as_replacement_id (bool, optional): Whether to use the transcript name as the transcript id |
| 113 | + if the transcript id is missing. Defaults to True. |
| 114 | + use_gene_name_as_replacement_id (bool, optional): Whether to use the gene name as the gene id if the gene id is |
| 115 | + missing. Defaults to True. |
87 | 116 | keep_biotype (bool, optional): Whether to keep the gene_biotype column. Defaults to False. |
88 | 117 |
|
89 | 118 | Returns: |
90 | 119 | pd.DataFrame: The mapping from transcript ids to gene ids. |
91 | 120 | """ |
| 121 | + assert source_field != target_field, "The source_field and target_field must be different." |
| 122 | + |
92 | 123 | transcript_gene_map = pd.DataFrame(columns=["transcript_id", "gene_id", "gene_name", "gene_biotype"]) |
93 | 124 |
|
94 | 125 | if "field" in kwargs: |
95 | 126 | warning("The field argument is deprecated. Please use the source_field and target_field arguments instead.") |
96 | 127 |
|
97 | | - for chunk in pd.read_csv(file_path, sep="\t", chunksize=chunk_size, header=None, comment="#"): |
| 128 | + if "keep_biotype" in kwargs and kwargs["keep_biotype"]: |
| 129 | + warning("The keep_biotype argument is deprecated. Please use the target_field argument with a list instead.") |
| 130 | + if target_field != "gene_biotype" and not (isinstance(target_field, list) and "gene_biotype" in target_field): |
| 131 | + target_field = ( |
| 132 | + [*target_field, "gene_biotype"] if isinstance(target_field, list) else [target_field, "gene_biotype"] |
| 133 | + ) |
| 134 | + |
| 135 | + if not isinstance(file_path, Path): |
| 136 | + file_path = Path(file_path) |
| 137 | + |
| 138 | + if not Path(file_path).exists(): |
| 139 | + raise FileNotFoundError(f"The file {file_path} does not exist.") |
| 140 | + |
| 141 | + for chunk in pd.read_csv( |
| 142 | + file_path, |
| 143 | + sep="\t", |
| 144 | + chunksize=chunk_size, |
| 145 | + header=None, |
| 146 | + comment="#", |
| 147 | + compression=("gzip" if file_path.suffix == ".gz" else None), |
| 148 | + engine="c", |
| 149 | + ): |
98 | 150 | # See: https://www.ensembl.org/info/website/upload/gff.html |
99 | 151 | chunk.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"] |
100 | 152 |
|
@@ -131,26 +183,46 @@ def create_transcript_gene_map_from_annotation( |
131 | 183 | transcript_gene_map["gene_name"], |
132 | 184 | ) |
133 | 185 |
|
134 | | - if source_field == "transcript_name": |
| 186 | + # If only the transcript_name is present, we can drop the id and rename the transcript_name to transcript_id |
| 187 | + if source_field == "transcript_name" and use_transcript_name_as_replacement_id: |
135 | 188 | transcript_gene_map.drop("transcript_id", axis=1, inplace=True) |
136 | 189 | transcript_gene_map.rename(columns={"transcript_name": "transcript_id"}, inplace=True) |
137 | 190 |
|
138 | | - if target_field == "gene_name": |
| 191 | + source_field = "transcript_id" |
| 192 | + |
| 193 | + # If only the gene_name is present, we can drop the gene_id and rename the gene_name to gene_id |
| 194 | + if ( |
| 195 | + (target_field == "gene_name" or (isinstance(target_field, list) and "gene_name" in target_field)) |
| 196 | + and not (target_field == "gene_id" or (isinstance(target_field, list) and "gene_id" in target_field)) |
| 197 | + and use_gene_name_as_replacement_id |
| 198 | + ): |
| 199 | + log( |
| 200 | + 25, |
| 201 | + ( |
| 202 | + "No gene_id target field was provided. Renaming gene_name to gene_id. " |
| 203 | + "You can disable this behavior by setting use_gene_name_as_replacement_id to False." |
| 204 | + ), |
| 205 | + ) |
| 206 | + |
139 | 207 | transcript_gene_map.drop("gene_id", axis=1, inplace=True) |
140 | 208 | transcript_gene_map.rename(columns={"gene_name": "gene_id"}, inplace=True) |
141 | 209 |
|
142 | | - fields_to_keep = ["transcript_id", "gene_id"] |
| 210 | + if isinstance(target_field, list): |
| 211 | + target_field = [field if field != "gene_name" else "gene_id" for field in target_field] |
| 212 | + else: |
| 213 | + target_field = "gene_id" if target_field == "gene_name" else target_field |
143 | 214 |
|
144 | | - if keep_biotype and "gene_biotype" in transcript_gene_map.columns: |
145 | | - fields_to_keep.append("gene_biotype") |
| 215 | + fields_to_keep = [source_field, *target_field] if isinstance(target_field, list) else [source_field, target_field] |
146 | 216 |
|
147 | 217 | transcript_gene_map = transcript_gene_map[fields_to_keep] |
148 | | - |
149 | | - transcript_gene_map[["gene_id", "transcript_id"]] = transcript_gene_map[["gene_id", "transcript_id"]].replace( |
150 | | - "", np.nan |
151 | | - ) |
| 218 | + transcript_gene_map.replace("", np.nan, inplace=True) |
152 | 219 | transcript_gene_map.dropna(inplace=True) |
153 | | - transcript_gene_map.drop_duplicates(subset=["gene_id", "transcript_id"], inplace=True) |
| 220 | + |
| 221 | + if source_field == "transcript_id" and ( |
| 222 | + target_field == "gene_id" or (isinstance(target_field, list) and "gene_id" in target_field) |
| 223 | + ): |
| 224 | + transcript_gene_map.drop_duplicates(subset=["transcript_id", "gene_id"], inplace=True) |
| 225 | + |
154 | 226 | transcript_gene_map.reset_index(drop=True, inplace=True) |
155 | 227 |
|
156 | 228 | return transcript_gene_map |
0 commit comments