Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplication reporting update #187

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pairtools/cli/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@
help="Output stats in yaml format instead of table. [output stats format option]",
)

# Add column with the number of duplicates
@click.option(
"--count-dups/--no-count-dups",
is_flag=True,
default=False,
help="Add column to the output pairs with the number of duplicates. "
"Comparible with sklearn and scipy backends only. "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompaTible

"Is not counted by default. [output dedup pairs format option]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "Off by default"?

)

# Filtering options for reporting stats:
@click.option(
"--filter",
Expand Down Expand Up @@ -388,6 +398,11 @@ def dedup_py(
send_header_to_dedup = send_header_to in ["both", "dedup"]
send_header_to_dup = send_header_to in ["both", "dups"]

count_dups = kwargs.get("count_dups", False)
if backend=="cython" and count_dups:
logger.warning("Not countin number of duplicates with Cython backend.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

countinG

count_dups = False

instream = fileio.auto_open(
pairs_path,
mode="r",
Expand Down Expand Up @@ -486,6 +501,8 @@ def dedup_py(
"Pairs file appears not to be sorted, dedup might produce wrong results."
)
header = headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
if count_dups:
header = headerops.append_columns(header, ["n_dups"])
if send_header_to_dedup:
outstream.writelines((l + "\n" for l in header))
if send_header_to_dup and outstream_dups and (outstream_dups != outstream):
Expand Down Expand Up @@ -555,6 +572,7 @@ def dedup_py(
out_stat=out_stat,
backend=backend,
n_proc=n_proc,
count_dups=count_dups,
)
else:
raise ValueError("Unknown backend")
Expand Down
79 changes: 74 additions & 5 deletions pairtools/lib/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,58 @@ def streaming_dedup(
out_stat,
backend,
n_proc,
count_dups=False
):
"""
Deduplication with sklearn or scipy KD_trees backend.

Parameters
----------
in_stream : file
Input stream of pairs
colnames : list of str
Names of the columns in the input file
chunksize : int
How many lines to read at a time
carryover : int
How many lines to keep in memory to match with the next chunk
method : str
'sum' or 'max' - whether 'r' uses sum of distances on two ends of pairs, or the
maximal distance
mark_dups : bool
If True, will add "DD" as the pair_type
max_mismatch : int
Allowed distance between two pairs to call them duplicates
extra_col_pairs : list of tuples
List of extra column pairs that need to match between two reads for them be
considered duplicates (e.g. useful if alleles are annotated)
unmapped_chrom : str
Which character denotes unmapped reads in the chrom1/chrom2 fields
outstream : file
Output stream of deduplicated pairs
outstream_dups : file
Output stream of duplicates (optionally with added parent_id, see keep_parent_id option)
outstream_unmapped : file
Output stream of unmapped pairs
keep_parent_id : bool
If True, the read ID of the read that was not labelled as a duplicate from a
group of duplicates is recorded for each read marked as duplicate.
Only possible with non-cython backends.
out_stat : PairCounter
Output statistics
backend : str
'scipy', 'sklearn'; work effectively the same
n_proc : int
How many cores to use, by default 1
Only works for 'sklearn' backend
count_dups: bool
if True, will count the number of duplicates for each unique pair and report it as extra column.
Compatible with sklean and scipy backends only.

Returns
-------

"""

deduped_chunks = _dedup_stream(
in_stream=in_stream,
Expand All @@ -54,6 +105,7 @@ def streaming_dedup(
keep_parent_id=keep_parent_id,
backend=backend,
n_proc=n_proc,
count_dups=count_dups
)

t0 = time.time()
Expand Down Expand Up @@ -118,6 +170,7 @@ def _dedup_stream(
keep_parent_id,
backend,
n_proc,
count_dups=False,
):
# Stream the input dataframe:
dfs = pd.read_table(
Expand All @@ -140,6 +193,7 @@ def _dedup_stream(
extra_col_pairs=extra_col_pairs,
backend=backend,
n_proc=n_proc,
count_dups=count_dups
)
df_marked = df_marked.loc[prev_i:, :].reset_index(drop=True)
mask_duplicated = df_marked["duplicate"]
Expand All @@ -165,8 +219,10 @@ def _dedup_chunk(
backend,
unmapped_chrom="!",
n_proc=1,
count_dups=False,
):
"""Mark duplicates in a dataframe of pairs
"""
Mark duplicates in a dataframe of pairs

Parameters
----------
Expand All @@ -193,6 +249,8 @@ def _dedup_chunk(
n_proc : int, optional
How many cores to use, by default 1
Only works for 'sklearn' backend
count_dups: bool, optional
if True, will count the number of duplicates for each unique pair and report it as extra column

Returns
-------
Expand Down Expand Up @@ -220,6 +278,14 @@ def _dedup_chunk(
# Set up columns to store the dedup info:
df.loc[:, "clusterid"] = np.nan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agalitsyna , tests fail at this line with an error message "ValueError: cannot set a frame with no defined index and a scalar".
Googling suggests that such an error arises when one tried to index an empty frame: https://stackoverflow.com/questions/48306694/valueerror-cannot-set-a-frame-with-no-defined-index-and-a-value-that-cannot-be

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe df.assign? What's your favorite way of filling in columns in dataframe?
https://stackoverflow.com/questions/34811971/how-do-i-fill-a-column-with-one-value-in-pandas

df.loc[:, "duplicate"] = False
if count_dups:
# If we are counting dups, the input is always a mix of already "counted" duplicates
# and new dataframe chunk. So we need to reset counter to 1 but only if the counter was missing:
if "n_duplicated" not in df.columns:
df.loc[:, "n_duplicates"] = 1
else:
df.loc[:, "n_duplicates"] = df["n_duplicates"].fillna(1)
# Here, the name "n_duplicates" is a little bit unfair, as in fact it is the total number of reads

# Split mapped and unmapped reads:
mask_unmapped = (df["chrom1"] == unmapped_chrom) | (df["chrom2"] == unmapped_chrom)
Expand Down Expand Up @@ -264,7 +330,7 @@ def _dedup_chunk(
a_mat = coo_matrix((np.ones_like(a0), (a0, a1)), shape=(N_mapped, N_mapped))

# Set up inferred clusterIDs:
df_mapped.loc[:, "clusterid"] = connected_components(a_mat, directed=False)[1]
df_mapped.loc[:, "clusterid"] = connected_components(a_mat, directed=False)[1].astype(np.uint)

mask_dups = df_mapped["clusterid"].duplicated()
df_mapped.loc[mask_dups, "duplicate"] = True
Expand All @@ -278,11 +344,14 @@ def _dedup_chunk(

# Reconstruct original dataframe with removed duplicated reads:
df = pd.concat([df_unmapped, df_mapped]).reset_index(drop=True)
if count_dups:
df["clusterid"] = df["clusterid"].fillna(-1)
df.loc[:, "n_duplicates"] = df.groupby("clusterid", dropna=False).transform("sum")["n_duplicates"]

df = df.set_index(index_colname) # Set up the original index
df = df.drop(
["clusterid"], axis=1
) # Remove the information that we don't need anymore:

) # Remove the information that we don't need anymore
return df


Expand All @@ -307,7 +376,7 @@ def streaming_dedup_cython(
out_stat,
mark_dups,
keep_parent_id=False,
readid_ind=0,
readid_ind=0
):
"""
Cython-powered deduplication with online algorithm based on indexed list.
Expand Down