diff --git a/docs/curate-video/process-data/dedup.md b/docs/curate-video/process-data/dedup.md
index 6db1b1884..3cf327a3f 100644
--- a/docs/curate-video/process-data/dedup.md
+++ b/docs/curate-video/process-data/dedup.md
@@ -30,7 +30,7 @@ Duplicate removal operates on clip-level embeddings produced during processing:
 ## Before You Start
 
 - Verify local paths or configure S3-compatible credentials. Provide `storage_options` in read/write keyword arguments when reading or writing cloud paths.
-- Create output directories for `KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`.
+- Create output directories for the stages you'll use (`KMeansStage`, `PairwiseStage`, and `IdentifyDuplicatesStage`).
 
 ---
 
diff --git a/nemo_curator/stages/base.py b/nemo_curator/stages/base.py
index 4ba96ce18..af8c01b7a 100644
--- a/nemo_curator/stages/base.py
+++ b/nemo_curator/stages/base.py
@@ -14,6 +14,7 @@
 
 from __future__ import annotations
 
+import re
 from abc import ABC, ABCMeta, abstractmethod
 from typing import TYPE_CHECKING, Any, Generic, TypeVar
 
@@ -31,6 +32,34 @@
 _STAGE_REGISTRY: dict[str, type[ProcessingStage]] = {}
 
 
+def _validate_stage_name(name: str) -> None:
+    """Validate that stage name follows snake_case convention.
+
+    Args:
+        name: The stage name to validate
+
+    Raises:
+        ValueError: If stage name is not in snake_case format
+    """
+    if not re.fullmatch(r"[a-z][a-z0-9_]*", name):
+        msg = f"Stage name must be snake_case, got '{name}'"
+        raise ValueError(msg)
+
+
+def _camel_to_snake(name: str) -> str:
+    """Convert CamelCase to snake_case.
+
+    Args:
+        name: The CamelCase string to convert
+
+    Returns:
+        The snake_case version of the input string
+    """
+    # Insert an underscore before any uppercase letter that follows a lowercase letter or digit
+    s1 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
+    return s1.lower()
+
+
 class StageMeta(ABCMeta):
     """Metaclass that automatically registers concrete Stage subclasses.
     A class is considered *concrete* if it directly inherits from
@@ -80,7 +109,7 @@ class ProcessingStage(ABC, Generic[X, Y], metaclass=StageMeta):
     """
 
     _is_abstract_root = True  # prevent base from registering itself
-    _name = "ProcessingStage"
+    _name = "processing_stage"  # Changed to snake_case
    _resources = Resources(cpus=1.0)
    _batch_size = 1
 
@@ -251,6 +280,7 @@ def with_(
 
         # Override the instance attributes directly
         if name is not None:
+            _validate_stage_name(name)  # Validate the new name
             new_instance._name = name
         if resources is not None:
             new_instance._resources = resources
diff --git a/nemo_curator/stages/deduplication/exact/identification.py b/nemo_curator/stages/deduplication/exact/identification.py
index d5ab14c0b..87d134da6 100644
--- a/nemo_curator/stages/deduplication/exact/identification.py
+++ b/nemo_curator/stages/deduplication/exact/identification.py
@@ -65,7 +65,7 @@ class ExactDuplicateIdentification(DeduplicationIO, ShuffleStage):
         Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
     """
 
-    _name = "ExactDuplicateIds"
+    _name = "exact_duplicate_ids"
 
     def __init__(  # noqa: PLR0913
         self,
diff --git a/nemo_curator/stages/deduplication/fuzzy/buckets_to_edges.py b/nemo_curator/stages/deduplication/fuzzy/buckets_to_edges.py
index 9aa808618..153b31637 100644
--- a/nemo_curator/stages/deduplication/fuzzy/buckets_to_edges.py
+++ b/nemo_curator/stages/deduplication/fuzzy/buckets_to_edges.py
@@ -41,7 +41,7 @@ class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
         Only the storage_options key is supported for now.
""" - _name = "BucketsToEdgesStage" + _name = "buckets_to_edges_stage" _resources = Resources(cpus=1.0) def __init__( diff --git a/nemo_curator/stages/deduplication/fuzzy/connected_components.py b/nemo_curator/stages/deduplication/fuzzy/connected_components.py index dc89473c3..1ba7c8fd6 100644 --- a/nemo_curator/stages/deduplication/fuzzy/connected_components.py +++ b/nemo_curator/stages/deduplication/fuzzy/connected_components.py @@ -59,7 +59,7 @@ def __init__( self.read_kwargs = read_kwargs if read_kwargs is not None else {} self.write_kwargs = write_kwargs if write_kwargs is not None else {} - self._name = self.__class__.__name__ + self._name = "connected_components_stage" self._resources = Resources(cpus=1.0, gpus=1.0) self._batch_size = None diff --git a/nemo_curator/stages/deduplication/fuzzy/identify_duplicates.py b/nemo_curator/stages/deduplication/fuzzy/identify_duplicates.py index 2d3ca95a6..67b31ba7f 100644 --- a/nemo_curator/stages/deduplication/fuzzy/identify_duplicates.py +++ b/nemo_curator/stages/deduplication/fuzzy/identify_duplicates.py @@ -58,7 +58,7 @@ class IdentifyDuplicatesStage(ShuffleStage): Whether the underlying rapidsmpf shuffler should collect shuffle statistics. """ - _name = "IdentifyDuplicates" + _name = "identify_duplicates" def __init__( # noqa: PLR0913 self, diff --git a/nemo_curator/stages/deduplication/fuzzy/lsh/stage.py b/nemo_curator/stages/deduplication/fuzzy/lsh/stage.py index fb9cfa9f8..67fdb9bdb 100644 --- a/nemo_curator/stages/deduplication/fuzzy/lsh/stage.py +++ b/nemo_curator/stages/deduplication/fuzzy/lsh/stage.py @@ -67,7 +67,7 @@ class LSHStage(ProcessingStage[FileGroupTask, FileGroupTask]): If None, the number of partitions will be decided automatically by the executor as the closest power of 2 <= number of input tasks. 
""" - _name = "LSHStage" + _name = "lsh_stage" _resources = Resources(gpus=1.0) # Core Algo objects diff --git a/nemo_curator/stages/deduplication/fuzzy/minhash.py b/nemo_curator/stages/deduplication/fuzzy/minhash.py index 0a06a0893..96b2eb788 100644 --- a/nemo_curator/stages/deduplication/fuzzy/minhash.py +++ b/nemo_curator/stages/deduplication/fuzzy/minhash.py @@ -240,7 +240,7 @@ def __init__( # noqa: PLR0913 pool: bool = True, ): # Set ProcessingStage attributes - self._name = self.__class__.__name__ + self._name = "minhash_stage" self._resources = Resources(gpus=1.0) # Requires 1 GPU self.text_field = text_field diff --git a/nemo_curator/stages/deduplication/semantic/identify_duplicates.py b/nemo_curator/stages/deduplication/semantic/identify_duplicates.py index 081c45c73..9c7352c5d 100644 --- a/nemo_curator/stages/deduplication/semantic/identify_duplicates.py +++ b/nemo_curator/stages/deduplication/semantic/identify_duplicates.py @@ -45,7 +45,7 @@ class IdentifyDuplicatesStage(ProcessingStage[FileGroupTask, FileGroupTask]): def __post_init__(self): """Initialize parent class after dataclass initialization.""" super().__init__() - self._name = "RemovalStage" + self._name = "removal_stage" self._batch_size = 10 # We want to load multiple clusters at once diff --git a/nemo_curator/stages/deduplication/semantic/kmeans.py b/nemo_curator/stages/deduplication/semantic/kmeans.py index fad9d9f10..e510990a8 100644 --- a/nemo_curator/stages/deduplication/semantic/kmeans.py +++ b/nemo_curator/stages/deduplication/semantic/kmeans.py @@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913 self.input_storage_options = self.read_kwargs.pop("storage_options", None) self.output_storage_options = self.write_kwargs.pop("storage_options", None) - self._name = "KMeansStage" + self._name = "kmeans_stage" self._resources = Resources(cpus=1.0, gpus=1.0) def process(self, task: FileGroupTask) -> _EmptyTask: diff --git a/nemo_curator/stages/deduplication/semantic/pairwise.py b/nemo_curator/stages/deduplication/semantic/pairwise.py index 17c5d8fa2..3404aeed2 100644 --- a/nemo_curator/stages/deduplication/semantic/pairwise.py +++ b/nemo_curator/stages/deduplication/semantic/pairwise.py @@ -112,7 +112,7 @@ def __init__( # noqa: PLR0913 check_disallowed_kwargs(self.write_kwargs, ["index"]) self.input_storage_options = self.read_kwargs.pop("storage_options", None) if self.read_kwargs else None self.output_storage_options = self.write_kwargs.pop("storage_options", None) if self.write_kwargs else None - self._name = "PairwiseCosineSimilarityStage" + self._name = "pairwise_cosine_similarity_stage" self._resources = Resources(cpus=1.0, gpus=1.0) def process(self, task: FileGroupTask) -> FileGroupTask: diff --git a/nemo_curator/stages/deduplication/shuffle_utils/stage.py b/nemo_curator/stages/deduplication/shuffle_utils/stage.py index 40071e9f5..1c342bdd7 100644 --- a/nemo_curator/stages/deduplication/shuffle_utils/stage.py +++ b/nemo_curator/stages/deduplication/shuffle_utils/stage.py @@ -51,7 +51,7 @@ class ShuffleStage(ProcessingStage[FileGroupTask, FileGroupTask]): Whether the underlying rapidsmpf shuffler should collect shuffle statistics. 
""" - _name = "ShuffleStage" + _name = "shuffle_stage" _resources = Resources(gpus=1.0) # Use BulkRapidsMPFShuffler directly diff --git a/nemo_curator/stages/text/deduplication/removal.py b/nemo_curator/stages/text/deduplication/removal.py index f364688fc..2fb85a2f0 100644 --- a/nemo_curator/stages/text/deduplication/removal.py +++ b/nemo_curator/stages/text/deduplication/removal.py @@ -55,7 +55,7 @@ class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]): def __post_init__(self): """Initialize parent class after dataclass initialization.""" super().__init__() - self._name = "DuplicatesRemovalStage" + self._name = "text_duplicates_removal_stage" self.read_kwargs = self.read_kwargs.copy() if self.read_kwargs else {} def process(self, task: DocumentBatch) -> DocumentBatch: diff --git a/nemo_curator/stages/text/filters/doc_filter.py b/nemo_curator/stages/text/filters/doc_filter.py index 96a6745d6..3a8509052 100644 --- a/nemo_curator/stages/text/filters/doc_filter.py +++ b/nemo_curator/stages/text/filters/doc_filter.py @@ -14,6 +14,8 @@ from abc import ABC, abstractmethod +from nemo_curator.stages.base import _camel_to_snake + class DocumentFilter(ABC): """ @@ -26,7 +28,7 @@ class DocumentFilter(ABC): def __init__(self): super().__init__() - self._name = self.__class__.__name__ + self._name = _camel_to_snake(self.__class__.__name__) self._sentences = None self._paragraphs = None self._ngrams = None diff --git a/nemo_curator/stages/text/modifiers/doc_modifier.py b/nemo_curator/stages/text/modifiers/doc_modifier.py index 76791fbab..5c681dbf5 100644 --- a/nemo_curator/stages/text/modifiers/doc_modifier.py +++ b/nemo_curator/stages/text/modifiers/doc_modifier.py @@ -14,6 +14,8 @@ from abc import ABC, abstractmethod +from nemo_curator.stages.base import _camel_to_snake + class DocumentModifier(ABC): """ @@ -29,7 +31,7 @@ class DocumentModifier(ABC): def __init__(self) -> None: super().__init__() - self._name = self.__class__.__name__ + self._name = _camel_to_snake(self.__class__.__name__) self._sentences = None self._paragraphs = None self._ngrams = None diff --git a/nemo_curator/stages/video/preview/preview.py b/nemo_curator/stages/video/preview/preview.py index b05b5e1ba..ca1d56fd0 100644 --- a/nemo_curator/stages/video/preview/preview.py +++ b/nemo_curator/stages/video/preview/preview.py @@ -18,7 +18,7 @@ from loguru import logger -from nemo_curator.stages.base import ProcessingStage +from nemo_curator.stages.base import ProcessingStage, _camel_to_snake from nemo_curator.stages.resources import Resources from nemo_curator.tasks.video import Video, VideoTask, _Window from nemo_curator.utils.operation_utils import make_pipeline_temporary_dir @@ -46,6 +46,7 @@ def outputs(self) -> tuple[list[str], list[str]]: return ["data"], ["clips"] def __post_init__(self) -> None: + self._name = _camel_to_snake(self.__class__.__name__) self._resources = Resources(cpus=self.num_cpus_per_worker) def process(self, task: VideoTask) -> VideoTask: diff --git a/tests/stages/deduplication/fuzzy/test_buckets_to_edges_stage.py b/tests/stages/deduplication/fuzzy/test_buckets_to_edges_stage.py index 7fa437e3d..1dc6e53b0 100644 --- a/tests/stages/deduplication/fuzzy/test_buckets_to_edges_stage.py +++ b/tests/stages/deduplication/fuzzy/test_buckets_to_edges_stage.py @@ -300,7 +300,7 @@ def test_output_directory_cleanup(self, input_task: FileGroupTask, tmp_path: Pat """Test that existing output directory is cleaned up.""" output_dir = tmp_path / "output" - existing_dir = output_dir / 
"BucketsToEdgesStage" + existing_dir = output_dir / "buckets_to_edges_stage" existing_dir.mkdir(parents=True) existing_file = existing_dir / "existing.txt" existing_file.write_text("This should be deleted") diff --git a/tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py b/tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py index 98200ceb1..2e1263c5b 100644 --- a/tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py +++ b/tests/stages/deduplication/fuzzy/test_fuzzy_workflow.py @@ -245,11 +245,11 @@ def test_fuzzy_dedup_no_duplicates( workflow.run(initial_tasks=tasks) - assert not (cache_path / "ConnectedComponentsStage").exists() - assert not (cache_path / "BucketsToEdgesStage").exists() + assert not (cache_path / "connected_components_stage").exists() + assert not (cache_path / "buckets_to_edges_stage").exists() assert not (output_path / DUPLICATE_IDS_SUBDIR).exists() - lsh_df = cudf.read_parquet(cache_path / "LSHStage") + lsh_df = cudf.read_parquet(cache_path / "lsh_stage") assert len(lsh_df) == 0 def test_bad_inputs(self, tmp_path: Path) -> None: diff --git a/tests/stages/test_base.py b/tests/stages/test_base.py index 731fec962..92bdb3f09 100644 --- a/tests/stages/test_base.py +++ b/tests/stages/test_base.py @@ -37,7 +37,7 @@ def validate(self) -> bool: class ConcreteProcessingStage(ProcessingStage[MockTask, MockTask]): """Concrete implementation of ProcessingStage for testing.""" - _name = "ConcreteProcessingStage" + _name = "concrete_processing_stage" _resources = Resources(cpus=2.0) _batch_size = 2 @@ -64,9 +64,9 @@ def test_all(self): assert stage.resources == Resources(cpus=2.0) # Original unchanged # Test with name override - stage_with_name = stage.with_(name="CustomStage") - assert stage_with_name.name == "CustomStage" - assert stage.name == "ConcreteProcessingStage" # Original unchanged + stage_with_name = stage.with_(name="custom_stage") + assert stage_with_name.name == "custom_stage" + assert stage.name == "concrete_processing_stage" # Original unchanged def test_batch_size_override(self): """Test overriding batch_size parameter.""" @@ -81,14 +81,14 @@ def test_multiple_parameters(self): """Test overriding multiple parameters at once.""" stage = ConcreteProcessingStage() new_resources = Resources(cpus=3.0) - stage_new = stage.with_(name="MultiParamStage", resources=new_resources, batch_size=10) + stage_new = stage.with_(name="multi_param_stage", resources=new_resources, batch_size=10) - assert stage_new.name == "MultiParamStage" + assert stage_new.name == "multi_param_stage" assert stage_new.resources == Resources(cpus=3.0) assert stage_new.batch_size == 10 # Original should be unchanged - assert stage.name == "ConcreteProcessingStage" + assert stage.name == "concrete_processing_stage" assert stage.resources == Resources(cpus=2.0) assert stage.batch_size == 2 @@ -117,16 +117,16 @@ def test_chained_with_calls(self): stage = ConcreteProcessingStage() # Chain multiple with_ calls - result = stage.with_(name="ChainedStage").with_(batch_size=8).with_(resources=Resources(cpus=6.0)) + result = stage.with_(name="chained_stage").with_(batch_size=8).with_(resources=Resources(cpus=6.0)) # Should return a new instance, not the original assert result is not stage - assert result.name == "ChainedStage" + assert result.name == "chained_stage" assert result.batch_size == 8 assert result.resources == Resources(cpus=6.0) # Original should be unchanged - assert stage.name == "ConcreteProcessingStage" + assert stage.name == "concrete_processing_stage" assert stage.batch_size == 2 
         assert stage.resources == Resources(cpus=2.0)
 
@@ -150,7 +150,7 @@ def worker(worker_id: int) -> None:
 
             # Call with_ to create a modified stage
             modified_stage = stage.with_(
-                name=f"Worker{worker_id}Stage",
+                name=f"worker_{worker_id}_stage",
                 resources=Resources(cpus=float(worker_id + 1)),
                 batch_size=worker_id + 10,
             )
@@ -205,7 +205,7 @@ def worker(worker_id: int) -> None:
 
         # Verify specific values for each worker
         for i in range(num_threads):
-            expected_name = f"Worker{i}Stage"
+            expected_name = f"worker_{i}_stage"
             expected_resources = Resources(cpus=float(i + 1))
             expected_batch_size = i + 10
 
@@ -218,7 +218,7 @@ def test_class_variable_vs_instance_variable_isolation(self):
 
         # Create a stage that uses class-level defaults (no instance variables set)
         class MinimalStage(ProcessingStage[MockTask, MockTask]):
-            _name = "MinimalStage"
+            _name = "minimal_stage"
             # Note: _resources is not set, so it falls back to ProcessingStage._resources
             _batch_size = 1
 
@@ -269,7 +269,7 @@ def process(self, task: MockTask) -> MockTask:
 class MockStageA(ProcessingStage[MockTask, MockTask]):
     """Mock stage A for testing composite stages."""
 
-    _name = "MockStageA"
+    _name = "mock_stage_a"
     _resources = Resources(cpus=1.0)
     _batch_size = 1
 
@@ -286,7 +286,7 @@ def outputs(self) -> tuple[list[str], list[str]]:
 class MockStageB(ProcessingStage[MockTask, MockTask]):
     """Mock stage B for testing composite stages."""
 
-    _name = "MockStageB"
+    _name = "mock_stage_b"
     _resources = Resources(cpus=2.0)
     _batch_size = 2
 
@@ -303,7 +303,7 @@ def outputs(self) -> tuple[list[str], list[str]]:
 class MockStageC(ProcessingStage[MockTask, MockTask]):
     """Mock stage C for testing composite stages."""
 
-    _name = "MockStageC"
+    _name = "mock_stage_c"
     _resources = Resources(cpus=3.0)
     _batch_size = 3
 
@@ -320,7 +320,7 @@ def outputs(self) -> tuple[list[str], list[str]]:
 class ConcreteCompositeStage(CompositeStage[MockTask, MockTask]):
     """Concrete implementation of CompositeStage for testing."""
 
-    _name = "ConcreteCompositeStage"
+    _name = "concrete_composite_stage"
 
     def decompose(self) -> list[ProcessingStage]:
         """Return a list of mock stages for testing."""
@@ -339,7 +339,7 @@ def test_single_operation(self):
         assert len(composite._with_operations) == 0
 
         # Add a with operation
-        stage_config = {"MockStageA": {"name": "CustomStageA", "resources": Resources(cpus=5.0)}}
+        stage_config = {"mock_stage_a": {"name": "custom_stage_a", "resources": Resources(cpus=5.0)}}
         result = composite.with_(stage_config)
 
         # Should return the same instance (mutating pattern)
@@ -352,27 +352,27 @@ def test_single_operation(self):
         assert len(modified_stages) == 3
 
         # The first stage should have the applied configuration
-        assert modified_stages[0].name == "CustomStageA"
+        assert modified_stages[0].name == "custom_stage_a"
         assert modified_stages[0].resources == Resources(cpus=5.0)
 
         # The other stages should not have been modified
-        assert modified_stages[1].name == "MockStageB"
+        assert modified_stages[1].name == "mock_stage_b"
         assert modified_stages[1].resources == Resources(cpus=2.0)
-        assert modified_stages[2].name == "MockStageC"
+        assert modified_stages[2].name == "mock_stage_c"
         assert modified_stages[2].resources == Resources(cpus=3.0)
 
     @pytest.mark.parametrize(
         "configs",
         [
             {
-                "MockStageA": {"name": "CustomStageA", "resources": Resources(cpus=6.0)},
-                "MockStageB": {"resources": Resources(cpus=10.0), "batch_size": 8},
-                "MockStageC": {"name": "CustomStageC", "resources": Resources(cpus=9.0), "batch_size": 10},
+                "mock_stage_a": {"name": "custom_stage_a", "resources": Resources(cpus=6.0)},
+                "mock_stage_b": {"resources": Resources(cpus=10.0), "batch_size": 8},
+                "mock_stage_c": {"name": "custom_stage_c", "resources": Resources(cpus=9.0), "batch_size": 10},
             },
             [
-                {"MockStageA": {"name": "CustomStageA", "resources": Resources(cpus=6.0)}},
-                {"MockStageB": {"resources": Resources(cpus=10.0), "batch_size": 8}},
-                {"MockStageC": {"name": "CustomStageC", "resources": Resources(cpus=9.0), "batch_size": 10}},
+                {"mock_stage_a": {"name": "custom_stage_a", "resources": Resources(cpus=6.0)}},
+                {"mock_stage_b": {"resources": Resources(cpus=10.0), "batch_size": 8}},
+                {"mock_stage_c": {"name": "custom_stage_c", "resources": Resources(cpus=9.0), "batch_size": 10}},
             ],
         ],
     )
@@ -398,15 +398,15 @@ def test_multiple_operations(self, configs: dict | list[dict]):
         stage_b = modified_stages[1]
         stage_c = modified_stages[2]
 
-        assert stage_a.name == "CustomStageA"
+        assert stage_a.name == "custom_stage_a"
         assert stage_a.resources == Resources(cpus=6.0)
         assert stage_a.batch_size == 1  # Not modified
 
-        assert stage_b.name == "MockStageB"  # Not modified
+        assert stage_b.name == "mock_stage_b"  # Not modified
         assert stage_b.resources == Resources(cpus=10.0)
         assert stage_b.batch_size == 8
 
-        assert stage_c.name == "CustomStageC"
+        assert stage_c.name == "custom_stage_c"
         assert stage_c.resources == Resources(cpus=9.0)
         assert stage_c.batch_size == 10
 
@@ -415,15 +415,15 @@
         [
             (
                 {
-                    "MockStageA": {"name": "CustomStageA"},
-                    "CustomStageA": {"name": "CustomStageA_2", "resources": Resources(cpus=7.0)},
+                    "mock_stage_a": {"name": "custom_stage_a"},
+                    "custom_stage_a": {"name": "custom_stage_a_2", "resources": Resources(cpus=7.0)},
                 },
                 True,
             ),
             (
                 [
-                    {"MockStageA": {"name": "CustomStageA"}},
-                    {"CustomStageA": {"name": "CustomStageA_2", "resources": Resources(cpus=7.0)}},
+                    {"mock_stage_a": {"name": "custom_stage_a"}},
+                    {"custom_stage_a": {"name": "custom_stage_a_2", "resources": Resources(cpus=7.0)}},
                 ],
                 False,
             ),
@@ -441,9 +441,9 @@ def test_multiple_operations_with_name_changed(self, configs: dict | list[dict],
             composite.with_(config)
 
         if should_fail:
-            # The first config should fail because it tries to reference "CustomStageA"
-            # in the same operation where "MockStageA" is being renamed to "CustomStageA"
-            with pytest.raises(ValueError, match="Stage CustomStageA not found in composite stage"):
+            # The first config should fail because it tries to reference "custom_stage_a"
+            # in the same operation where "mock_stage_a" is being renamed to "custom_stage_a"
+            with pytest.raises(ValueError, match="Stage custom_stage_a not found in composite stage"):
                 composite._apply_with_(stages)
         else:
             # The second config should work because it applies operations sequentially
@@ -451,16 +451,16 @@ def test_multiple_operations_with_name_changed(self, configs: dict | list[dict],
 
             assert len(modified_stages) == 3
 
-            assert modified_stages[0].name == "CustomStageA_2"  # Should reflect the latest name
+            assert modified_stages[0].name == "custom_stage_a_2"  # Should reflect the latest name
             assert modified_stages[0].resources == Resources(cpus=7.0)  # Should reflect the latest resources
             assert modified_stages[0].batch_size == 1  # Should not be modified
 
-            # MockStageB / MockStageC should not be modified
-            assert modified_stages[1].name == "MockStageB"
+            # mock_stage_b / mock_stage_c should not be modified
+            assert modified_stages[1].name == "mock_stage_b"
             assert modified_stages[1].resources == Resources(cpus=2.0)
             assert modified_stages[1].batch_size == 2
 
-            assert modified_stages[2].name == "MockStageC"
+            assert modified_stages[2].name == "mock_stage_c"
             assert modified_stages[2].resources == Resources(cpus=3.0)
             assert modified_stages[2].batch_size == 3
 
@@ -471,7 +471,7 @@ def test_apply_with_non_unique_stage_names_error(self):
         # Create stages with duplicate names
         duplicate_stages = [MockStageA(), MockStageA(), MockStageB()]
 
-        config = {"MockStageA": {"name": "CustomStageA"}}
+        config = {"mock_stage_a": {"name": "custom_stage_a"}}
         composite.with_(config)
 
         # Should raise ValueError due to non-unique names
@@ -486,13 +486,13 @@ def test_apply_with_unknown_stage_name_error(self):
         stages = composite.decompose()
 
         # Configure an unknown stage
-        config = {"UnknownStage": {"name": "CustomStage"}}
+        config = {"unknown_stage": {"name": "custom_stage"}}
         composite.with_(config)
 
         # Should raise ValueError due to unknown stage name
         import pytest
 
-        with pytest.raises(ValueError, match="Stage UnknownStage not found in composite stage"):
+        with pytest.raises(ValueError, match="Stage unknown_stage not found in composite stage"):
             composite._apply_with_(stages)
 
     def test_apply_with_empty_operations(self):
@@ -518,3 +518,113 @@ def test_composite_stage_inputs_and_outputs(self):
 
         # outputs() should return the last stage's outputs
         assert composite.outputs() == composite.decompose()[-1].outputs()
+
+
+class TestStageNamingValidation:
+    """Test stage naming validation according to snake_case convention."""
+
+    def test_validate_stage_name_function(self):
+        """Test the _validate_stage_name function with valid and invalid names."""
+        from nemo_curator.stages.base import _validate_stage_name
+
+        # Valid snake_case names should pass
+        valid_names = [
+            "video_reader",
+            "pairwise_file_partitioning",
+            "boilerplate_string_ratio",
+            "duplicates_removal_stage",
+            "lsh_stage",
+            "identify_duplicates",
+            "a",
+            "test_123",
+            "another_test_name_with_numbers_456"
+        ]
+
+        for name in valid_names:
+            _validate_stage_name(name)  # Should not raise
+
+        # Invalid names should fail
+        invalid_names = [
+            "DuplicatesRemovalStage",  # CamelCase
+            "LSHStage",  # CamelCase
+            "IdentifyDuplicates",  # CamelCase
+            "PairwiseCosineSimilarityStage",  # CamelCase
+            "KMeansStage",  # CamelCase
+            "123invalid",  # starts with number
+            "invalid-name",  # has hyphen
+            "invalid name",  # has space
+            "",  # empty
+            "Invalid_Name",  # has uppercase
+            "invalidName",  # camelCase
+        ]
+
+        for name in invalid_names:
+            with pytest.raises(ValueError, match="Stage name must be snake_case"):
+                _validate_stage_name(name)
+
+    def test_with_method_validates_stage_name(self):
+        """Test that the with_ method validates stage names."""
+        stage = ConcreteProcessingStage()
+
+        # Valid snake_case name should work
+        valid_stage = stage.with_(name="valid_snake_case_name")
+        assert valid_stage.name == "valid_snake_case_name"
+
+        # Invalid names should raise ValueError
+        invalid_names = [
+            "InvalidCamelCase",
+            "invalid-hyphen",
+            "invalid space",
+            "123starts_with_number",
+            "",
+            "Invalid_Mixed_Case"
+        ]
+
+        for invalid_name in invalid_names:
+            with pytest.raises(ValueError, match="Stage name must be snake_case"):
+                stage.with_(name=invalid_name)
+
+    def test_existing_stage_names_are_snake_case(self):
+        """Test that all stage instances follow snake_case convention."""
+        import re
+
+        # Test the mock stages we defined in this file
+        mock_stages = [
+            ConcreteProcessingStage(),
+            MockStageA(),
+            MockStageB(),
+            MockStageC()
+        ]
+
+        snake_case_pattern = re.compile(r"[a-z][a-z0-9_]*")
+
+        for stage in mock_stages:
+            assert snake_case_pattern.fullmatch(stage.name), f"Stage {stage.__class__.__name__} has invalid name: {stage.name}"
+
+    def test_stage_name_pattern_comprehensive(self):
+        """Test comprehensive patterns for stage naming."""
+        from nemo_curator.stages.base import _validate_stage_name
+
+        # Test edge cases
+        valid_edge_cases = [
+            "a",  # single letter
+            "a1",  # letter followed by number
+            "a_",  # letter followed by underscore
+            "a_1",  # letter, underscore, number
+            "test_name_with_many_underscores_123"  # complex valid name
+        ]
+
+        for name in valid_edge_cases:
+            _validate_stage_name(name)  # Should not raise
+
+        # Note: double underscore is actually valid by our regex, so let's test what actually fails
+        definitely_invalid = [
+            "_invalid",  # starts with underscore
+            "1invalid",  # starts with number
+            "A",  # single uppercase letter
+            "test_Name",  # contains uppercase
+        ]
+
+        for name in definitely_invalid:
+            with pytest.raises(ValueError, match="Stage name must be snake_case"):
+                _validate_stage_name(name)
diff --git a/tests/stages/test_function_decorators.py b/tests/stages/test_function_decorators.py
index b7bdb5fa4..114a60d41 100644
--- a/tests/stages/test_function_decorators.py
+++ b/tests/stages/test_function_decorators.py
@@ -42,7 +42,7 @@ def validate(self) -> bool:
 resources_inc = Resources(cpus=1.5)
 
 
-@processing_stage(name="IncrementStage", resources=resources_inc, batch_size=4)
+@processing_stage(name="increment_stage", resources=resources_inc, batch_size=4)
 def increment_stage(task: MockTask) -> MockTask:
     task.data += 1
     return task
@@ -52,7 +52,7 @@ def increment_stage(task: MockTask) -> MockTask:
 resources_dup = Resources(cpus=0.5)
 
 
-@processing_stage(name="DuplicateStage", resources=resources_dup, batch_size=2)
+@processing_stage(name="duplicate_stage", resources=resources_dup, batch_size=2)
 def duplicate_stage(task: MockTask) -> list[MockTask]:
     return [task, task]
 
@@ -72,7 +72,7 @@ def test_instance_properties(self) -> None:
         stage = increment_stage
         # Decorator replaces the function with an instance
         assert isinstance(stage, ProcessingStage)
-        assert stage.name == "IncrementStage"
+        assert stage.name == "increment_stage"
         assert stage.resources == resources_inc
         assert stage.batch_size == 4
 
@@ -130,5 +130,5 @@ def bad_stage(task: MockTask, _: int):  # type: ignore[valid-type]  # noqa: ANN2
 
     def test_stage_registry(self) -> None:
         """Uses get_stage_class to ensure that stage names are in the _STAGE_REGISTRY."""
-        assert get_stage_class("IncrementStage") is not None
-        assert get_stage_class("DuplicateStage") is not None
+        assert get_stage_class("increment_stage") is not None
+        assert get_stage_class("duplicate_stage") is not None
diff --git a/tests/stages/text/modules/test_filters.py b/tests/stages/text/modules/test_filters.py
index 04e73ea1c..84d40f713 100644
--- a/tests/stages/text/modules/test_filters.py
+++ b/tests/stages/text/modules/test_filters.py
@@ -794,7 +794,7 @@ def test_pornographicurls(self) -> None:
 
         expected_data = DocumentBatch(
             data=pd.DataFrame({"text": ["no url", "fine url https://www.nvidia.com/en-us/"]}),
-            task_id="batch_1_PornographicUrlsFilter",
+            task_id="batch_1_pornographic_urls_filter",
             dataset_name="test_1",
         )
         assert all_equal(expected_data, filtered_data), f"Expected {expected_data} but got {filtered_data}"
@@ -970,7 +970,7 @@ def test_filter_dataset_prefix(self) -> None:
         # Expect only those records where the text starts with "Hello".
         expected_dataset = DocumentBatch(
             data=pd.DataFrame({"text": ["Hello world", "Hello everyone"]}),
-            task_id="batch_1_SubstringFilter",
+            task_id="batch_1_substring_filter",
             dataset_name="test_1",
         )
 
@@ -993,7 +993,7 @@ def test_filter_dataset_suffix(self) -> None:
         # Expect only those records that end with "end".
         expected_dataset = DocumentBatch(
             data=pd.DataFrame({"text": ["This is the end", "Not matching end", "The end"]}),
-            task_id="batch_1_SubstringFilter",
+            task_id="batch_1_substring_filter",
             dataset_name="test_1",
         )
 
@@ -1010,7 +1010,7 @@ def test_filter_dataset_any(self) -> None:
         # Expect documents that contain "test" anywhere.
         expected_dataset = DocumentBatch(
             data=pd.DataFrame({"text": ["test case", "This is a testcase", "another test"]}),
-            task_id="batch_1_SubstringFilter",
+            task_id="batch_1_substring_filter",
             dataset_name="test_1",
         )
 
@@ -1263,7 +1263,7 @@ def test_fake_quality_filter(self) -> None:
 
         expected_data = DocumentBatch(
             data=pd.DataFrame({"text": ["b", "c", "d"]}),
-            task_id="batch_1_FakeQualityFilter",
+            task_id="batch_1_fake_quality_filter",
             dataset_name="test_1",
         )
         assert all_equal(expected_data, filtered_data), f"Expected {expected_data} but got {filtered_data}"
@@ -1276,7 +1276,7 @@ def test_fake_langid_filter(self) -> None:
 
         expected_data = DocumentBatch(
             data=pd.DataFrame({"text": ["a", "b", "d"]}),
-            task_id="batch_1_FakeLangId",
+            task_id="batch_1_fake_lang_id",
             dataset_name="test_1",
         )
         assert all_equal(expected_data, filtered_data), f"Expected {expected_data} but got {filtered_data}"
diff --git a/tests/stages/text/modules/test_modifiers.py b/tests/stages/text/modules/test_modifiers.py
index 124142f45..bc333058b 100644
--- a/tests/stages/text/modules/test_modifiers.py
+++ b/tests/stages/text/modules/test_modifiers.py
@@ -492,7 +492,7 @@ def test_docmodifier_single_normalization_and_name(self) -> None:
         assert len(m.modifier_fn) == 1
         assert m.modifier_fn[0] is mod
         assert m._input_fields == [["text"]]
-        assert m.name == "MarkdownRemover"
+        assert m.name == "markdown_remover"
 
     def test_mixed_modifiers_with_text_fields_preserved(self) -> None:
         def fn(s: str) -> str:
@@ -503,7 +503,7 @@ def fn(s: str) -> str:
         assert m._input_fields == [["a"], ["b"]]
         assert m.modifier_fn[0] is fn
         assert m.modifier_fn[1] is mod
-        expected_name = f"modifier_chain_of_{fn.__name__}_MarkdownRemover"
+        expected_name = f"modifier_chain_of_{fn.__name__}_markdown_remover"
         assert m.name == expected_name
 
     def test_raises_when_single_modifier_with_multiple_text_fields(self) -> None:
diff --git a/tests/stages/video/preview/test_preview.py b/tests/stages/video/preview/test_preview.py
index c30496370..efc280923 100644
--- a/tests/stages/video/preview/test_preview.py
+++ b/tests/stages/video/preview/test_preview.py
@@ -354,15 +354,15 @@ def test_with_method(self):
         stage = PreviewStage()
 
         # Test modifying parameters that are supported by the base class
-        new_stage = stage.with_(name="CustomPreviewStage", resources=Resources(cpus=8.0), batch_size=5)
+        new_stage = stage.with_(name="custom_preview_stage", resources=Resources(cpus=8.0), batch_size=5)
 
         # Verify new instance has modified values
-        assert new_stage.name == "CustomPreviewStage"
+        assert new_stage.name == "custom_preview_stage"
         assert new_stage.resources == Resources(cpus=8.0)
         assert new_stage.batch_size == 5
 
         # Verify original instance is unchanged
-        assert stage.name == "ProcessingStage"  # Default name from base class
+        assert stage.name == "preview_stage"  # Default name from class name
         assert stage.resources == Resources(cpus=4.0)
         assert stage.batch_size == 1
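
For reviewers skimming the patch, a minimal sketch (not part of the diff) of how the new naming convention behaves. It assumes the private helpers added to nemo_curator/stages/base.py stay importable as shown; the expected values mirror the updated tests.

```python
# Illustrative sketch only, assuming the private helpers remain importable.
from nemo_curator.stages.base import _camel_to_snake, _validate_stage_name

# Class names are converted to snake_case identifiers (mirrors the updated tests).
assert _camel_to_snake("MarkdownRemover") == "markdown_remover"
assert _camel_to_snake("PreviewStage") == "preview_stage"

# Names passed explicitly (for example via stage.with_(name=...)) must already be snake_case.
_validate_stage_name("custom_stage")  # passes silently
try:
    _validate_stage_name("CustomStage")  # CamelCase is rejected
except ValueError as err:
    print(err)  # Stage name must be snake_case, got 'CustomStage'
```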