diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index e4c9677a4c62..f9d1e1c21bac 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -13,22 +13,43 @@ // limitations under the License. //! prom supply the prometheus HTTP API Server compliance + +use std::borrow::Borrow; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; +use arrow::array::AsArray; +use arrow::datatypes::{ + Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType, + DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type, + Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; +use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use axum::extract::{Path, Query, State}; use axum::{Extension, Form}; use catalog::CatalogManagerRef; use common_catalog::parse_catalog_and_schema_from_db_string; +use common_decimal::Decimal128; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; -use common_recordbatch::RecordBatches; +use common_recordbatch::{RecordBatch, RecordBatches}; use common_telemetry::{debug, tracing}; +use common_time::time::Time; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; +use common_time::{ + Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, +}; use common_version::OwnedBuildInfo; +use datafusion_common::ScalarValue; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; +use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::types::jsonb_to_string; use datatypes::vectors::Float64Vector; use futures::StreamExt; use futures::future::join_all; @@ -53,8 +74,9 @@ use store_api::metric_engine_consts::{ pub use super::result::prometheus_resp::PrometheusJsonResponse; use crate::error::{ - CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result, - TableNotFoundSnafu, UnexpectedResultSnafu, + CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, + InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, + UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL}; @@ -98,12 +120,23 @@ pub struct PromData { pub result: PromQueryResult, } +/// A "holder" for the reference([Arc]) to a column name, +/// to help avoiding cloning [String]s when used as a [HashMap] key. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct Column(Arc); + +impl From<&str> for Column { + fn from(s: &str) -> Self { + Self(Arc::new(s.to_string())) + } +} + #[derive(Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(untagged)] pub enum PrometheusResponse { PromData(PromData), Labels(Vec), - Series(Vec>), + Series(Vec>), LabelValues(Vec), FormatQuery(String), BuildInfo(OwnedBuildInfo), @@ -622,7 +655,7 @@ async fn get_all_column_names( async fn retrieve_series_from_query_result( result: Result, - series: &mut Vec>, + series: &mut Vec>, query_ctx: &QueryContext, table_name: &str, manager: &CatalogManagerRef, @@ -700,7 +733,7 @@ async fn retrieve_labels_name_from_query_result( fn record_batches_to_series( batches: RecordBatches, - series: &mut Vec>, + series: &mut Vec>, table_name: &str, tag_columns: &HashSet, ) -> Result<()> { @@ -723,20 +756,353 @@ fn record_batches_to_series( .try_project(&projection) .context(CollectRecordbatchSnafu)?; - for row in batch.rows() { - let mut element: HashMap = row + let mut writer = RowWriter::new(&batch.schema, table_name); + writer.write(batch, series)?; + } + Ok(()) +} + +/// Writer from a row in the record batch to a Prometheus time series: +/// +/// `{__name__="",