Skip to content

feat: impl Task for private #18311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jul 21, 2025
Merged

feat: impl Task for private #18311

merged 25 commits into from
Jul 21, 2025

Conversation

KKould
Copy link
Member

@KKould KKould commented Jul 4, 2025

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

Currently, Task-related functions rely on Cloud. This pr is used to support Task functions when privately deployed.

TODO:

  • Task
    • schedule
    • after
    • when
    • session_params: Nowhere to use
    • TaskRuns for log run state
  • Config to switch Cloud and Private
  • Store TaskRun on system table
  • Support scheduling by meta to avoid repeated query execution
  • Create Task
  • Describe Task
  • Alter Task
  • Drop Task
  • Show Tasks
  • Use table to record the direct dependencies of Task
  • System table: system.task_history
  • System table: system.task_dependents
  • System table: system.tasks

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

This change is Reviewable

@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Jul 4, 2025
None => None,
Some(ref w) => Some(mt::WarehouseOptions {
warehouse: w.warehouse.clone(),
using_warehouse_size: w.using_warehouse_size.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need warehouse_size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consistent with the current task structure defined by cloud, warehouse_options is rarely used in private

@KKould KKould force-pushed the feat/task_v2 branch 2 times, most recently from 22084d0 to dff7a50 Compare July 14, 2025 09:58
@KKould KKould marked this pull request as ready for review July 14, 2025 12:02
@KKould KKould requested a review from drmingdrmer as a code owner July 14, 2025 12:02
@drmingdrmer drmingdrmer requested a review from zhang2014 July 14, 2025 16:14
Copy link
Member

@drmingdrmer drmingdrmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in general clean and rational to me. Good job!

I did not quite grasp the logic about the task execution between databend-query and meta. Can you provide a doc explaining this part? especially about the related key-value layout on databend-meta, and the duty of the semaphore in the task execution.

And add a new version change log entry here, and add a test for this change.

(134, "2025-06-27: Add: SequenceMeta.storage_version"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
// and replace two of the variable `bytes` and `want`.
];

Reviewed 10 of 35 files at r1, 38 of 38 files at r2, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @zhang2014)


src/query/users/src/user_task.rs line 42 at r2 (raw file):

        task_api.execute_task(task_name).await??;
        Ok(())
    }

these UserApiProvider method does not seem necessary. just UserApiProvider.task_api(tenant).xxx() looks good enough to me 🤔

Code quote:

    #[async_backtrace::framed]
    pub async fn execute_task(&self, tenant: &Tenant, task_name: &str) -> Result<()> {
        let task_api = self.task_api(tenant);
        task_api.execute_task(task_name).await??;
        Ok(())
    }

src/query/management/src/task/task_mgr.rs line 238 at r2 (raw file):

            .await?
            .try_collect::<Vec<_>>()
            .await?;

what is list_task_fallback for? it looks the same as list_task.

Code quote:

    pub async fn list_task(&self) -> Result<Vec<Task>, MetaError> {
        let key = DirName::new(TaskIdent::new(&self.tenant, ""));
        let strm = self.kv_api.list_pb_values(&key).await?;

        match strm.try_collect().await {
            Ok(tasks) => Ok(tasks),
            Err(_) => self.list_task_fallback().await,
        }
    }

    #[async_backtrace::framed]
    #[fastrace::trace]
    pub async fn list_task_fallback(&self) -> Result<Vec<Task>, MetaError> {
        let key = TaskIdent::new(&self.tenant, "dummy");
        let dir = DirName::new(key);
        let tasks = self
            .kv_api
            .list_pb_values(&dir)
            .await?
            .try_collect::<Vec<_>>()
            .await?;

@KKould
Copy link
Member Author

KKould commented Jul 15, 2025

Currently, query uses the watch in meta to imitate channel to obtain tasks. When task messages are sent to channels, they are stored in meta using TaskMessage::key.

TaskMessage::key is divided into only 4 types of keys that will overwrite each other to avoid repeated storage and repeated processing.

Whenever a new key is inserted for overwriting, each query will receive the corresponding key change and process it, thus realizing the channel

The init type key of watch is used to let the Service load the Schedule, and TaskService will delete the corresponding key (TaskMgr::accept) when processing Execute & After & Delete TaskMessage to avoid repeated processing @drmingdrmer

@KKould
Copy link
Member Author

KKould commented Jul 15, 2025

TaskMetaHandle::acquire_with_guard tries to acquire semaphore in a short time to implement distributed task preemption, because when the watch mechanism changes the key, all queries will receive it.

@KKould
Copy link
Member Author

KKould commented Jul 15, 2025

list_task_fallback refers to UdfMgr, maybe this method can be deleted in TaskMgr

@drmingdrmer
Copy link
Member

@KKould
Based on our previous discussion, you mentioned that message processing follows an at-most-once pattern.

If that's still the case, you could simply get and delete the task message key from the meta-service within a single transaction. This ensures only one consumer can acquire the task for execution, eliminating the need for a semaphore.

PS, This logic should be documented in the source code so future readers can easily understand the design rationale.

@KKould
Copy link
Member Author

KKould commented Jul 15, 2025

@drmingdrmer Thanks for your suggestion. I tried KVApi::transaction, but it seems that this API cannot perform complex operations after obtaining semaphore as I am doing now. I can only use TxnOp. However, the other comments should be solved.

}

pub async fn prepare(&self) -> Result<()> {
let prepare_key = format!("{}/task_run_prepare/lock", self.tenant.tenant_name());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the task's execution history? Maybe it's better to reuse system_history instead. cc: @dqhl76

Copy link
Member Author

@KKould KKould Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, there is task_history in the current cloud task for querying running records. Is it better to forward system_task.task_run to task_history?

@BohuTANG BohuTANG added the ci-cloud Build docker image for cloud test label Jul 15, 2025
Copy link
Contributor

Docker Image for PR

  • tag: pr-18311-cb2a6ab-1752565796

note: this image tag is only available for internal use.

@drmingdrmer
Copy link
Member

@drmingdrmer Thanks for your suggestion. I tried KVApi::transaction, but it seems that this API cannot perform complex operations after obtaining semaphore as I am doing now. I can only use TxnOp. However, the other comments should be solved.

In order to provide a at-most-once semantics, It would be more simple by using an upsert(key, None) to fetch-and-delete a task message from the queue stored on meta-service.

And the semaphore can be removed. The transaction I mentioned above is meant to automatically fetch-and-remove a task message, not for exclusive task running.

Copy link
Contributor

github-actions bot commented Jul 15, 2025

🤖 Smart Auto-retry Analysis (Retry 9)

Workflow: 16387552174

📊 Summary

  • Total Jobs: 80
  • Failed Jobs: 1
  • Retryable: 0
  • Code Issues: 1

NO RETRY NEEDED

All failures appear to be code/test issues requiring manual fixes.

🔍 Job Details

  • linux / test_logs: Not retryable (Code/Test)

🤖 About

Automated analysis using job annotations to distinguish infrastructure issues (auto-retried) from code/test issues (manual fixes needed).

Copy link
Member

@sundy-li sundy-li left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, please fix the conflicts

@KKould KKould requested review from zhang2014 and drmingdrmer July 17, 2025 07:13
@drmingdrmer drmingdrmer requested review from dqhl76 and sundy-li July 18, 2025 14:37
Copy link
Member

@drmingdrmer drmingdrmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 8 of 16 files at r3, 7 of 8 files at r6, 2 of 8 files at r7, 5 of 7 files at r8, 4 of 5 files at r9, 1 of 3 files at r10, 8 of 8 files at r11, 1 of 1 files at r12, all commit messages.
Reviewable status: all files reviewed, 13 unresolved discussions (waiting on @dqhl76, @KKould, @sundy-li, and @zhang2014)


src/meta/app/src/principal/task.rs line 100 at r12 (raw file):

    pub fn key(&self) -> String {
        format!("{}@{}", self.task.task_name, self.run_id)
    }

This method is also called a key but it does follow the convention of other meta-service key, such TaskIdent, which has a fixed prefix __fd_tasks.

And this method seems not used anywhere.

If it is a key used in meta-service, it should be defined with TIdent, such as TaskIdent.

Code quote:

    pub fn key(&self) -> String {
        format!("{}@{}", self.task.task_name, self.run_id)
    }

src/meta/app/src/principal/task.rs line 109 at r12 (raw file):

    DeleteTask(String),
    AfterTask(Task),
}

This struct and other core structs would benefit from documentation comments to clarify their purpose.

Currently the naming is ambiguous:

  • ExecuteTask suggests a command to run a task immediately, right?
  • ScheduleTask implies periodic scheduling, but it's missing schedule information (interval, timing, etc.)
  • AfterTask is unclear - does it mean "run task B after task A completes" or something else?

Consider adding doc comments that explain each variant's specific behavior and use cases.

Code quote:

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessage {
    ExecuteTask(Task),
    ScheduleTask(Task),
    DeleteTask(String),
    AfterTask(Task),
}

src/meta/app/src/principal/task.rs line 133 at r12 (raw file):

    pub fn schedule_key(task_name: &str) -> String {
        format!("{}-1-{task_name}", TaskMessage::prefix())
    }

why is schedule_key special, while there is already key() method?

Please explain it in the doc.

Code quote:

    pub fn key(&self) -> String {
        let ty = match self {
            TaskMessage::ExecuteTask(_) => 0,
            TaskMessage::ScheduleTask(_) => 1,
            TaskMessage::DeleteTask(_) => 2,
            TaskMessage::AfterTask(_) => 3,
        };
        format!("{}-{}-{}", TaskMessage::prefix(), ty, self.task_name())
    }

    pub fn schedule_key(task_name: &str) -> String {
        format!("{}-1-{task_name}", TaskMessage::prefix())
    }

src/meta/app/src/principal/task.rs line 137 at r12 (raw file):

    pub fn prefix() -> i64 {
        0
    }

Aside from other considerations, the keys and prefixes used in the meta-service should be human-readable strings rather than bare digits. This improves debuggability and makes the system easier to troubleshoot when inspecting the underlying storage.

Code quote:

    pub fn prefix() -> i64 {
        0
    }

Copy link
Member

@drmingdrmer drmingdrmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r13, all commit messages.
Reviewable status: all files reviewed, 13 unresolved discussions (waiting on @dqhl76, @KKould, @sundy-li, and @zhang2014)

@sundy-li sundy-li merged commit d6ddbae into databendlabs:main Jul 21, 2025
84 of 87 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-cloud Build docker image for cloud test pr-feature this PR introduces a new feature to the codebase
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants