Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 245 additions & 34 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::{ColumnDataType, RowInsertRequests};
use api::v1::alter_table_expr::Kind;
use api::v1::{
AlterTableExpr, ColumnDataType, ModifyColumnType, ModifyColumnTypes, RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
Expand Down Expand Up @@ -60,6 +63,33 @@ enum ChunkFailureReaction {
Propagate,
}

/// Outcome of reconciling the datatypes observed for one trace column in a
/// request against the datatype (if any) the column already has in the table.
///
/// Every variant carries the datatype the request column should end up with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceReconcileDecision {
    /// The table already has this column and its current type is kept.
    UseExisting(ColumnDataType),
    /// The table has no such column yet; the type was resolved from the
    /// request's own observed types.
    UseRequestLocal(ColumnDataType),
    /// The table column exists but must be altered to the carried type before
    /// the request can be written.
    AlterExistingTo(ColumnDataType),
}

impl TraceReconcileDecision {
    /// Returns the datatype carried by the decision, regardless of variant.
    fn target_type(self) -> ColumnDataType {
        // All variants wrap exactly one datatype, so the or-pattern is
        // irrefutable and can be destructured directly.
        let (Self::UseExisting(datatype)
        | Self::UseRequestLocal(datatype)
        | Self::AlterExistingTo(datatype)) = self;
        datatype
    }

    /// Returns `true` only for [`TraceReconcileDecision::AlterExistingTo`].
    fn requires_alter(self) -> bool {
        match self {
            Self::AlterExistingTo(_) => true,
            Self::UseExisting(_) | Self::UseRequestLocal(_) => false,
        }
    }
}

/// A request column whose schema datatype and cell values are rewritten to
/// `target_type` once all columns of the request have been inspected.
struct PendingTraceColumnRewrite {
    /// Position of the column in the request's `rows.schema`, also used to
    /// index into each row's `values`.
    col_idx: usize,
    /// Datatype written into the request schema and that mismatching cell
    /// values are coerced to.
    target_type: ColumnDataType,
    /// Column name, kept for the error message emitted when coercion fails.
    column_name: String,
}

impl ChunkFailureReaction {
fn as_metric_label(self) -> &'static str {
match self {
Expand Down Expand Up @@ -546,34 +576,126 @@ impl Instance {
Some(summary)
}

/// Picks the final datatype for one trace column.
/// Picks the reconciliation action for one trace column.
///
/// Existing table schema is authoritative when present. Otherwise we resolve the
/// request-local observed types using the shared trace coercion rules.
fn choose_trace_target_type(
/// Existing table schema is authoritative unless the only incompatible case is
/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data.
fn choose_trace_reconcile_decision(
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> ServerResult<Option<ColumnDataType>> {
) -> ServerResult<Option<TraceReconcileDecision>> {
let Some(existing_type) = existing_type else {
return resolve_new_trace_column_type(observed_types.iter().copied()).map_err(|_| {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.build()
});
return resolve_new_trace_column_type(observed_types.iter().copied())
.map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
.map_err(|_| {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.build()
});
};

if observed_types.iter().copied().all(|request_type| {
if observed_types.iter().all(|&request_type| {
request_type == existing_type
|| is_supported_trace_coercion(request_type, existing_type)
}) {
Ok(Some(existing_type))
} else {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
}

if existing_type == ColumnDataType::Int64
&& observed_types.contains(&ColumnDataType::Float64)
&& observed_types.iter().all(|observed_type| {
matches!(
observed_type,
ColumnDataType::Int64 | ColumnDataType::Float64
)
})
{
return Ok(Some(TraceReconcileDecision::AlterExistingTo(
ColumnDataType::Float64,
)));
}

error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.fail()
}

/// Widen existing trace table columns to Float64 before request rewrite.
async fn alter_trace_table_columns_to_float64(
&self,
ctx: &QueryContextRef,
table_name: &str,
column_names: &[String],
) -> ServerResult<()> {
let catalog_name = ctx.current_catalog().to_string();
let schema_name = ctx.current_schema();
let alter_expr = AlterTableExpr {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.to_string(),
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: column_names
.iter()
.map(|column_name| ModifyColumnType {
column_name: column_name.clone(),
target_type: ColumnDataType::Float64 as i32,
target_type_extension: None,
})
.collect(),
})),
};

if let Err(err) = self
.statement_executor
.alter_table_inner(alter_expr, ctx.clone())
.await
{
let table = self
.catalog_manager
.table(&catalog_name, &schema_name, table_name, None)
.await
.map_err(servers::error::Error::from)?;
let alter_already_applied = table
.map(|table| {
let table_schema = table.schema();
column_names.iter().all(|column_name| {
table_schema
.column_schema_by_name(column_name)
.and_then(|table_col| {
ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
.ok()
.map(|wrapper| wrapper.datatype())
})
== Some(ColumnDataType::Float64)
})
})
.unwrap_or(false);

if alter_already_applied {
return Ok(());
}

tracing::warn!(
Comment thread
shuiyisong marked this conversation as resolved.
Outdated
table_name,
columns = ?column_names,
error = %err,
"failed to widen trace columns before insert"
);

return error::InternalSnafu {
err_msg: format!(
"failed to widen trace columns {:?} in table '{}' to Float64 after alter failure ({})",
column_names,
table_name,
Comment thread
shuiyisong marked this conversation as resolved.
Outdated
err.status_code().as_ref()
),
}
.fail()
.fail();
Comment thread
shuiyisong marked this conversation as resolved.
Outdated
}

Ok(())
}

/// Coerce request column types and values to match the existing table schema
Expand All @@ -598,7 +720,8 @@ impl Instance {
};

let table_schema = table.map(|table| table.schema());
let mut pending_coercions = Vec::new();
let mut pending_rewrites = Vec::new();
let mut pending_alter_columns = Vec::new();

for (col_idx, col_schema) in rows.schema.iter().enumerate() {
let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else {
Expand Down Expand Up @@ -647,8 +770,8 @@ impl Instance {

// Decide the final type once per column, then rewrite all affected cells
// together in one row pass below.
let Some(target_type) =
Self::choose_trace_target_type(&observed_types, existing_type).map_err(
let Some(decision) =
Self::choose_trace_reconcile_decision(&observed_types, existing_type).map_err(
|_| {
enrich_trace_reconcile_error(
&req.table_name,
Expand All @@ -661,52 +784,76 @@ impl Instance {
else {
continue;
};
let target_type = decision.target_type();

if observed_types
.iter()
.all(|observed| *observed == target_type)
if !decision.requires_alter()
&& observed_types
.iter()
.all(|observed| *observed == target_type)
&& col_schema.datatype == target_type as i32
{
continue;
}

pending_coercions.push((col_idx, target_type, col_schema.column_name.clone()));
if decision.requires_alter()
&& !pending_alter_columns.contains(&col_schema.column_name)
{
pending_alter_columns.push(col_schema.column_name.clone());
}

pending_rewrites.push(PendingTraceColumnRewrite {
col_idx,
target_type,
column_name: col_schema.column_name.clone(),
});
}

if pending_coercions.is_empty() {
if pending_rewrites.is_empty() {
continue;
}

if !pending_alter_columns.is_empty() {
self.alter_trace_table_columns_to_float64(
ctx,
&req.table_name,
&pending_alter_columns,
)
.await?;
Comment thread
shuiyisong marked this conversation as resolved.
}

// Update schema metadata before mutating row values so both stay in sync.
for (col_idx, target_type, ..) in &pending_coercions {
rows.schema[*col_idx].datatype = *target_type as i32;
for pending_rewrite in &pending_rewrites {
rows.schema[pending_rewrite.col_idx].datatype = pending_rewrite.target_type as i32;
}

// Apply all pending column rewrites in one row pass.
for row in &mut rows.rows {
for (col_idx, target_type, column_name) in &pending_coercions {
let Some(value) = row.values.get_mut(*col_idx) else {
for pending_rewrite in &pending_rewrites {
let Some(value) = row.values.get_mut(pending_rewrite.col_idx) else {
continue;
};
let Some(request_type) =
value.value_data.as_ref().and_then(trace_value_datatype)
else {
continue;
};
if request_type == *target_type {
if request_type == pending_rewrite.target_type {
continue;
}

value.value_data = coerce_value_data(
&value.value_data,
*target_type,
pending_rewrite.target_type,
request_type,
)
.map_err(|_| {
error::InvalidParameterSnafu {
reason: format!(
"failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
column_name, req.table_name, request_type, target_type
pending_rewrite.column_name,
req.table_name,
request_type,
pending_rewrite.target_type
),
}
.build()
Expand Down Expand Up @@ -767,10 +914,12 @@ fn push_observed_trace_type(observed_types: &mut Vec<ColumnDataType>, datatype:

#[cfg(test)]
mod tests {
use api::v1::ColumnDataType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use servers::query_handler::TraceIngestOutcome;

use super::{ChunkFailureReaction, Instance};
use super::{ChunkFailureReaction, Instance, TraceReconcileDecision};
use crate::metrics::OTLP_TRACES_FAILURE_COUNT;

#[test]
Expand Down Expand Up @@ -923,4 +1072,66 @@ mod tests {
ChunkFailureReaction::DiscardChunk
);
}

/// An existing Int64 column that only receives Int64 values keeps its type.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
    let observed = [ColumnDataType::Int64];

    let decision =
        Instance::choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64))
            .unwrap();

    assert_eq!(
        decision,
        Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
    );
}

/// Mixed Int64/Float64 input against an existing Int64 column asks for the
/// column to be altered to Float64.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
    let observed = [ColumnDataType::Int64, ColumnDataType::Float64];
    let widened = TraceReconcileDecision::AlterExistingTo(ColumnDataType::Float64);

    let decision =
        Instance::choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64))
            .unwrap();

    assert_eq!(decision, Some(widened));
}

/// Mixed Int64/Float64 input against an existing Float64 column keeps the
/// existing Float64 type.
#[test]
fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
    let observed = [ColumnDataType::Int64, ColumnDataType::Float64];
    let existing = ColumnDataType::Float64;

    let decision = Instance::choose_trace_reconcile_decision(&observed, Some(existing)).unwrap();

    assert_eq!(
        decision,
        Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
    );
}

/// Boolean values arriving for an existing Int64 column are rejected with an
/// invalid-arguments status.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
    let observed = [ColumnDataType::Boolean, ColumnDataType::Int64];

    let outcome =
        Instance::choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64));
    let status = outcome.unwrap_err().status_code();

    assert_eq!(status, StatusCode::InvalidArguments);
}

/// With no existing column, mixed Int64/Float64 input resolves to a
/// request-local Float64 type.
#[test]
fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
    let observed = [ColumnDataType::Int64, ColumnDataType::Float64];
    let request_local = TraceReconcileDecision::UseRequestLocal(ColumnDataType::Float64);

    let decision = Instance::choose_trace_reconcile_decision(&observed, None).unwrap();

    assert_eq!(decision, Some(request_local));
}
}
Loading
Loading