Skip to content

feat: impl Task for private #18311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf17de1
feat: impl Task for private
KKould Jul 4, 2025
4ccb7b7
feat: imp when_condition for Task & store TaskRun on system table
KKould Jul 8, 2025
6baff0a
feat: use Meta's Watch mechanism to distribute TaskMessage
KKould Jul 9, 2025
c42aac3
refactor: Using SQL to simplify DAG
KKould Jul 11, 2025
5fe7a8b
chore: fix meta_store init
KKould Jul 11, 2025
b4eb4bd
chore: log error on spawn
KKould Jul 11, 2025
d9b4576
test: add test for private task
KKould Jul 14, 2025
51bcf06
chore: add license for private task
KKould Jul 14, 2025
cc80564
chore: fix test_display_license_info
KKould Jul 14, 2025
181a438
chore: codefmt
KKould Jul 15, 2025
3a8a3dc
chore: add `accept` for Delete & After
KKould Jul 15, 2025
d7ba36b
chore: add restart test
KKould Jul 15, 2025
f0721b1
chore: codefmt
KKould Jul 15, 2025
0b8b72f
chore: add `system.task_history` for Private Task & use `TaskMgr::acc…
KKould Jul 15, 2025
c15189b
fix: TableFunctionFactory create fail
KKould Jul 16, 2025
739b8be
ci: rename to private task test
KKould Jul 16, 2025
2e9d55a
chore: add cron test
KKould Jul 16, 2025
a5f4931
feat: add system table: `system.tasks`
KKould Jul 16, 2025
863f120
chore: fix `update_or_create_task_run` correct on `ScheduleTask`
KKould Jul 16, 2025
547aed3
chore: add Task when test on `test-private-task.sh`
KKould Jul 17, 2025
fc5fe53
chore: add private task config check on `GlobalServices::init_with`
KKould Jul 17, 2025
22ea27f
chore: update Task version on Meta
KKould Jul 17, 2025
578694f
chore: codefmt
KKould Jul 17, 2025
98addfd
chore: remove `TaskMgr::list_task_fallback`
KKould Jul 17, 2025
6848da5
chore: codefmt
KKould Jul 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/actions/test_private_tasks/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: "Test private task for databend query"
description: "Test private task for databend query"
runs:
using: "composite"
steps:
- uses: ./.github/actions/setup_test

- name: Install lsof
shell: bash
run: sudo apt-get update -yq && sudo apt-get install -yq lsof

- name: Minio Setup for (ubuntu-latest only)
shell: bash
run: |
docker run -d --network host --name minio \
-e "MINIO_ACCESS_KEY=minioadmin" \
-e "MINIO_SECRET_KEY=minioadmin" \
-e "MINIO_ADDRESS=:9900" \
-v /tmp/data:/data \
-v /tmp/config:/root/.minio \
minio/minio server /data

export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_EC2_METADATA_DISABLED=true

aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/data --recursive --no-progress

- name: Run Private Task Tests
shell: bash
run: |
bash ./tests/task/test-private-task.sh

- name: Upload failure
if: failure()
uses: ./.github/actions/artifact_failure
with:
name: test-tasks
18 changes: 18 additions & 0 deletions .github/workflows/reuse.linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,24 @@ jobs:
- uses: ./.github/actions/test_logs
timeout-minutes: 20

test_private_tasks:
needs: [ build, check ]
runs-on:
- self-hosted
- X64
- Linux
- 2c8g
- "${{ inputs.runner_provider }}"
- "${{ inputs.runner_capacity }}"
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
type: ${{ inputs.license_type }}
- uses: ./.github/actions/test_private_tasks
timeout-minutes: 20

test_meta_cluster:
needs: [build, check]
runs-on:
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use databend_query::servers::MySQLHandler;
use databend_query::servers::MySQLTlsConfig;
use databend_query::servers::Server;
use databend_query::servers::ShutdownHandle;
use databend_query::task::TaskService;
use databend_query::GlobalServices;
use log::info;

Expand Down Expand Up @@ -302,6 +303,9 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
}
println!(" system history tables: {}", conf.log.history);
}
if conf.task.on {
TaskService::instance().initialized();
}

println!();
println!(
Expand Down
16 changes: 16 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,22 @@ build_exceptions! {
UDFDataError(2607),
}

// Task Errors [2611-2614]
build_exceptions! {
/// Unknown Task
UnknownTask(2611),
/// Task already exists
TaskAlreadyExists(2612),
/// Task timezone invalid
TaskTimezoneInvalid(2613),
/// Task cron invalid
TaskCronInvalid(2614),
/// Task schedule and after conflict
TaskScheduleAndAfterConflict(2615),
/// Task when condition not met
TaskWhenConditionNotMet(2616),
}

// Search and External Service Errors [1901-1903, 1910]
build_exceptions! {
/// Tantivy error
Expand Down
11 changes: 10 additions & 1 deletion src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub enum Feature {
SystemHistory,
#[serde(alias = "vector_index", alias = "VECTOR_INDEX")]
VectorIndex,
#[serde(alias = "private_task", alias = "PRIVATE_TASK")]
PrivateTask,
#[serde(other)]
Unknown,
}
Expand Down Expand Up @@ -134,6 +136,7 @@ impl fmt::Display for Feature {
Feature::WorkloadGroup => write!(f, "workload_group"),
Feature::SystemHistory => write!(f, "system_history"),
Feature::VectorIndex => write!(f, "vector_index"),
Feature::PrivateTask => write!(f, "private_task"),
Feature::Unknown => write!(f, "unknown"),
}
}
Expand Down Expand Up @@ -372,6 +375,11 @@ mod tests {
serde_json::from_str::<Feature>("\"VectorIndex\"").unwrap()
);

assert_eq!(
Feature::PrivateTask,
serde_json::from_str::<Feature>("\"private_task\"").unwrap()
);

assert_eq!(
Feature::Unknown,
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
Expand Down Expand Up @@ -408,11 +416,12 @@ mod tests {
Feature::NgramIndex,
Feature::WorkloadGroup,
Feature::SystemHistory,
Feature::PrivateTask,
]),
};

assert_eq!(
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,private_task,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
license_info.to_string()
);
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use futures::TryStreamExt;
use itertools::Itertools;

pub(crate) use self::codec::decode_non_empty_item;
pub(crate) use self::codec::decode_seqv;
pub use self::codec::decode_seqv;
pub(crate) use self::codec::decode_transition;
pub(crate) use self::codec::encode_operation;
pub use self::upsert_pb::UpsertPB;
Expand Down
13 changes: 13 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub mod procedure_id_to_name;
pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod task;
pub mod task_ident;
pub mod task_message_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
pub mod user_defined_file_format_ident;
Expand Down Expand Up @@ -86,6 +89,16 @@ pub use role_info::RoleInfo;
pub use role_info::RoleInfoSerdeError;
pub use stage_file_ident::StageFileIdent;
pub use stage_file_path::StageFilePath;
pub use task::ScheduleOptions;
pub use task::ScheduleType;
pub use task::State;
pub use task::Status;
pub use task::Task;
pub use task::TaskMessage;
pub use task::TaskRun;
pub use task::WarehouseOptions;
pub use task_ident::TaskIdent;
pub use task_ident::TaskIdentRaw;
pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent;
pub use tenant_user_ident::TenantUserIdent;
pub use udf_ident::UdfIdent;
Expand Down
165 changes: 165 additions & 0 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use chrono::DateTime;
use chrono::Utc;

pub const EMPTY_TASK_ID: u64 = 0;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ScheduleType {
IntervalType = 0,
CronType = 1,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Status {
Suspended = 0,
Started = 1,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
Scheduled = 0,
Executing = 1,
Succeeded = 2,
Failed = 3,
Cancelled = 4,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleOptions {
pub interval: Option<i32>,
pub cron: Option<String>,
pub time_zone: Option<String>,
pub schedule_type: ScheduleType,
pub milliseconds_interval: Option<u64>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WarehouseOptions {
pub warehouse: Option<String>,
pub using_warehouse_size: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Task {
pub task_id: u64,
pub task_name: String,
pub query_text: String,
pub when_condition: Option<String>,
pub after: Vec<String>,
pub comment: Option<String>,
// expired useless
pub owner: String,
pub owner_user: String,
pub schedule_options: Option<ScheduleOptions>,
pub warehouse_options: Option<WarehouseOptions>,
pub next_scheduled_at: Option<DateTime<Utc>>,
pub suspend_task_after_num_failures: Option<u64>,
// TODO
pub error_integration: Option<String>,
pub status: Status,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_suspended_at: Option<DateTime<Utc>>,
// TODO
pub session_params: BTreeMap<String, String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskRun {
pub task: Task,
pub run_id: u64,
pub attempt_number: i32,
pub state: State,
pub scheduled_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub error_code: i64,
pub error_message: Option<String>,
// expired useless
pub root_task_id: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessageType {
Execute,
Schedule,
Delete,
After,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessage {
// Execute Task immediately. If an error occurs, if it is a SQL error in the task,
// it will be recorded in the error message of the task run.
ExecuteTask(Task),
// Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
ScheduleTask(Task),
// Delete the task information and try to cancel the scheduled task in the query.
DeleteTask(String),
// After Task will bind Task to the tasks in Task.afters.
// When Execute Task is executed, after all the after tasks of Task are completed,
// the execution will continue.
AfterTask(Task),
}

impl TaskMessage {
pub fn task_name(&self) -> &str {
match self {
TaskMessage::ExecuteTask(task)
| TaskMessage::ScheduleTask(task)
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
}
}

pub fn ty(&self) -> TaskMessageType {
match self {
TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
TaskMessage::DeleteTask(_) => TaskMessageType::Delete,
TaskMessage::AfterTask(_) => TaskMessageType::After,
}
}

pub fn key(&self) -> String {
Self::key_with_type(self.ty(), self.task_name())
}

pub fn key_with_type(ty: TaskMessageType, task_name: &str) -> String {
let ty_num = match ty {
TaskMessageType::Execute => 0,
TaskMessageType::Schedule => 1,
TaskMessageType::Delete => 2,
TaskMessageType::After => 3,
};
format!("{}-{}-{}", TaskMessage::prefix_range().0, ty_num, task_name)
}

/// Returns the inclusive range of key prefixes used by `TaskMessage`.
///
/// This range can be used to scan all keys generated by `TaskMessage::key()`
/// and related methods (e.g., `schedule_key`). The prefix `0` is prepended
/// to all task-related keys to group them under the same prefix range,
/// enabling efficient key scanning or iteration.
///
/// The returned range is (0, 1), which includes all keys starting with `0-`
/// (as produced by `TaskMessage::prefix()`), and excludes any other unrelated prefixes.
pub fn prefix_range() -> (i64, i64) {
(0, 1)
}
}
Loading
Loading