diff --git a/doc/en/mooncake-store.md b/doc/en/mooncake-store.md index 02827b4b6..bde57b516 100644 --- a/doc/en/mooncake-store.md +++ b/doc/en/mooncake-store.md @@ -570,6 +570,14 @@ The HTTP metadata server can be configured using the following parameters: - **`http_metadata_server_host`** (string, default: `"0.0.0.0"`): Specifies the host address for the HTTP metadata server to bind to. Use `"0.0.0.0"` to listen on all available network interfaces, or specify a specific IP address for security purposes. +#### Environment Variables + +- MC_STORE_CLUSTER_ID: Identify the metadata when multiple cluster share the same master, default 'mooncake'. +- MC_STORE_MEMCPY: Enables or disables local memcpy optimization, set to 1/true to enable, 0/false to disable. +- MC_STORE_NODE_IP: Used by client metrics, the node's IP address. +- MC_STORE_CLIENT_METRIC: Enables client metric reporting, enabled by default; set to 0/false to disable. +- MC_STORE_CLIENT_METRIC_INTERVAL: Reporting interval in seconds, default 0 (collects but does not report). + #### Usage Example To start the master service with the HTTP metadata server enabled: diff --git a/doc/zh/mooncake-store.md b/doc/zh/mooncake-store.md index 1801dfe9d..64981b511 100644 --- a/doc/zh/mooncake-store.md +++ b/doc/zh/mooncake-store.md @@ -570,6 +570,14 @@ HTTP 元数据服务器可通过以下参数进行配置: - **`http_metadata_server_host`**(字符串,默认值:`"0.0.0.0"`):指定 HTTP 元数据服务器绑定的主机地址。使用 `"0.0.0.0"` 可监听所有可用网络接口,或指定特定 IP 地址以提高安全性。 +#### 环境变量说明 + +- **MC_STORE_CLUSTER_ID**: 在多集群复用 master 场景下标识元数据, 默认 'mooncake' +- **MC_STORE_MEMCPY**: 控制是否启用本地 memcpy 优化, 1/true 启用, 0/false 禁用 +- **MC_STORE_NODE_IP**: 客户端指标使用, 节点 IP 地址 +- **MC_STORE_CLIENT_METRIC**: 启用客户端指标上报, 默认启用;设为 0/false 禁用 +- **MC_STORE_CLIENT_METRIC_INTERVAL**: 指标上报间隔(秒), 默认 0(仅收集不上报) + #### 使用示例 要使用启用了 HTTP 元数据服务器的主服务,请运行: diff --git a/mooncake-store/include/client_metric.h b/mooncake-store/include/client_metric.h index 20882b2e0..bb3a239fb 100644 --- a/mooncake-store/include/client_metric.h +++ b/mooncake-store/include/client_metric.h @@ -8,6 +8,7 @@ #include #include #include "utils.h" +#include "hybrid_metric.h" namespace mooncake { @@ -21,23 +22,41 @@ const std::vector kLatencyBucket = { // safeguards for long tails 50000, 100000, 200000, 500000, 1000000}; +static inline std::string get_env_or_default( + const char* env_var, const std::string& default_val = "") { + const char* val = getenv(env_var); + return val ? val : default_val; +} + +// In production mode, more labels are needed for monitoring and troubleshooting +// Static labels include but are not limited to machine address, cluster name, +// etc. These labels remain constant during the lifetime of the application +const std::string kInstanceID = get_env_or_default("MC_STORE_NODE_IP"); +const std::string kClusterID = get_env_or_default("MC_STORE_CLUSTER_ID"); +const std::map static_labels = { + // instance id is node ip + {"instance_id", kInstanceID}, + // Cluster ID (e.g., Qwen-Max, Qwen-Plus; defaults to mooncake) + {"cluster_id", kClusterID}, +}; + struct TransferMetric { ylt::metric::counter_t total_read_bytes{"mooncake_transfer_read_bytes", - "Total bytes read"}; - ylt::metric::counter_t total_write_bytes{"mooncake_transfer_write_bytes", - "Total bytes written"}; + "Total bytes read", static_labels}; + ylt::metric::counter_t total_write_bytes{ + "mooncake_transfer_write_bytes", "Total bytes written", static_labels}; ylt::metric::histogram_t batch_put_latency_us{ "mooncake_transfer_batch_put_latency", - "Batch Put transfer latency (us)", kLatencyBucket}; + "Batch Put transfer latency (us)", kLatencyBucket, static_labels}; ylt::metric::histogram_t batch_get_latency_us{ "mooncake_transfer_batch_get_latency", - "Batch Get transfer latency (us)", kLatencyBucket}; + "Batch Get transfer latency (us)", kLatencyBucket, static_labels}; ylt::metric::histogram_t get_latency_us{"mooncake_transfer_get_latency", "Get transfer latency (us)", - kLatencyBucket}; + kLatencyBucket, static_labels}; ylt::metric::histogram_t put_latency_us{"mooncake_transfer_put_latency", "Put transfer latency (us)", - kLatencyBucket}; + kLatencyBucket, static_labels}; void serialize(std::string& str) { total_read_bytes.serialize(str); @@ -137,13 +156,14 @@ struct MasterClientMetric { MasterClientMetric() : rpc_count("mooncake_client_rpc_count", - "Total number of RPC calls made by the client", rpc_names), + "Total number of RPC calls made by the client", + static_labels, rpc_names), rpc_latency("mooncake_client_rpc_latency", "Latency of RPC calls made by the client (in us)", - kLatencyBucket, rpc_names) {} + kLatencyBucket, static_labels, rpc_names) {} - ylt::metric::dynamic_counter_1t rpc_count; - ylt::metric::dynamic_histogram_1t rpc_latency; + ylt::metric::hybrid_counter_1t rpc_count; + ylt::metric::hybrid_histogram_1t rpc_latency; void serialize(std::string& str) { rpc_count.serialize(str); rpc_latency.serialize(str); diff --git a/mooncake-store/include/hybrid_metric.h b/mooncake-store/include/hybrid_metric.h new file mode 100644 index 000000000..c84411ead --- /dev/null +++ b/mooncake-store/include/hybrid_metric.h @@ -0,0 +1,453 @@ +#pragma once +#include +#include +#include + +#include +#include + +namespace ylt::metric { +template +class basic_hybrid_counter + : public dynamic_metric_impl, N> { + using Base = dynamic_metric_impl, N>; + + public: + // hybrid labels value + basic_hybrid_counter(std::string name, std::string help, + std::map static_labels, + std::array labels_name) + : Base(MetricType::Counter, std::move(name), std::move(help), + std::move(labels_name)), + static_labels_(static_labels) { + for (auto& [k, v] : static_labels) { + static_labels_str_.append(k).append("=\"").append(v).append("\","); + } + } + using label_key_type = const std::array&; + void inc(label_key_type labels_value, value_type value = 1) { + detail::inc_impl(Base::try_emplace(labels_value).first->value, value); + } + + value_type update(label_key_type labels_value, value_type value) { + return Base::try_emplace(labels_value) + .first->value.exchange(value, std::memory_order::relaxed); + } + + value_type value(label_key_type labels_value) { + if (auto ptr = Base::find(labels_value); ptr != nullptr) { + return ptr->value.load(std::memory_order::relaxed); + } else { + return value_type{}; + } + } + + void remove_label_value( + const std::map& labels) override { + if (Base::empty()) { + return; + } + + const auto& labels_name = this->labels_name(); + if (labels.size() > labels_name.size()) { + return; + } + + // if (labels.size() == labels_name.size()) { // TODO: speed up for + // this case + + // } + // else { + size_t count = 0; + std::vector vec; + for (auto& lb_name : labels_name) { + if (auto i = labels.find(lb_name); i != labels.end()) { + vec.push_back(i->second); + } else { + vec.push_back(""); + count++; + } + } + if (count == labels_name.size()) { + return; + } + Base::erase_if([&](auto& pair) { + auto& [arr, _] = pair; + if constexpr (N > 0) { + for (size_t i = 0; i < vec.size(); i++) { + if (!vec[i].empty() && vec[i] != arr[i]) { + return false; + } + } + } + return true; + }); + //} + } + + bool has_label_value(const std::string& value) override { + auto map = Base::copy(); + for (auto& e : map) { + auto& label_value = e->label; + if (auto it = + std::find(label_value.begin(), label_value.end(), value); + it != label_value.end()) { + return true; + } + } + + return false; + } + + bool has_label_value(const std::regex& regex) override { + auto map = Base::copy(); + for (auto& e : map) { + auto& label_value = e->label; + if (auto it = std::find_if( + label_value.begin(), label_value.end(), + [&](auto& val) { return std::regex_match(val, regex); }); + it != label_value.end()) { + return true; + } + } + + return false; + } + + bool has_label_value(const std::vector& label_value) override { + std::array arr{}; + size_t size = (std::min)((size_t)N, label_value.size()); + if (label_value.size() > N) { + return false; + } + + for (size_t i = 0; i < size; i++) { + arr[i] = label_value[i]; + } + return Base::find(arr) != nullptr; + } + + void serialize(std::string& str) override { + auto map = Base::copy(); + if (map.empty()) { + return; + } + + std::string value_str; + serialize_map(map, value_str); + if (!value_str.empty()) { + Base::serialize_head(str); + str.append(value_str); + } + } + +#ifdef CINATRA_ENABLE_METRIC_JSON + void serialize_to_json(std::string& str) override { + auto map = Base::copy(); + json_counter_t counter{Base::name_, Base::help_, Base::metric_name()}; + to_json(counter, map, str); + } + + template + void to_json(json_counter_t& counter, T& map, std::string& str) { + for (auto& e : map) { + auto& k = e->label; + auto& val = e->value; + json_counter_metric_t metric; + size_t index = 0; + assert(Base::labels_name().size() == k.size()); + for (auto& [k, v] : static_labels_) { + metric.labels.emplace_back(k, v); + } + for (auto& label_value : k) { + metric.labels.emplace_back(Base::labels_name()[index++], + label_value); + } + metric.value = val.load(std::memory_order::relaxed); + counter.metrics.push_back(std::move(metric)); + } + if (!counter.metrics.empty()) { + iguana::to_json(counter, str); + } + } +#endif + + protected: + template + void serialize_map(T& value_map, std::string& str) { + for (auto& e : value_map) { + auto& labels_value = e->label; + auto val = e->value.load(std::memory_order::relaxed); + str.append(Base::name_); + if (Base::labels_name_.empty()) { + str.append(" "); + } else { + str.append("{"); + build_string_with_static(str, Base::labels_name_, labels_value); + str.append("} "); + } + + str.append(std::to_string(val)); + + str.append("\n"); + } + } + + template + bool equal(const std::vector& v, const std::array& a) { + if (v.size() != N) return false; + + return std::equal(v.begin(), v.end(), a.begin()); + } + + void build_string_with_static(std::string& str, + const std::vector& v1, + const auto& v2) { + str.append(static_labels_str_); + for (size_t i = 0; i < v1.size(); i++) { + str.append(v1[i]).append("=\"").append(v2[i]).append("\"").append( + ","); + } + str.pop_back(); + } + + private: + std::string static_labels_str_; // preformatted static labels string + std::map static_labels_; +}; + +using hybrid_counter_1t = basic_hybrid_counter; +using hybrid_counter_1d = basic_hybrid_counter; + +using hybrid_counter_2t = basic_hybrid_counter; +using hybrid_counter_2d = basic_hybrid_counter; +using hybrid_counter_t = hybrid_counter_2t; +using hybrid_counter_d = hybrid_counter_2d; + +using hybrid_counter_3t = basic_hybrid_counter; +using hybrid_counter_3d = basic_hybrid_counter; + +using hybrid_counter_4t = basic_hybrid_counter; +using hybrid_counter_4d = basic_hybrid_counter; + +using hybrid_counter_5t = basic_hybrid_counter; +using hybrid_counter_5d = basic_hybrid_counter; + +// ================ hybrid_histogram ================ +template +class basic_hybrid_histogram : public dynamic_metric { + public: + basic_hybrid_histogram(std::string name, std::string help, + std::vector buckets, + std::map static_labels, + std::array labels_name) + : dynamic_metric(MetricType::Histogram, name, help, labels_name), + static_labels_(static_labels), + bucket_boundaries_(buckets), + sum_(std::make_shared>( + name, help, labels_name)) { + for (size_t i = 0; i < buckets.size() + 1; i++) { + bucket_counts_.push_back( + std::make_shared>( + name, help, labels_name)); + } + for (auto& [k, v] : static_labels) { + static_labels_str_.append(k).append("=\"").append(v).append("\","); + } + } + + void observe(const std::array& labels_value, + value_type value) { + const auto bucket_index = static_cast( + std::distance(bucket_boundaries_.begin(), + std::lower_bound(bucket_boundaries_.begin(), + bucket_boundaries_.end(), value))); + sum_->inc(labels_value, value); + bucket_counts_[bucket_index]->inc(labels_value); + } + + void clean_expired_label() override { + sum_->clean_expired_label(); + for (auto& m : bucket_counts_) { + m->clean_expired_label(); + } + } + + auto get_bucket_counts() { return bucket_counts_; } + + bool has_label_value(const std::string& label_val) override { + return sum_->has_label_value(label_val); + } + + bool has_label_value(const std::regex& regex) override { + return sum_->has_label_value(regex); + } + + bool has_label_value(const std::vector& label_value) override { + return sum_->has_label_value(label_value); + } + + size_t label_value_count() const { return sum_->label_value_count(); } + + void serialize(std::string& str) override { + auto value_map = sum_->copy(); + if (value_map.empty()) { + return; + } + + serialize_head(str); + + std::string value_str; + auto bucket_counts = get_bucket_counts(); + for (auto& e : value_map) { + auto& labels_value = e->label; + auto& value = e->value; + if (value == 0) { + continue; + } + + value_type count = 0; + for (size_t i = 0; i < bucket_counts.size(); i++) { + auto counter = bucket_counts[i]; + value_str.append(name_).append("_bucket{"); + if (!labels_name_.empty()) { + build_label_string_with_static(value_str, labels_name_, + labels_value); + value_str.append(","); + } + + if (i == bucket_boundaries_.size()) { + value_str.append("le=\"").append("+Inf").append("\"} "); + } else { + value_str.append("le=\"") + .append(std::to_string(bucket_boundaries_[i])) + .append("\"} "); + } + + count += counter->value(labels_value); + value_str.append(std::to_string(count)); + value_str.append("\n"); + } + + str.append(value_str); + + std::string labels_str; + build_label_string_with_static(labels_str, sum_->labels_name(), + labels_value); + + str.append(name_); + str.append("_sum{"); + str.append(labels_str); + str.append("} "); + str.append(std::to_string(value)); + str.append("\n"); + str.append(name_).append("_count{"); + str.append(labels_str); + str.append("} "); + str.append(std::to_string(count)); + str.append("\n"); + } + if (value_str.empty()) { + str.clear(); + } + } + + void build_label_string_with_static( + std::string& str, const std::vector& label_name, + const auto& label_value) { + str.append(static_labels_str_); + for (size_t i = 0; i < label_name.size(); i++) { + str.append(label_name[i]) + .append("=\"") + .append(label_value[i]) + .append("\","); + } + str.pop_back(); + } + +#ifdef CINATRA_ENABLE_METRIC_JSON + void serialize_to_json(std::string& str) override { + auto value_map = sum_->copy(); + if (value_map.empty()) { + return; + } + + json_histogram_t hist{name_, help_, std::string(metric_name())}; + auto bucket_counts = get_bucket_counts(); + + for (auto& e : value_map) { + auto& labels_value = e->label; + auto& value = e->value; + if (value == 0) { + continue; + } + + size_t count = 0; + json_histogram_metric_t metric{}; + for (size_t i = 0; i < bucket_counts.size(); i++) { + auto counter = bucket_counts[i]; + + count += counter->value(labels_value); + + if (i == bucket_boundaries_.size()) { + metric.quantiles.emplace(std::numeric_limits::max(), + (int64_t)count); + } else { + metric.quantiles.emplace( + bucket_boundaries_[i], + (int64_t)counter->value(labels_value)); + } + } + metric.count = (int64_t)count; + metric.sum = sum_->value(labels_value); + + for (auto& [k, v] : static_labels_) { + metric.labels[k] = v; + } + for (size_t i = 0; i < labels_value.size(); i++) { + metric.labels[sum_->labels_name()[i]] = labels_value[i]; + } + + hist.metrics.push_back(std::move(metric)); + } + + if (!hist.metrics.empty()) { + iguana::to_json(hist, str); + } + } +#endif + + private: + template + bool is_strict_sorted(ForwardIterator first, ForwardIterator last) { + return std::adjacent_find( + first, last, + std::greater_equal::value_type>()) == last; + } + + std::string static_labels_str_; // preformatted static labels string + std::map static_labels_; + std::vector bucket_boundaries_; + std::vector>> + bucket_counts_; // readonly + std::shared_ptr> sum_; +}; + +using hybrid_histogram_1t = basic_hybrid_histogram; +using hybrid_histogram_1d = basic_hybrid_histogram; + +using hybrid_histogram_2t = basic_hybrid_histogram; +using hybrid_histogram_2d = basic_hybrid_histogram; +using hybrid_histogram_t = hybrid_histogram_2t; +using hybrid_histogram_d = hybrid_histogram_2d; + +using hybrid_histogram_3t = basic_hybrid_histogram; +using hybrid_histogram_3d = basic_hybrid_histogram; + +using hybrid_histogram_4t = basic_hybrid_histogram; +using hybrid_histogram_4d = basic_hybrid_histogram; + +using hybrid_histogram_5t = basic_hybrid_histogram; +using hybrid_histogram_5d = basic_hybrid_histogram; +} // namespace ylt::metric diff --git a/mooncake-store/tests/client_metrics_test.cpp b/mooncake-store/tests/client_metrics_test.cpp index ac49904fe..7739d595f 100644 --- a/mooncake-store/tests/client_metrics_test.cpp +++ b/mooncake-store/tests/client_metrics_test.cpp @@ -165,6 +165,8 @@ TEST_F(ClientMetricsTest, CompareWithSerializedMetrics) { std::cout << "\n=== Full Serialized Metrics ===" << std::endl; std::cout << serialized << std::endl; + EXPECT_TRUE(serialized.find("instance_id") != std::string::npos); + EXPECT_TRUE(serialized.find("cluster_id") != std::string::npos); // Summary should be much shorter and more readable EXPECT_LT(summary.length(), serialized.length());