Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/curate-video/process-data/dedup.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Duplicate removal operates on clip-level embeddings produced during processing:
## Before You Start

- Verify local paths or configure S3-compatible credentials. Provide `storage_options` in read/write keyword arguments when reading or writing cloud paths.
- Create output directories for `KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`.
- Create output directories for the stages you'll use (`KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`).

---

Expand Down
32 changes: 31 additions & 1 deletion nemo_curator/stages/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import re
from abc import ABC, ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Any, Generic, TypeVar

Expand All @@ -31,6 +32,34 @@
_STAGE_REGISTRY: dict[str, type[ProcessingStage]] = {}


def _validate_stage_name(name: str) -> None:
"""Validate that stage name follows snake_case convention.

Args:
name: The stage name to validate

Raises:
ValueError: If stage name is not in snake_case format
"""
if not re.fullmatch(r"[a-z][a-z0-9_]*", name):
msg = f"Stage name must be snake_case, got '{name}'"
raise ValueError(msg)


def _camel_to_snake(name: str) -> str:
"""Convert CamelCase to snake_case.

Args:
name: The CamelCase string to convert

Returns:
The snake_case version of the input string
"""
# Insert an underscore before any uppercase letter that follows a lowercase letter or digit
s1 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
return s1.lower()


class StageMeta(ABCMeta):
"""Metaclass that automatically registers concrete Stage subclasses.
A class is considered *concrete* if it directly inherits from
Expand Down Expand Up @@ -80,7 +109,7 @@ class ProcessingStage(ABC, Generic[X, Y], metaclass=StageMeta):
"""

_is_abstract_root = True # prevent base from registering itself
_name = "ProcessingStage"
_name = "processing_stage" # Changed to snake_case
_resources = Resources(cpus=1.0)
_batch_size = 1

Expand Down Expand Up @@ -251,6 +280,7 @@ def with_(

# Override the instance attributes directly
if name is not None:
_validate_stage_name(name) # Validate the new name
new_instance._name = name
if resources is not None:
new_instance._resources = resources
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/exact/identification.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ExactDuplicateIdentification(DeduplicationIO, ShuffleStage):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "ExactDuplicateIds"
_name = "exact_duplicate_ids"

def __init__( # noqa: PLR0913
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
Only the storage_options key is supported for now.
"""

_name = "BucketsToEdgesStage"
_name = "buckets_to_edges_stage"
_resources = Resources(cpus=1.0)

def __init__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
self.read_kwargs = read_kwargs if read_kwargs is not None else {}
self.write_kwargs = write_kwargs if write_kwargs is not None else {}

self._name = self.__class__.__name__
self._name = "connected_components_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)
self._batch_size = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class IdentifyDuplicatesStage(ShuffleStage):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "IdentifyDuplicates"
_name = "identify_duplicates"

def __init__( # noqa: PLR0913
self,
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/fuzzy/lsh/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class LSHStage(ProcessingStage[FileGroupTask, FileGroupTask]):
If None, the number of partitions will be decided automatically by the executor as the closest power of 2 <= number of input tasks.
"""

_name = "LSHStage"
_name = "lsh_stage"
_resources = Resources(gpus=1.0)

# Core Algo objects
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/fuzzy/minhash.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def __init__( # noqa: PLR0913
pool: bool = True,
):
# Set ProcessingStage attributes
self._name = self.__class__.__name__
self._name = "minhash_stage"
self._resources = Resources(gpus=1.0) # Requires 1 GPU

self.text_field = text_field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class IdentifyDuplicatesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
def __post_init__(self):
"""Initialize parent class after dataclass initialization."""
super().__init__()
self._name = "RemovalStage"
self._name = "removal_stage"

self._batch_size = 10 # We want to load multiple clusters at once

Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/semantic/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913
self.input_storage_options = self.read_kwargs.pop("storage_options", None)
self.output_storage_options = self.write_kwargs.pop("storage_options", None)

self._name = "KMeansStage"
self._name = "kmeans_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)

def process(self, task: FileGroupTask) -> _EmptyTask:
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/semantic/pairwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913
check_disallowed_kwargs(self.write_kwargs, ["index"])
self.input_storage_options = self.read_kwargs.pop("storage_options", None) if self.read_kwargs else None
self.output_storage_options = self.write_kwargs.pop("storage_options", None) if self.write_kwargs else None
self._name = "PairwiseCosineSimilarityStage"
self._name = "pairwise_cosine_similarity_stage"
self._resources = Resources(cpus=1.0, gpus=1.0)

def process(self, task: FileGroupTask) -> FileGroupTask:
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/deduplication/shuffle_utils/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ShuffleStage(ProcessingStage[FileGroupTask, FileGroupTask]):
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
"""

_name = "ShuffleStage"
_name = "shuffle_stage"
_resources = Resources(gpus=1.0)

# Use BulkRapidsMPFShuffler directly
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/stages/text/deduplication/removal.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]):
def __post_init__(self):
"""Initialize parent class after dataclass initialization."""
super().__init__()
self._name = "DuplicatesRemovalStage"
self._name = "text_duplicates_removal_stage"
self.read_kwargs = self.read_kwargs.copy() if self.read_kwargs else {}

def process(self, task: DocumentBatch) -> DocumentBatch:
Expand Down
4 changes: 3 additions & 1 deletion nemo_curator/stages/text/filters/doc_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from abc import ABC, abstractmethod

from nemo_curator.stages.base import _camel_to_snake


class DocumentFilter(ABC):
"""
Expand All @@ -26,7 +28,7 @@ class DocumentFilter(ABC):

def __init__(self):
super().__init__()
self._name = self.__class__.__name__
self._name = _camel_to_snake(self.__class__.__name__)
self._sentences = None
self._paragraphs = None
self._ngrams = None
Expand Down
4 changes: 3 additions & 1 deletion nemo_curator/stages/text/modifiers/doc_modifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from abc import ABC, abstractmethod

from nemo_curator.stages.base import _camel_to_snake


class DocumentModifier(ABC):
"""
Expand All @@ -29,7 +31,7 @@ class DocumentModifier(ABC):

def __init__(self) -> None:
super().__init__()
self._name = self.__class__.__name__
self._name = _camel_to_snake(self.__class__.__name__)
self._sentences = None
self._paragraphs = None
self._ngrams = None
Expand Down
3 changes: 2 additions & 1 deletion nemo_curator/stages/video/preview/preview.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from loguru import logger

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.base import ProcessingStage, _camel_to_snake
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks.video import Video, VideoTask, _Window
from nemo_curator.utils.operation_utils import make_pipeline_temporary_dir
Expand Down Expand Up @@ -46,6 +46,7 @@ def outputs(self) -> tuple[list[str], list[str]]:
return ["data"], ["clips"]

def __post_init__(self) -> None:
self._name = _camel_to_snake(self.__class__.__name__)
self._resources = Resources(cpus=self.num_cpus_per_worker)

def process(self, task: VideoTask) -> VideoTask:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def test_output_directory_cleanup(self, input_task: FileGroupTask, tmp_path: Pat
"""Test that existing output directory is cleaned up."""
output_dir = tmp_path / "output"

existing_dir = output_dir / "BucketsToEdgesStage"
existing_dir = output_dir / "buckets_to_edges_stage"
existing_dir.mkdir(parents=True)
existing_file = existing_dir / "existing.txt"
existing_file.write_text("This should be deleted")
Expand Down
6 changes: 3 additions & 3 deletions tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ def test_fuzzy_dedup_no_duplicates(

workflow.run(initial_tasks=tasks)

assert not (cache_path / "ConnectedComponentsStage").exists()
assert not (cache_path / "BucketsToEdgesStage").exists()
assert not (cache_path / "connected_components_stage").exists()
assert not (cache_path / "buckets_to_edges_stage").exists()
Comment on lines +248 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ayushdg , Can you check this test please

assert not (output_path / DUPLICATE_IDS_SUBDIR).exists()

lsh_df = cudf.read_parquet(cache_path / "LSHStage")
lsh_df = cudf.read_parquet(cache_path / "lsh_stage")
assert len(lsh_df) == 0

def test_bad_inputs(self, tmp_path: Path) -> None:
Expand Down
Loading
Loading