Skip to content

Add Step DSL (Source/Flow/Merge/Split/Sink) for Spark jobs#314

Open
em3s wants to merge 10 commits into
mainfrom
feat/step-dsl-313
Open

Add Step DSL (Source/Flow/Merge/Split/Sink) for Spark jobs#314
em3s wants to merge 10 commits into
mainfrom
feat/step-dsl-313

Conversation

@em3s
Copy link
Copy Markdown
Contributor

@em3s em3s commented May 10, 2026

Summary

Introduces the in-job Step model for Spark jobs. Source / Flow / Merge / Split / Sink compose into a DAG via either a type-state Scala DSL or an inline YAML steps: list interpreted by StepsRunnerJob.

Inner counterpart to the workflow DSL ADR (#310), which covers job-to-job relationships only.

Closes #313

Depends on #334 — re-removes the SparkConventionsPlugin HBase regression carried into main via PR #309. This branch absorbs the same change locally so CI passes before #334 lands.

Changes

  • Step sealed trait — Source (0→1), Flow (1→1), Merge (N→1), Split (1→M), Sink (1→0)
  • Type-state Plan DSL — ~>, +, .as, fanOut, forked("port")
  • Executor — AST identity memo, fanOut cache/unpersist, Split per-execution memo, plan-time validation
  • StepsRunnerJob + StepsBuilder for inline YAML chains
  • Built-ins: FileSource, SampleSource, SqlMerge, CacheFlow, FileSink, ShowSink
  • Tests for each step, builder, and DSL

How to Test

./gradlew :pipeline:test
./gradlew :pipeline:spotlessCheck

AI Assistance

  • This PR was written largely with AI assistance.
    • Tool / model: Claude Code (Opus 4.7, 1M context)

Introduces the in-job Step model and its YAML inline form for a single
Spark job's data flow, complementing the workflow DSL (#310) which covers
job-to-job relationships.

- Step sealed trait: Source / Transform / Sink with arities 0->1, N->1, 1->0
- Plan type-state DSL: ~>, +, .as, fanOut
- Executor: AST-level memoization, fanOut cache/unpersist
- StepsRunnerJob + StepsBuilder + ClassResolver for inline YAML chains
- Built-ins: FileSource, SampleSource, SqlTransform, CacheTransform,
  FileSink, ShowSink

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
@dosubot dosubot Bot added size:XL This PR changes 500-999 lines, ignoring generated files. enhancement New feature or request labels May 10, 2026
@em3s em3s marked this pull request as draft May 10, 2026 14:48
em3s and others added 3 commits May 12, 2026 22:53
…n errors

- Executor.memo now keys on AST node identity (java.util.IdentityHashMap),
  making the "shared upstream materializes once" semantics explicit and
  preventing accidental dedup of two distinct Sources with equal args.
- StepsBuilder rejects `as:` on Sink steps up front (sinks produce no
  output to reference) and replaces `var prev: Ast = null` with
  `Option[Ast]`.
- StepsBuilder.instantiate wraps mapper.convertValue failures with the
  step name and args so binding errors surface with context instead of
  a raw Jackson stack.
- Drops redundant @transient on Job.mapper (companion object lazy val).
Covers ADR Done When items not previously verified:

- PlanExecutorTest uses a counting Source and capturing Sink to assert
  that fanOut materializes its upstream exactly once across branches,
  exposes the cached storage level to each branch, and unpersists after
  the run. The shared-upstream test asserts identity-based memo
  semantics (one Src node feeding two sinks runs once; two distinct
  Sources with equal args remain independent).
- PlanTypeStateTest captures the type-state invariant via reflection:
  Plan.Open and Plan.MultiOpen must not expose run(); Plan.Closed must.
  This is a regression sentinel for the compile-time guarantee.
…ClassResolver guard

- fanOut: reject branches that introduce a new Source or contain nested
  Snk/Fork/Group at plan construction, not at run().
- FileSink: default mode to errorifexists (matches Spark default) so
  accidental reruns do not silently destroy output.
- ClassResolver: load classes with initialize=false; StepsBuilder type-
  checks Step assignability before triggering <clinit>.
- Job: split lax mapper (argv) and strict stepMapper (step args) so YAML
  typos in step args fail loudly while Spark --spark.* flags pass through.
- parseArgv: warn on dropped tokens to stderr instead of silently ignoring.
- Plan: explain non-blocking unpersist rationale.
- Add JobTest, ClassResolverTest, plus fanOut/FileSink regression cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the original three-primitive spec (Source/Transform/Sink) with
five shape-typed primitives so each trait's in/out arity is self-evident,
and adds Split (1→M) to express multi-output cases ALS dual-factor,
train/val split, model+metrics) that N→1 could not.

Traits:
- Transform → Flow (1→1) + Merge (N→1)
- New Split (1→M) with declared output ports

Built-in renames:
- SqlTransform → SqlMerge
- CacheTransform → CacheFlow

DSL additions:
- Plan.Open ~> overloads for Flow / Merge / Split / Sink
- Plan.Forked: typed result of `~> someSplit`; `.apply(port)` selects a port
- Plan.bundle: combines per-port Closed siblings into one runnable Plan
- Executor: split-level memoization keyed on Ast.Sp identity, so the split
  body runs at most once even when multiple ports are consumed
- fanOut branch validator now covers Flow/Merge/Port and rejects nested
  Sp/Port/Fork/Group

StepSpec / StepsBuilder:
- `as` is now polymorphic: string for Source/Flow/Merge, map for Split
  `{port: label}`
- Split rejects string-form `as`; non-Split rejects map form
- After a Split, the linear-chain default upstream is dropped — downstream
  must explicitly reference a port label
- ClassResolver: new `steps.split` root for future Split built-ins

Tests:
- PlanSplitTest (7 cases): port routing, single-execution memo, AST-identity
  isolation, unknown-port plan-time rejection, bundle composition
- StepsBuilderTest: 4 new Split-form cases (build/run, reject string `as` on
  Split, reject unknown port, reject linear chain after Split)
- Existing tests renamed where they referenced Transform

This is a pre-release spec change; no backward-compat shims.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@em3s em3s changed the title Add Step DSL (Source/Transform/Sink) for Spark jobs Add Step DSL (Source/Flow/Merge/Split/Sink) for Spark jobs May 14, 2026
em3s and others added 2 commits May 14, 2026 17:18
Closes the only remaining coverage gap: every test until now constructed
StepSpec values directly, so the polymorphic `as` field (string for
Source/Flow/Merge, map for Split) and the rest of the YAML → Cfg path
were exercised only by hand-built fixtures, never by actual YAML.

Three cases parse a workflow-shaped YAML `args:` block via the same
loose Map[String, Any] shape that real runners hand to planFromMap, then
run the resulting Plan:

- linear chain (Source → SqlMerge → Sink)
- string `as:` + multi-input `inputs:` for a join
- map `as: {even: ..., odd: ...}` for Split port labels feeding two
  FileSinks; output partitions are read back to confirm routing

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pure formatting from `:pipeline:spotlessApply`. No semantic changes —
column alignment in DSL operator overloads and Ast case-class lists,
ScalaDoc indentation in Step/SqlMerge/ParitySplit, google-java-format
of the test sentinel, and chain wrapping in a few test sites.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@em3s em3s marked this pull request as ready for review May 14, 2026 08:21
em3s and others added 3 commits May 14, 2026 19:01
CI's spotless (scalafmt 3.8.3) flags `/** Foo` openers in multi-line
docstrings and rewrites them to `/**\n  * Foo`. Local spotlessApply
does not reproduce the violation despite identical scalafmt version
and config — root cause TBD — so this commit applies the CI-expected
form by hand to unblock the build.

No semantic changes; affects 18 files across pipeline main and test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PR #312 removed HBase from SparkConventionsPlugin, but PR #309
re-added the same five lines. The re-added hbase-shaded-client and
hbase-shaded-mapreduce jars shadow Hadoop's FileSystem on the
pipeline test classpath, breaking Spark tests on CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request size:XL This PR changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Step DSL (Source/Flow/Merge/Split/Sink) for Spark jobs

1 participant