Merged
Changes from 3 commits
4 changes: 2 additions & 2 deletions catalog/plugins.json
@@ -45,8 +45,8 @@
"specifier": ">=3.10"
},
"data_designer": {
- "requirement": "data-designer>=0.5.7",
- "specifier": ">=0.5.7",
+ "requirement": "data-designer>=0.6.1",
+ "specifier": ">=0.6.1",
"marker": null
}
},
4 changes: 2 additions & 2 deletions docs/catalog-schema.md
@@ -32,8 +32,8 @@ The top-level document must contain `schema_version` and `packages`:
"specifier": ">=3.10"
},
"data_designer": {
- "requirement": "data-designer>=0.5.7",
- "specifier": ">=0.5.7",
+ "requirement": "data-designer>=0.6.1",
+ "specifier": ">=0.6.1",
"marker": null
}
},
4 changes: 2 additions & 2 deletions docs/catalogs.md
@@ -66,8 +66,8 @@ after installation.
"specifier": ">=3.10"
},
"data_designer": {
- "requirement": "data-designer>=0.5.7",
- "specifier": ">=0.5.7",
+ "requirement": "data-designer>=0.6.1",
+ "specifier": ">=0.6.1",
"marker": null
}
},
34 changes: 27 additions & 7 deletions plugins/data-designer-retrieval-sdg/README.md
@@ -20,19 +20,27 @@ via `[project.entry-points."data_designer.plugins"]`:
Both are registered automatically through Python entry points when the
package is installed (see [Installation](#installation)).

- ## Native async (`DATA_DESIGNER_ASYNC_ENGINE=1`)
+ ## Native async and resumable generation

`embedding-dedup` implements `agenerate()` directly on top of
`model.agenerate_text_embeddings`, so the column participates in
- DataDesigner's async cell-level scheduler whenever the env var is set:
+ DataDesigner's async cell-level scheduler.
+
+ The `generate` command uses DataDesigner's native resumable generation.
+ Use a stable `--artifact-path`, `--dataset-name`, and `--buffer-size`, then
+ resume an interrupted run with `--resume always`:

```bash
- export DATA_DESIGNER_ASYNC_ENGINE=1
- data-designer-retrieval-sdg generate ...
+ data-designer-retrieval-sdg generate \
+ --input-dir ./my_documents \
+ --output-dir ./generated_output \
+ --dataset-name my_retrieval_run \
+ --buffer-size 200 \
+ --resume always
```

- The async engine requires Python 3.11+; without the env var the package
- runs on Python 3.10+ via the framework's sync bridge.
+ Use `--resume if_possible` to resume only when the saved config matches and
+ otherwise start a fresh run.
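
The difference between the resume modes described above can be sketched as a small decision function. This is an illustrative sketch only, not DataDesigner's implementation: the `ResumeMode` class below is a stand-in with assumed member values, `should_resume` is a hypothetical helper, and the choice to raise an error when `always` finds no usable checkpoint is an assumption rather than documented behavior.

```python
from enum import Enum


class ResumeMode(str, Enum):
    """Stand-in for DataDesigner's ResumeMode; member values are assumed."""

    NEVER = "never"
    IF_POSSIBLE = "if_possible"
    ALWAYS = "always"


def should_resume(mode: ResumeMode, checkpoint_exists: bool, config_matches: bool) -> bool:
    """Return True to continue a saved run, False to start a fresh one."""
    can_resume = checkpoint_exists and config_matches
    if mode is ResumeMode.NEVER:
        return False
    if mode is ResumeMode.IF_POSSIBLE:
        # Fall back to a fresh run when the saved config does not match.
        return can_resume
    # Assumption: ALWAYS treats a missing or incompatible checkpoint as an error.
    if not can_resume:
        raise RuntimeError("resume requested but no compatible checkpoint was found")
    return True
```

Under these assumptions, `should_resume(ResumeMode.IF_POSSIBLE, True, False)` returns `False`, which corresponds to starting a fresh run.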

## Installation

@@ -91,16 +99,28 @@ uv run data-designer-retrieval-sdg generate --help
data-designer-retrieval-sdg generate \
--input-dir ./my_documents \
--output-dir ./generated_output \
+ --dataset-name my_retrieval_run \
+ --buffer-size 200 \
+ --resume if_possible \
--num-pairs 7
```

+ Generation writes DataDesigner artifacts under `--artifact-path` and exports a
+ single JSONL file to `--output-dir`.

### Convert to training format

```bash
- data-designer-retrieval-sdg convert ./generated_output \
+ data-designer-retrieval-sdg convert ./generated_output/my_retrieval_run.jsonl \
--corpus-id my_corpus
```

+ Legacy `generated_batch*.json` directories remain supported by `convert`, but
+ `generate` no longer writes per-batch JSON files. The old manual restart flags
+ `--batch-size`, `--start-batch-index`, and `--end-batch-index` were removed
+ because DataDesigner now owns checkpointing through `--buffer-size` and
+ `--resume`.
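
Because `generate` now exports one JSONL file instead of per-batch JSON arrays, downstream scripts read one JSON object per line. A minimal sketch of that read, using only the standard library; `read_jsonl` is a hypothetical helper and the `question`/`answer` fields are made-up sample data, since the real column names depend on the pipeline configuration.

```python
import json
import tempfile
from pathlib import Path


def read_jsonl(path: Path) -> list[dict]:
    """Parse one JSON object per non-empty line of a JSONL file."""
    records = []
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records


# Hypothetical sample records written to a temporary file for illustration.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "my_retrieval_run.jsonl"
    sample.write_text(
        '{"question": "q1", "answer": "a1"}\n{"question": "q2", "answer": "a2"}\n',
        encoding="utf-8",
    )
    rows = read_jsonl(sample)
```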

### Use as a library

```python
2 changes: 1 addition & 1 deletion plugins/data-designer-retrieval-sdg/pyproject.toml
@@ -7,7 +7,7 @@ version = "0.1.0"
description = "Retriever SDG toolkit: registers the embedding-dedup column generator and document-chunker seed reader, plus a multi-step QA generation pipeline, CLI, and Automodel-compatible data conversion"
requires-python = ">=3.10"
dependencies = [
- "data-designer>=0.5.7",
+ "data-designer>=0.6.1",
"nltk>=3.9.2",
"pyyaml>=6.0",
"pyarrow>=14.0",
@@ -8,12 +8,9 @@
- ``generate`` -- run the full SDG pipeline on a directory of text files
- ``convert`` -- convert raw SDG output to Automodel-compatible formats

- The ``generate`` subcommand drives a per-batch loop so each batch's output
- is checkpointed to its own JSON file (resumable across crashes). The
- batching wraps DataDesigner's native ``IndexRange`` selection strategy
- applied to a :class:`DocumentChunkerSeedSource`; the framework owns
- discovery, chunking, and async cell scheduling (when
- ``DATA_DESIGNER_ASYNC_ENGINE=1`` is set).
+ The ``generate`` subcommand runs the full pipeline through DataDesigner's
+ native resumable generation support. The framework owns discovery, chunking,
+ checkpointing, and async cell scheduling.
"""

from __future__ import annotations
@@ -26,6 +23,7 @@
import data_designer.config as dd
from data_designer.engine.resources.seed_reader import SeedReaderError
from data_designer.engine.secret_resolver import PlaintextResolver
+ from data_designer.engine.storage.artifact_storage import ResumeMode
from data_designer.interface import DataDesigner
from data_designer.logging import LoggerConfig, LoggingConfig, OutputConfig, configure_logging

@@ -98,9 +96,15 @@ def _add_generate_parser(subparsers: argparse._SubParsersAction) -> None:
p.add_argument("--similarity-threshold", type=float, default=0.9, help="Cosine threshold for QA-pair dedup")
p.add_argument("--preview", action="store_true", help="Preview without full generation")
p.add_argument("--artifact-path", type=Path, default=Path("./artifacts"), help="DD artifact path")
- p.add_argument("--batch-size", type=int, default=200, help="Records per batch")
- p.add_argument("--start-batch-index", type=int, default=0, help="Batch index to start from")
- p.add_argument("--end-batch-index", type=int, default=-1, help="Batch index to end at (exclusive)")
+ p.add_argument("--dataset-name", default=None, help="Stable DD dataset name for artifacts and resume")
+ p.add_argument("--buffer-size", type=int, default=200, help="DataDesigner checkpoint buffer size")
+ p.add_argument(
+ "--resume",
+ "-r",
+ choices=[mode.value for mode in ResumeMode],
+ default=ResumeMode.NEVER.value,
+ help="Resume behavior for interrupted generation runs",
+ )

g = p.add_argument_group("multi-document bundling")
g.add_argument("--multi-doc", action="store_true", help="Enable multi-doc bundling")
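
The `--resume` flag in the hunk above derives its `choices` from the enum, so the CLI and the library accept the same set of values. The pattern can be tried in isolation with the sketch below. The `ResumeMode` class here is a stand-in defined for illustration; the real one is imported from `data_designer.engine.storage.artifact_storage`, and its member values are assumed.

```python
import argparse
from enum import Enum


class ResumeMode(str, Enum):
    """Stand-in for DataDesigner's ResumeMode; member values are assumed."""

    NEVER = "never"
    IF_POSSIBLE = "if_possible"
    ALWAYS = "always"


parser = argparse.ArgumentParser()
parser.add_argument(
    "--resume",
    "-r",
    # argparse rejects any string that is not one of the enum values.
    choices=[mode.value for mode in ResumeMode],
    default=ResumeMode.NEVER.value,
    help="Resume behavior for interrupted generation runs",
)

# Convert the validated string back into the enum before passing it on.
mode = ResumeMode(parser.parse_args(["--resume", "if_possible"]).resume)
default_mode = ResumeMode(parser.parse_args([]).resume)
```

Keeping the parsed value as a plain string and converting it with `ResumeMode(...)` at the call site mirrors what `_run_create` does with `ResumeMode(args.resume)`.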
@@ -167,34 +171,22 @@ def _run_generate(args: argparse.Namespace) -> None:
)

data_designer = DataDesigner(artifact_path=args.artifact_path, model_providers=model_providers)
- data_designer.set_run_config(dd.RunConfig(disable_early_shutdown=True))
+ data_designer.set_run_config(dd.RunConfig(disable_early_shutdown=True, buffer_size=args.buffer_size))

args.output_dir.mkdir(parents=True, exist_ok=True)

- num_batches = (total_records + args.batch_size - 1) // args.batch_size
- actual_end_batch = num_batches if args.end_batch_index == -1 else min(args.end_batch_index, num_batches)

pipeline_kwargs = _pipeline_kwargs(args)
_print_model_config(args, custom_providers)

if args.preview:
_run_preview(data_designer, seed_source, total_records, args, pipeline_kwargs)
return

- _run_batches(
- data_designer,
- seed_source,
- total_records,
- num_batches,
- args.start_batch_index,
- actual_end_batch,
- args,
- pipeline_kwargs,
- )
+ _run_create(data_designer, seed_source, total_records, args, pipeline_kwargs)


def _pipeline_kwargs(args: argparse.Namespace) -> dict:
- """Collect pipeline-builder keyword arguments shared between preview and batch runs."""
+ """Collect pipeline-builder keyword arguments shared between preview and create runs."""
return {
"max_artifacts_per_type": args.max_artifacts_per_type,
"num_pairs": args.num_pairs,
@@ -238,7 +230,7 @@ def _run_preview(
config_builder = build_qa_generation_pipeline(
seed_source=seed_source,
start_index=0,
- end_index=min(args.batch_size - 1, total_records - 1),
+ end_index=min(args.buffer_size - 1, total_records - 1),
**pipeline_kwargs,
)
print("\nPreviewing generation...")
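
The preview and full-run calls in this diff pass `start_index` and `end_index` as what appear to be inclusive bounds (the full run uses `0` and `total_records - 1`). A small sketch of that arithmetic, assuming inclusive indices; both helper names are hypothetical and not part of the package.

```python
def preview_end_index(buffer_size: int, total_records: int) -> int:
    """Inclusive end index for a preview of at most one buffer of records."""
    return min(buffer_size - 1, total_records - 1)


def full_run_range(total_records: int) -> tuple[int, int]:
    """Inclusive (start, end) indices covering every seed record."""
    return 0, total_records - 1


# A corpus smaller than the buffer is previewed in full.
small_corpus_end = preview_end_index(buffer_size=200, total_records=50)
# A larger corpus is capped at one buffer.
large_corpus_end = preview_end_index(buffer_size=200, total_records=1000)
```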
@@ -249,51 +241,40 @@ def _run_preview(
logger.warning("Preview error: %s", e)


- def _run_batches(
+ def _run_create(
data_designer: DataDesigner,
seed_source: DocumentChunkerSeedSource,
total_records: int,
- num_batches: int,
- start_batch: int,
- end_batch: int,
args: argparse.Namespace,
pipeline_kwargs: dict,
) -> None:
- """Process the pipeline in batches, writing one JSON per batch."""
+ """Run full generation once and export the resulting dataset as JSONL."""
print(f"\nTotal records: {total_records}")
- print(f"Batch size: {args.batch_size}")
- print(f"Total batches: {num_batches}")
- print(f"Starting from batch index: {start_batch}")
- print(f"Ending at batch index: {end_batch} (exclusive)")
-
- for batch_idx in range(start_batch, end_batch):
- start_idx = batch_idx * args.batch_size
- end_idx = min(start_idx + args.batch_size - 1, total_records - 1)
- num_in_batch = end_idx - start_idx + 1
-
- print(f"\n{'=' * 60}")
- print(f"Processing batch {batch_idx}/{num_batches - 1} (records {start_idx}-{end_idx})")
- print(f"{'=' * 60}")
-
- config_builder = build_qa_generation_pipeline(
- seed_source=seed_source,
- start_index=start_idx,
- end_index=end_idx,
- **pipeline_kwargs,
- )
+ print(f"Buffer size: {args.buffer_size}")
+ print(f"Resume mode: {args.resume}")

+ config_builder = build_qa_generation_pipeline(
+ seed_source=seed_source,
+ start_index=0,
+ end_index=total_records - 1,
+ **pipeline_kwargs,
+ )

- input_basename = args.input_dir.name
- dataset_name = f"{input_basename}_batch{batch_idx}_{start_idx}_{end_idx}"
- result = data_designer.create(config_builder, num_records=num_in_batch, dataset_name=dataset_name)
- generated_df = result.load_dataset()
+ dataset_name = args.dataset_name or args.input_dir.name or "retrieval_sdg"
+ print(f"Dataset name: {dataset_name}")
+ print("\nGenerating dataset...")
+ result = data_designer.create(
+ config_builder,
+ num_records=total_records,
+ dataset_name=dataset_name,
+ resume=ResumeMode(args.resume),
+ )

- output_filename = f"generated_batch{batch_idx}_{start_idx}_{end_idx}.json"
- generated_df.to_json(args.output_dir / output_filename, orient="records", indent=2)
- print(f"Saved {output_filename} ({len(generated_df)} records)")
+ output_path = args.output_dir / f"{result.artifact_storage.resolved_dataset_name}.jsonl"
+ result.export(output_path, format="jsonl")

- print(f"\n{'=' * 60}")
- print(f"Generation complete! All batches saved to {args.output_dir}")
- print(f"Total batches processed: {end_batch - start_batch}")
+ print(f"\nGeneration complete! Artifacts saved to {result.artifact_storage.base_dataset_path}")
+ print(f"Exported JSONL to {output_path}")


def _add_convert_parser(subparsers: argparse._SubParsersAction) -> None:
@@ -304,7 +285,7 @@ def _add_convert_parser(subparsers: argparse._SubParsersAction) -> None:
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

- p.add_argument("input_path", help="Path to JSON file or directory of batch files")
+ p.add_argument("input_path", help="Path to generated JSONL/JSON/parquet file or output directory")
p.add_argument("--corpus-id", required=True, help="Corpus identifier")
p.add_argument("--output-dir", default=None, help="Output directory")
p.add_argument("--eval-only", action="store_true", help="BEIR eval only (no train/val)")