Description
What would you like to happen?
Current lineage implementation
Currently, the lineage implementation is tightly coupled to Beam Metrics as its storage backend in both the Java and Python SDKs.
The Java static API returns two Lineage instances, one for sources and one for sinks:
Lineage sources = Lineage.getSources();
Lineage sinks = Lineage.getSinks();
The Lineage instance provides several overloads of the add() method, which send lineage data to metrics:
// With rollup enabled, segments go into a BoundedTrie; otherwise the joined name goes into a StringSet.
if (MetricsFlag.lineageRollupEnabled()) {
  ((BoundedTrie) this.metric).add(segments);
} else {
  ((StringSet) this.metric).add(String.join("", segments));
}
The Lineage class also provides two public static methods to query lineage from results:
Set<String> query(MetricResults results, Type type, String truncatedMarker)
Set<String> query(MetricResults results, Type type)
Requested functionality
Add a pluggable lineage tracking mechanism, discovered via ServiceLoader, that decouples lineage reporting from the core metrics infrastructure and enables flexible observability without changes to core. Scope: Java SDK (Python SDK in future work).
1. This change must preserve the current public APIs.
2. Minimize changes to I/O connectors that produce lineage; isolate changes to the org.apache.beam.sdk.metrics.Lineage class.
3. Use a plugin approach via ServiceLoader discovery, following the existing FileSystemRegistrar pattern. The key advantage is that registrars on the classpath can read PipelineOptions and enable or disable themselves based on those options. This satisfies the approach described in the Open Lineage ticket:
options = PipelineOptions([
    '--openlineage_enabled=true',
])
(Note that OpenLineage is only an example; integrating it is out of scope for this request.)
4. Do not provide any concrete plugin implementations. If no plugins are available, fall back to the existing metric-based lineage approach.
5. Unfortunately, the static query methods expose MetricResults as an implementation detail. Leave them as-is; they are out of scope for this change.
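The ServiceLoader discovery with a metric fallback described above could look roughly like the following minimal, standalone sketch. Every name in it (`LineageKind`, `LineageReporter`, `LineageReporterRegistrar`, `MetricsFallbackReporter`, `LineageReporters`) is hypothetical and not part of Beam; a `Map<String, String>` stands in for `PipelineOptions`, and `MetricsFallbackReporter` only records strings where the real fallback would write to `BoundedTrie`/`StringSet` metrics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical lineage kinds, mirroring Lineage.getSources() vs. Lineage.getSinks().
enum LineageKind { SOURCE, SINK }

// Hypothetical SPI that a plugin implements to receive lineage entries.
@FunctionalInterface
interface LineageReporter {
  void add(LineageKind kind, Iterable<String> segments);
}

// Hypothetical registrar modeled on the FileSystemRegistrar pattern: discovered via
// ServiceLoader, it inspects the options and decides whether to contribute a reporter.
interface LineageReporterRegistrar {
  Optional<LineageReporter> fromOptions(Map<String, String> options);
}

// Stand-in for the existing metric-based path (BoundedTrie / StringSet); it only keeps strings.
final class MetricsFallbackReporter implements LineageReporter {
  final List<String> recorded = new ArrayList<>();

  @Override
  public void add(LineageKind kind, Iterable<String> segments) {
    recorded.add(kind + ":" + String.join("", segments));
  }
}

final class LineageReporters {
  private LineageReporters() {}

  // Discovers registrars listed under META-INF/services on the classpath.
  static List<LineageReporter> discover(Map<String, String> options) {
    return select(ServiceLoader.load(LineageReporterRegistrar.class), options);
  }

  // Keeps every reporter whose registrar opted in; falls back to metrics when none did.
  static List<LineageReporter> select(
      Iterable<LineageReporterRegistrar> registrars, Map<String, String> options) {
    List<LineageReporter> enabled = new ArrayList<>();
    for (LineageReporterRegistrar registrar : registrars) {
      registrar.fromOptions(options).ifPresent(enabled::add);
    }
    if (enabled.isEmpty()) {
      enabled.add(new MetricsFallbackReporter());
    }
    return enabled;
  }
}
```

In this sketch, `Lineage.getSources()` and `Lineage.getSinks()` would keep their signatures and forward each `add()` call to the selected reporters, which is one way to keep the change inside the `Lineage` class. A plugin JAR would register its registrar through the standard `ServiceLoader` provider-configuration file: a `META-INF/services/` entry named after the registrar interface, listing the implementation's fully qualified class name.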
Deduplication
Native metrics such as BoundedTrie and StringSet consolidate values across workers automatically. A custom lineage plugin, in contrast, shifts responsibility for deduplication to the receiving system, whether that is OpenLineage or a custom-built implementation.
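Because a plugin cannot rely on metric consolidation, a reporter might drop obvious repeats before they leave the worker. The following is a minimal sketch of a hypothetical decorator (all names are assumptions, not Beam API) that forwards each (kind, fully qualified name) pair at most once per JVM using `ConcurrentHashMap.newKeySet()`. Duplicates emitted by different workers still reach the backend and must be handled there.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lineage kinds and reporter SPI (assumed shapes, not Beam API).
enum LineageKind { SOURCE, SINK }

@FunctionalInterface
interface LineageReporter {
  void add(LineageKind kind, Iterable<String> segments);
}

// Hypothetical decorator: suppresses repeats seen by this JVM only.
final class DeduplicatingLineageReporter implements LineageReporter {
  private final LineageReporter delegate;
  // Thread-safe set, since many bundles on one worker may report concurrently.
  private final Set<List<String>> seen = ConcurrentHashMap.newKeySet();

  DeduplicatingLineageReporter(LineageReporter delegate) {
    this.delegate = delegate;
  }

  @Override
  public void add(LineageKind kind, Iterable<String> segments) {
    // Assumes segments can be iterated more than once (e.g. a List).
    String fqn = String.join("", segments);
    // Set.add returns false if the key is already present, so only first sightings pass.
    if (seen.add(List.of(kind.name(), fqn))) {
      delegate.add(kind, segments);
    }
  }
}
```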
Relationship to existing roadmap
This change will serve as a foundation for [Feature Request]: Integrate Apache Beam with Open Lineage, which is already on the roadmap for Beam 3.0.
The existing [Feature Request]: Integrate Apache Beam with Open Lineage aims to add a different kind of functionality to Beam 3.0 and does not overlap with this request, so the two do not interfere.
Testing strategy
- Unit tests for the plugin discovery mechanism
- Integration tests with mock lineage reporters
- Backward compatibility tests ensuring existing metric-based lineage still works when no plugins are present
- Cross-runner tests will initially focus on DirectRunner, with cross-runner compatibility expected to be inherited from the existing metrics infrastructure
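For the integration tests with mock lineage reporters, a recording test double could be as small as the following sketch. `LineageKind`, `LineageReporter`, `RecordingLineageReporter`, and `FakeFileRead` are hypothetical names, and the `gcs:` segments are illustrative rather than Beam's exact naming format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lineage kinds and reporter SPI (assumed shapes, not Beam API).
enum LineageKind { SOURCE, SINK }

@FunctionalInterface
interface LineageReporter {
  void add(LineageKind kind, Iterable<String> segments);
}

// Test double: records every call so a test can assert what a connector reported.
final class RecordingLineageReporter implements LineageReporter {
  private final List<String> entries = new ArrayList<>();

  @Override
  public synchronized void add(LineageKind kind, Iterable<String> segments) {
    entries.add(kind + ":" + String.join("", segments));
  }

  // Returns an immutable snapshot of what has been recorded so far.
  synchronized List<String> entries() {
    return List.copyOf(entries);
  }
}

// Hypothetical connector stand-in that reports the object it reads as a lineage source.
final class FakeFileRead {
  private final LineageReporter lineage;

  FakeFileRead(LineageReporter lineage) {
    this.lineage = lineage;
  }

  void read(String bucket, String object) {
    lineage.add(LineageKind.SOURCE, List.of("gcs:", bucket + ".", object));
  }
}
```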
Documentation
- Update JavaDoc for the Lineage class
- Add a developer guide for implementing custom lineage reporters
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner