Conversation
Signed-off-by: discord9 <discord9@163.com>
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces several enhancements to the flow query engine, focusing on improving the reliability and accuracy of incremental queries. It includes mechanisms for detecting and handling stale cursor errors, tracking region watermarks, and extending the query context with flow-specific configurations. These changes contribute to a more robust and efficient flow query processing pipeline.
Activity
Code Review
This pull request introduces support for incremental queries in flows. The changes are extensive, touching many parts of the codebase, from the client and server communication layers to the storage and query engines. Key changes include:

- Modifying the flight protocol handling in the client to process metrics messages that may follow record batches.
- Extending `QueryContext` and related structs to carry sequence numbers for snapshot and incremental reads.
- Implementing a `RegionWatermarkStream` in the datanode to attach the latest region sequence number (watermark) to the query result stream.
- Adding logic in the flow task runner to detect and handle "stale cursor" errors from incremental queries by falling back to a full re-computation.
- Introducing checks in the storage engine (`mito2`) to detect stale incremental queries and return a specific error.
- Adding comprehensive integration tests to validate the new incremental query and error handling logic.
My feedback focuses on improving error handling in the client and enhancing code documentation for better maintainability. The overall implementation appears solid and well-thought-out.
src/client/src/database.rs
Outdated
```rust
let m: Option<Arc<RecordBatchMetrics>> =
    serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
```
The use of .ok() silently discards any JSON deserialization errors. This could hide potential issues with metrics serialization from the server, making debugging more difficult. It would be better to log a warning when deserialization fails. This also applies to the similar code block at line 494.
Suggested change:

```diff
-let m: Option<Arc<RecordBatchMetrics>> =
-    serde_json::from_str(&s).ok().map(Arc::new);
-metrics_ref.swap(m);
+let m = match serde_json::from_str(&s) {
+    Ok(metrics) => Some(Arc::new(metrics)),
+    Err(e) => {
+        warn!("Failed to deserialize RecordBatchMetrics from flight message: {}", e);
+        None
+    }
+};
+metrics_ref.swap(m);
```
```rust
pub struct FlowQueryContext {
    /// Current catalog name - needed for flow metadata and recovery
    pub catalog: String,
    /// Current schema name - needed for table resolution during flow execution
    pub schema: String,
    /// Timezone for timestamp operations in the flow
    pub timezone: String,
    #[serde(default)]
    pub extensions: HashMap<String, String>,
    #[serde(default)]
    pub channel: u8,
    #[serde(default)]
    pub snapshot_seqs: HashMap<u64, u64>,
    #[serde(default)]
    pub sst_min_sequences: HashMap<u64, u64>,
}
```
The doc comments for the catalog, schema, and timezone fields in FlowQueryContext were removed. These comments were useful for understanding the purpose of each field. It would be great to restore them, and also to add doc comments for the newly added fields (extensions, channel, snapshot_seqs, sst_min_sequences) to improve code clarity and maintainability.
```rust
impl FrontendClient {
    /// TODO(discord9): better way to detect stale cursor error instead of parsing the error message
```
I hereby agree to the terms of the GreptimeDB CLA.
Summary
This PR adds a proof-of-concept for Flow incremental queries using a changed-rows-only incremental shape.
Instead of recomputing the full result from the full source input on every round, the POC aggregates only the delta window and joins it with the sink state, emitting only the rows that need to be updated in the sink.
Why
The goal is to validate whether incremental queries can materially reduce Flow query cost before we invest further in end-to-end scheduling and batching work.
Benchmark Result
The strongest result comes from a workload shaped like `src >> sink >> delta` (benchmark `small_delta_src_ultra_sink_mid`):

- full recompute: 707-716 ms
- incremental (delta-only left join update): 124-126 ms

This is roughly a 5.7x improvement in wall-clock latency.
Profiling
For this workload, full recompute is dominated by scanning and aggregating the full source table, while the incremental path shifts the main cost to sink-side scan and join work.
That means the POC already demonstrates a meaningful win, and the next optimization target is clear: reduce sink-side read cost further.
Caveats

- The demonstrated win so far is limited to workloads shaped like `src >> sink >> delta`.

Next Step
Continue optimizing the changed-rows-only incremental path, especially sink-side scan/repartition cost.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.