Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions src/frontend/src/instance/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ use servers::error::{
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -472,23 +473,41 @@ fn json_tag_filters(
Ok(filters)
}

macro_rules! col_eq_or_col_eq {
($span_key:expr, $resource_key:expr, $value:expr) => {
col($span_key)
.eq(lit($value))
.or(col($resource_key).eq(lit($value)))
};
}

fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
}

// TODO(shuiyisong): add more precise mapping from key to col name
let span_key = format!("\"span_attributes.{}\"", key);
let resource_key = format!("\"resource_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::String(value) => {
Some(col_eq_or_col_eq!(span_key, resource_key, value.clone()))
}
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
let value = value.as_f64().unwrap();
Some(col_eq_or_col_eq!(span_key, resource_key, value))
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
let value = value.as_i64().unwrap();
Some(col_eq_or_col_eq!(span_key, resource_key, value))
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
JsonValue::Bool(value) => Some(col_eq_or_col_eq!(span_key, resource_key, value)),
JsonValue::Null => Some(col(span_key).is_null().or(col(resource_key).is_null())),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
Expand Down
42 changes: 38 additions & 4 deletions src/servers/src/http/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ use crate::http::extractor::TraceTableName;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::query_handler::JaegerQueryHandlerRef;

Expand Down Expand Up @@ -859,6 +861,38 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status)),
});
// set error to comply with the Jaeger API
if span_status == SPAN_STATUS_ERROR {
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
value_type: ValueType::Boolean,
value: Value::Boolean(true),
});
}
}
}

SPAN_STATUS_MESSAGE_COLUMN => {
if let JsonValue::String(span_status_message) = cell
&& !span_status_message.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_MESSAGE.to_string(),
value_type: ValueType::String,
value: Value::String(span_status_message),
});
}
}

TRACE_STATE_COLUMN => {
if let JsonValue::String(trace_state) = cell
&& !trace_state.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_TRACE_STATE.to_string(),
value_type: ValueType::String,
value: Value::String(trace_state),
});
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_STATUS_CODE: &str = "span_status_code";
pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
pub const SPAN_EVENTS_COLUMN: &str = "span_events";
pub const SCOPE_NAME_COLUMN: &str = "scope_name";
pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
pub const TRACE_STATE_COLUMN: &str = "trace_state";

// const keys
pub const KEY_SERVICE_NAME: &str = "service.name";
Expand All @@ -49,6 +51,9 @@ pub const KEY_SPAN_KIND: &str = "span.kind";
pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";

/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
Expand All @@ -57,6 +62,7 @@ pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
// The span status code prefix in the database.
pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";

/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
Expand Down
10 changes: 5 additions & 5 deletions src/servers/src/otlp/trace/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::error::Result;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
Expand Down Expand Up @@ -124,9 +124,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

Expand Down
17 changes: 9 additions & 8 deletions src/servers/src/otlp/trace/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use crate::error::Result;
use crate::otlp::trace::attributes::Attributes;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
Expand Down Expand Up @@ -135,11 +136,11 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data("scope_name", Some(span.scope_name)),
make_string_column_data("scope_version", Some(span.scope_version)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

Expand Down
Loading