Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 74 additions & 12 deletions src/frontend/src/instance/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -28,6 +28,7 @@ use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
Expand All @@ -42,8 +43,9 @@ use servers::error::{
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -322,6 +324,7 @@ async fn query_trace_table(
})?;

let is_data_model_v1 = table
.clone()
.table_info()
.meta
.options
Expand All @@ -330,6 +333,14 @@ async fn query_trace_table(
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);

// Collect the table's field column names, each wrapped in double quotes, into a set for lookups.
let col_names = table
.table_info()
.meta
.field_column_names()
.map(|s| format!("\"{}\"", s))
.collect::<HashSet<String>>();

let df_context = create_df_context(query_engine)?;

let dataframe = df_context
Expand All @@ -342,7 +353,7 @@ async fn query_trace_table(
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
tags_filters(&dataframe, t, is_data_model_v1, &col_names)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
Expand Down Expand Up @@ -472,23 +483,73 @@ fn json_tag_filters(
Ok(filters)
}

fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
/// Resolves a tag to an existing column and builds a filter expression on it.
///
/// `span_key` is looked up in `col_names` first, then `resource_key`; the first
/// one found is passed to `expr_builder` and the resulting expression is returned.
/// If neither name is present, a warning naming the original tag `key` is logged
/// and `None` is returned, so the caller can drop the filter.
#[inline]
fn check_col_and_build_expr<F>(
    span_key: String,
    resource_key: String,
    key: &str,
    col_names: &HashSet<String>,
    expr_builder: F,
) -> Option<Expr>
where
    F: FnOnce(String) -> Expr,
{
    // Lookup order matters: the span-level column takes precedence over the
    // resource-level one when both exist.
    let resolved = if col_names.contains(&span_key) {
        Some(span_key)
    } else if col_names.contains(&resource_key) {
        Some(resource_key)
    } else {
        warn!("tag key {} not found in table columns", key);
        None
    };

    resolved.map(expr_builder)
}

fn flatten_tag_filters(
tags: HashMap<String, JsonValue>,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
}

// TODO(shuiyisong): add more precise mapping from key to col name
let span_key = format!("\"span_attributes.{}\"", key);
let resource_key = format!("\"resource_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::String(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
let value = value.as_f64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
let value = value.as_i64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
JsonValue::Bool(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::Null => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).is_null()
})
}
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
Expand All @@ -502,9 +563,10 @@ fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
flatten_tag_filters(tags, col_names)
} else {
json_tag_filters(dataframe, tags)
}
Expand Down
47 changes: 42 additions & 5 deletions src/servers/src/http/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ use crate::http::extractor::TraceTableName;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::query_handler::JaegerQueryHandlerRef;

Expand Down Expand Up @@ -654,7 +656,10 @@ async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>>
.await
.context(CollectRecordbatchSnafu)?,
)?;
debug!("The query records: {:?}", records);
debug!(
"The query records: {}",
serde_json::to_string(&records).unwrap()
);
Ok(Some(records))
}
// It's unlikely to happen. However, if the output is not a stream, return None.
Expand Down Expand Up @@ -859,6 +864,38 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status)),
});
// When the span status is an error, also emit a boolean `error` tag to comply with the Jaeger API.
if span_status == SPAN_STATUS_ERROR {
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
value_type: ValueType::Boolean,
value: Value::Boolean(true),
});
}
}
}

SPAN_STATUS_MESSAGE_COLUMN => {
if let JsonValue::String(span_status_message) = cell
&& !span_status_message.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_MESSAGE.to_string(),
value_type: ValueType::String,
value: Value::String(span_status_message),
});
}
}

TRACE_STATE_COLUMN => {
if let JsonValue::String(trace_state) = cell
&& !trace_state.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_TRACE_STATE.to_string(),
value_type: ValueType::String,
value: Value::String(trace_state),
});
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_STATUS_CODE: &str = "span_status_code";
pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
pub const SPAN_EVENTS_COLUMN: &str = "span_events";
pub const SCOPE_NAME_COLUMN: &str = "scope_name";
pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
pub const TRACE_STATE_COLUMN: &str = "trace_state";

// const keys
pub const KEY_SERVICE_NAME: &str = "service.name";
Expand All @@ -49,6 +51,9 @@ pub const KEY_SPAN_KIND: &str = "span.kind";
pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";

/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
Expand All @@ -57,6 +62,7 @@ pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
// The span status code prefix in the database.
pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";

/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
Expand Down
10 changes: 5 additions & 5 deletions src/servers/src/otlp/trace/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::error::Result;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
Expand Down Expand Up @@ -124,9 +124,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

Expand Down
17 changes: 9 additions & 8 deletions src/servers/src/otlp/trace/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use crate::error::Result;
use crate::otlp::trace::attributes::Attributes;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
Expand Down Expand Up @@ -135,11 +136,11 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data("scope_name", Some(span.scope_name)),
make_string_column_data("scope_version", Some(span.scope_version)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

Expand Down
Loading
Loading