Changes from 8 commits
2 changes: 1 addition & 1 deletion docs/curate-video/process-data/dedup.md
@@ -30,7 +30,7 @@ Duplicate removal operates on clip-level embeddings produced during processing:
## Before You Start

- Verify local paths or configure S3-compatible credentials. Provide `storage_options` in read/write keyword arguments when reading or writing cloud paths.
- Create output directories for `KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`.
- Create output directories for the stages you'll use (`KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`).

---

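As context for the docs change above: a minimal sketch of the "Before You Start" steps, assuming local output paths and fsspec/s3fs-style credential keys. The directory names, credential keys, and endpoint below are illustrative assumptions, not taken from this diff; only the pattern of passing storage_options inside the read/write keyword arguments comes from the docs.

from pathlib import Path

# Create an output directory for each stage you'll use (names are illustrative).
base = Path("./dedup_output")
for subdir in ("kmeans", "pairwise", "duplicate_ids"):
    (base / subdir).mkdir(parents=True, exist_ok=True)

# For S3-compatible paths, credentials are passed via "storage_options" inside
# the read/write keyword arguments, as the updated docs describe. The key names
# below follow the fsspec/s3fs convention and are assumptions here.
storage_options = {
    "key": "YOUR_ACCESS_KEY",
    "secret": "YOUR_SECRET_KEY",
    "client_kwargs": {"endpoint_url": "https://s3.example.com"},
}
read_kwargs = {"storage_options": storage_options}
write_kwargs = {"storage_options": storage_options}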
18 changes: 17 additions & 1 deletion nemo_curator/stages/base.py
@@ -14,6 +14,7 @@

from __future__ import annotations

import re
from abc import ABC, ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Any, Generic, TypeVar

@@ -31,6 +32,20 @@
_STAGE_REGISTRY: dict[str, type[ProcessingStage]] = {}


def _validate_stage_name(name: str) -> None:
"""Validate that stage name follows snake_case convention.

Args:
name: The stage name to validate

Raises:
ValueError: If stage name is not in snake_case format
"""
if not re.fullmatch(r"[a-z][a-z0-9_]*", name):
msg = f"Stage name must be snake_case, got '{name}'"
raise ValueError(msg)


class StageMeta(ABCMeta):
"""Metaclass that automatically registers concrete Stage subclasses.
A class is considered *concrete* if it directly inherits from
@@ -80,7 +95,7 @@ class ProcessingStage(ABC, Generic[X, Y], metaclass=StageMeta):
"""

_is_abstract_root = True # prevent base from registering itself
_name = "ProcessingStage"
_name = "processing_stage" # Changed to snake_case
_resources = Resources(cpus=1.0)
_batch_size = 1

@@ -251,6 +266,7 @@ def with_(

# Override the instance attributes directly
if name is not None:
_validate_stage_name(name) # Validate the new name
new_instance._name = name
if resources is not None:
new_instance._resources = resources
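For illustration, a small standalone sketch of the snake_case rule that the new _validate_stage_name helper enforces; it mirrors the regex from the diff rather than importing NeMo Curator, and the example names are illustrative.

import re

# Same pattern as _validate_stage_name above: one lowercase letter followed by
# lowercase letters, digits, or underscores.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9_]*")

def is_valid_stage_name(name: str) -> bool:
    return SNAKE_CASE.fullmatch(name) is not None

# Names following the new convention pass:
assert is_valid_stage_name("minhash_stage")
assert is_valid_stage_name("kmeans_stage")
# Old CamelCase names and other non-conforming names are rejected, so
# ProcessingStage.with_(name=...) would now raise ValueError for them:
assert not is_valid_stage_name("LSHStage")
assert not is_valid_stage_name("_hidden_stage")  # must start with a letter
assert not is_valid_stage_name("stage-2")        # hyphens are not allowed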
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/exact/identification.py
@@ -65,7 +65,7 @@ class ExactDuplicateIdentification(DeduplicationIO, ShuffleStage):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "ExactDuplicateIds"
_name = "exact_duplicate_ids"

def __init__( # noqa: PLR0913
self,
@@ -41,7 +41,7 @@ class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
Only the storage_options key is supported for now.
"""

_name = "BucketsToEdgesStage"
_name = "buckets_to_edges_stage"
_resources = Resources(cpus=1.0)

def __init__(
@@ -59,7 +59,7 @@ def __init__(
self.read_kwargs = read_kwargs if read_kwargs is not None else {}
self.write_kwargs = write_kwargs if write_kwargs is not None else {}

self._name = self.__class__.__name__
self._name = "connected_components_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)
self._batch_size = None

@@ -58,7 +58,7 @@ class IdentifyDuplicatesStage(ShuffleStage):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "IdentifyDuplicates"
_name = "identify_duplicates"

def __init__( # noqa: PLR0913
self,
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/fuzzy/lsh/stage.py
@@ -67,7 +67,7 @@ class LSHStage(ProcessingStage[FileGroupTask, FileGroupTask]):
If None, the number of partitions will be decided automatically by the executor as the closest power of 2 <= number of input tasks.
"""

_name = "LSHStage"
_name = "lsh_stage"
_resources = Resources(gpus=1.0)

# Core Algo objects
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/fuzzy/minhash.py
@@ -240,7 +240,7 @@ def __init__( # noqa: PLR0913
pool: bool = True,
):
# Set ProcessingStage attributes
self._name = self.__class__.__name__
self._name = "min_hash_stage"
Contributor:

Suggested change: self._name = "min_hash_stage" -> self._name = "minhash_stage"

?

Contributor:

@copilot, please address

Contributor Author:

Fixed in commit 0274d6d. Changed the minhash stage name from "min_hash_stage" to "minhash_stage" as suggested.

self._resources = Resources(gpus=1.0) # Requires 1 GPU

self.text_field = text_field
@@ -45,7 +45,7 @@ class IdentifyDuplicatesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
def __post_init__(self):
"""Initialize parent class after dataclass initialization."""
super().__init__()
self._name = "RemovalStage"
self._name = "removal_stage"

self._batch_size = 10 # We want to load multiple clusters at once

2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/semantic/kmeans.py
@@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913
self.input_storage_options = self.read_kwargs.pop("storage_options", None)
self.output_storage_options = self.write_kwargs.pop("storage_options", None)

self._name = "KMeansStage"
self._name = "kmeans_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)

def process(self, task: FileGroupTask) -> _EmptyTask:
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/semantic/pairwise.py
@@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913
check_disallowed_kwargs(self.write_kwargs, ["index"])
self.input_storage_options = self.read_kwargs.pop("storage_options", None) if self.read_kwargs else None
self.output_storage_options = self.write_kwargs.pop("storage_options", None) if self.write_kwargs else None
self._name = "PairwiseCosineSimilarityStage"
self._name = "pairwise_cosine_similarity_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)

def process(self, task: FileGroupTask) -> FileGroupTask:
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/shuffle_utils/stage.py
@@ -51,7 +51,7 @@ class ShuffleStage(ProcessingStage[FileGroupTask, FileGroupTask]):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "ShuffleStage"
_name = "shuffle_stage"
_resources = Resources(gpus=1.0)

# Use BulkRapidsMPFShuffler directly
2 changes: 1 addition & 1 deletion nemo_curator/stages/text/deduplication/removal.py
@@ -55,7 +55,7 @@ class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]):
def __post_init__(self):
"""Initialize parent class after dataclass initialization."""
super().__init__()
self._name = "DuplicatesRemovalStage"
self._name = "text_duplicates_removal_stage"
self.read_kwargs = self.read_kwargs.copy() if self.read_kwargs else {}

def process(self, task: DocumentBatch) -> DocumentBatch:
10 changes: 9 additions & 1 deletion nemo_curator/stages/text/filters/doc_filter.py
@@ -12,9 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from abc import ABC, abstractmethod


def _camel_to_snake(name: str) -> str:
"""Convert CamelCase to snake_case."""
# Insert an underscore before any uppercase letter that follows a lowercase letter or digit
s1 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
return s1.lower()
Contributor:

@copilot, we should use this across the classes, please.

Contributor Author:

Implemented in commit 28305e9. The _camel_to_snake function is now centralized in base.py and used consistently across all filter and modifier classes instead of being duplicated.



class DocumentFilter(ABC):
"""
An abstract base class for text-based document filters.
@@ -26,7 +34,7 @@ class DocumentFilter(ABC):

def __init__(self):
super().__init__()
self._name = self.__class__.__name__
self._name = _camel_to_snake(self.__class__.__name__)
self._sentences = None
self._paragraphs = None
self._ngrams = None
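To show what the new default names look like, a short sketch of the conversion _camel_to_snake performs; the class names below are illustrative examples, not an exhaustive list of NeMo Curator filters.

import re

# Mirrors the _camel_to_snake helper added above: insert "_" between a
# lowercase letter or digit and the uppercase letter that follows it, then
# lowercase the result.
def camel_to_snake(name: str) -> str:
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()

# DocumentFilter subclasses now get a snake_case default _name derived from
# the class name:
assert camel_to_snake("WordCountFilter") == "word_count_filter"
assert camel_to_snake("NonAlphaNumericFilter") == "non_alpha_numeric_filter"
# Runs of capitals are not split, because only lowercase-to-uppercase
# boundaries match the pattern:
assert camel_to_snake("URLFilter") == "urlfilter"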
10 changes: 9 additions & 1 deletion nemo_curator/stages/text/modifiers/doc_modifier.py
@@ -12,9 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from abc import ABC, abstractmethod


def _camel_to_snake(name: str) -> str:
"""Convert CamelCase to snake_case."""
# Insert an underscore before any uppercase letter that follows a lowercase letter or digit
s1 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
return s1.lower()
Contributor:

Repeated function.

Contributor Author:

Fixed in commit 28305e9. Removed the repeated function; both DocumentFilter and DocumentModifier now import a single centralized _camel_to_snake function from base.py.



class DocumentModifier(ABC):
"""
Abstract base class for text-based document modifiers.
@@ -29,7 +37,7 @@ class DocumentModifier(ABC):

def __init__(self) -> None:
super().__init__()
self._name = self.__class__.__name__
self._name = _camel_to_snake(self.__class__.__name__)
self._sentences = None
self._paragraphs = None
self._ngrams = None
@@ -300,7 +300,7 @@ def test_output_directory_cleanup(self, input_task: FileGroupTask, tmp_path: Pat
"""Test that existing output directory is cleaned up."""
output_dir = tmp_path / "output"

existing_dir = output_dir / "BucketsToEdgesStage"
existing_dir = output_dir / "buckets_to_edges_stage"
existing_dir.mkdir(parents=True)
existing_file = existing_dir / "existing.txt"
existing_file.write_text("This should be deleted")
6 changes: 3 additions & 3 deletions tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py
@@ -245,11 +245,11 @@ def test_fuzzy_dedup_no_duplicates(

workflow.run(initial_tasks=tasks)

assert not (cache_path / "ConnectedComponentsStage").exists()
assert not (cache_path / "BucketsToEdgesStage").exists()
assert not (cache_path / "connected_components_stage").exists()
assert not (cache_path / "buckets_to_edges_stage").exists()
Comment on lines +248 to +249

Contributor:

@ayushdg, can you check this test please?

assert not (output_path / DUPLICATE_IDS_SUBDIR).exists()

lsh_df = cudf.read_parquet(cache_path / "LSHStage")
lsh_df = cudf.read_parquet(cache_path / "lsh_stage")
assert len(lsh_df) == 0

def test_bad_inputs(self, tmp_path: Path) -> None: