Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,40 @@ impl SourceUpdateTask {
update_options: super::source_indexer::UpdateOptions,
) -> Result<()> {
let update_stats = Arc::new(stats::UpdateStats::default());
source_indexing_context

// Spawn periodic stats reporting task if print_stats is enabled
let reporting_handle = if self.options.print_stats {
let update_stats_clone = update_stats.clone();
let update_title_owned = update_title.to_string();
let flow_name = self.flow.flow_instance.name.clone();
let import_op_name = self.import_op().name.clone();

let report_task = async move {
let mut interval = tokio::time::interval(REPORT_INTERVAL);
let mut last_stats = update_stats_clone.as_ref().clone();
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await; // Skip first tick
loop {
interval.tick().await;
let current_stats = update_stats_clone.as_ref().clone();
let delta = current_stats.delta(&last_stats);
if delta.has_any_change() {
// Print periodic progress (do NOT merge here, final report_stats will merge)
println!(
"{}.{} ({update_title_owned}): {}",
flow_name, import_op_name, delta
);
last_stats = current_stats;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to always show the latest stats (instead of the delta), and keep overwriting the stats line showing before? For users, they always see the last line showing the current stats.

Did a brief research, and indicatif crate seems good for this purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it wil check and apply the changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screen.Recording.2025-10-21.at.6.30.13.PM.mov

is this right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's awesome! Thanks!

Please push your commits.

}
};
Some(tokio::spawn(report_task))
} else {
None
};

// Run the actual update
let update_result = source_indexing_context
.update(&self.pool, &update_stats, update_options)
.await
.with_context(|| {
Expand All @@ -338,12 +371,23 @@ impl SourceUpdateTask {
self.flow.flow_instance.name,
self.import_op().name
)
})?;
});

// Cancel the reporting task if it was spawned
if let Some(handle) = reporting_handle {
handle.abort();
}

// Check update result
update_result?;

if update_stats.has_any_change() {
self.status_tx.send_modify(|update| {
update.source_updates_num[self.source_idx] += 1;
});
}

// Report final stats
self.report_stats(&update_stats, update_title);
Ok(())
}
Expand Down
Loading