Skip to content

Commit 5ab40a9

Browse files
committed
refactor: add ResourceStat
Signed-off-by: zyy17 <[email protected]>
1 parent 477eeac commit 5ab40a9

File tree

20 files changed

+278
-218
lines changed

20 files changed

+278
-218
lines changed

Cargo.lock

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cmd/src/flownode.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHand
3030
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
3131
use common_meta::key::TableMetadataManager;
3232
use common_meta::key::flow::FlowMetadataManager;
33+
use common_stat::ResourceStatImpl;
3334
use common_telemetry::info;
3435
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
3536
use common_version::{short_version, verbose_version};
@@ -372,11 +373,15 @@ impl StartCommand {
372373
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
373374
]);
374375

376+
let mut resource_stat = ResourceStatImpl::default();
377+
resource_stat.start_collect_cpu_usage();
378+
375379
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
376380
&opts,
377381
meta_client.clone(),
378382
opts.heartbeat.clone(),
379383
Arc::new(executor),
384+
Arc::new(resource_stat),
380385
);
381386

382387
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));

src/cmd/src/frontend.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
3030
use common_meta::heartbeat::handler::HandlerGroupExecutor;
3131
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
3232
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
33+
use common_stat::ResourceStatImpl;
3334
use common_telemetry::info;
3435
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
3536
use common_time::timezone::set_default_timezone;
@@ -421,11 +422,15 @@ impl StartCommand {
421422
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
422423
]);
423424

425+
let mut resource_stat = ResourceStatImpl::default();
426+
resource_stat.start_collect_cpu_usage();
427+
424428
let heartbeat_task = HeartbeatTask::new(
425429
&opts,
426430
meta_client.clone(),
427431
opts.heartbeat.clone(),
428432
Arc::new(executor),
433+
Arc::new(resource_stat),
429434
);
430435
let heartbeat_task = Some(heartbeat_task);
431436

src/common/config/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ workspace = true
1111
common-base.workspace = true
1212
common-error.workspace = true
1313
common-macro.workspace = true
14-
common-stat.workspace = true
1514
config.workspace = true
1615
humantime-serde.workspace = true
1716
object-store.workspace = true

src/common/config/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
pub mod config;
1616
pub mod error;
17-
pub mod utils;
1817

1918
use std::time::Duration;
2019

src/common/config/src/utils.rs

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/common/stat/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ license.workspace = true
66

77
[dependencies]
88
common-base.workspace = true
9+
common-runtime.workspace = true
10+
common-telemetry.workspace = true
911
lazy_static.workspace = true
1012
nix.workspace = true
1113
num_cpus.workspace = true
1214
prometheus.workspace = true
1315
sysinfo.workspace = true
16+
tokio.workspace = true
1417

1518
[lints]
1619
workspace = true

src/common/stat/src/cgroups.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,16 @@ pub fn get_cpu_usage_from_cgroups() -> Option<i64> {
145145
fields[1].trim().parse::<i64>().ok()
146146
}
147147

148-
/// Calculate the cpu usage in millicores from cgroups filesystem.
149-
///
150-
/// - Return `0` if the current cpu usage is equal to the last cpu usage or the interval is 0.
151-
pub fn calculate_cpu_usage(
148+
// Calculate the cpu usage in millicores from cgroups filesystem.
149+
//
150+
// - Return `0` if the current cpu usage is equal to the last cpu usage or the interval is 0.
151+
pub(crate) fn calculate_cpu_usage(
152152
current_cpu_usage_usecs: i64,
153153
last_cpu_usage_usecs: i64,
154154
interval_milliseconds: i64,
155155
) -> i64 {
156156
let diff = current_cpu_usage_usecs - last_cpu_usage_usecs;
157-
if diff != 0 && interval_milliseconds != 0 {
157+
if diff > 0 && interval_milliseconds > 0 {
158158
((diff as f64 / interval_milliseconds as f64).round() as i64).max(1)
159159
} else {
160160
0

src/common/stat/src/lib.rs

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,66 +13,7 @@
1313
// limitations under the License.
1414

1515
mod cgroups;
16+
mod resource;
1617

1718
pub use cgroups::*;
18-
use common_base::readable_size::ReadableSize;
19-
use sysinfo::System;
20-
21-
/// Get the total CPU in millicores.
22-
pub fn get_total_cpu_millicores() -> i64 {
23-
// Get CPU limit from cgroups filesystem.
24-
if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
25-
cgroup_cpu_limit
26-
} else {
27-
// Get total CPU cores from host system.
28-
num_cpus::get() as i64 * 1000
29-
}
30-
}
31-
32-
/// Get the total memory in bytes.
33-
pub fn get_total_memory_bytes() -> i64 {
34-
// Get memory limit from cgroups filesystem.
35-
if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
36-
cgroup_memory_limit
37-
} else {
38-
// Get total memory from host system.
39-
if sysinfo::IS_SUPPORTED_SYSTEM {
40-
let mut sys_info = System::new();
41-
sys_info.refresh_memory();
42-
sys_info.total_memory() as i64
43-
} else {
44-
// If the system is not supported, return -1.
45-
-1
46-
}
47-
}
48-
}
49-
50-
/// Get the total CPU cores. The result will be rounded to the nearest integer.
51-
/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2.
52-
pub fn get_total_cpu_cores() -> usize {
53-
((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
54-
}
55-
56-
/// Get the total memory in readable size.
57-
pub fn get_total_memory_readable() -> Option<ReadableSize> {
58-
if get_total_memory_bytes() > 0 {
59-
Some(ReadableSize(get_total_memory_bytes() as u64))
60-
} else {
61-
None
62-
}
63-
}
64-
65-
#[cfg(test)]
66-
mod tests {
67-
use super::*;
68-
69-
#[test]
70-
fn test_get_total_cpu_cores() {
71-
assert!(get_total_cpu_cores() > 0);
72-
}
73-
74-
#[test]
75-
fn test_get_total_memory_readable() {
76-
assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
77-
}
78-
}
19+
pub use resource::*;

0 commit comments

Comments
 (0)