1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- use std:: collections:: HashMap ;
15+ use std:: collections:: { HashMap , HashSet } ;
1616use std:: sync:: Arc ;
1717
1818use async_trait:: async_trait;
@@ -28,6 +28,7 @@ use common_function::scalars::udf::create_udf;
2828use common_query:: { Output , OutputData } ;
2929use common_recordbatch:: adapter:: RecordBatchStreamAdapter ;
3030use common_recordbatch:: util;
31+ use common_telemetry:: warn;
3132use datafusion:: dataframe:: DataFrame ;
3233use datafusion:: execution:: SessionStateBuilder ;
3334use datafusion:: execution:: context:: SessionContext ;
@@ -42,8 +43,9 @@ use servers::error::{
4243} ;
4344use servers:: http:: jaeger:: { JAEGER_QUERY_TABLE_NAME_KEY , QueryTraceParams } ;
4445use servers:: otlp:: trace:: {
45- DURATION_NANO_COLUMN , SERVICE_NAME_COLUMN , SPAN_ATTRIBUTES_COLUMN , SPAN_KIND_COLUMN ,
46- SPAN_KIND_PREFIX , SPAN_NAME_COLUMN , TIMESTAMP_COLUMN , TRACE_ID_COLUMN ,
46+ DURATION_NANO_COLUMN , KEY_OTEL_STATUS_ERROR_KEY , SERVICE_NAME_COLUMN , SPAN_ATTRIBUTES_COLUMN ,
47+ SPAN_KIND_COLUMN , SPAN_KIND_PREFIX , SPAN_NAME_COLUMN , SPAN_STATUS_CODE , SPAN_STATUS_ERROR ,
48+ TIMESTAMP_COLUMN , TRACE_ID_COLUMN ,
4749} ;
4850use servers:: query_handler:: JaegerQueryHandler ;
4951use session:: context:: QueryContextRef ;
@@ -322,6 +324,7 @@ async fn query_trace_table(
322324 } ) ?;
323325
324326 let is_data_model_v1 = table
327+ . clone ( )
325328 . table_info ( )
326329 . meta
327330 . options
@@ -330,6 +333,14 @@ async fn query_trace_table(
330333 . map ( |s| s. as_str ( ) )
331334 == Some ( TABLE_DATA_MODEL_TRACE_V1 ) ;
332335
336+ // collect to set
337+ let col_names = table
338+ . table_info ( )
339+ . meta
340+ . field_column_names ( )
341+ . map ( |s| format ! ( "\" {}\" " , s) )
342+ . collect :: < HashSet < String > > ( ) ;
343+
333344 let df_context = create_df_context ( query_engine) ?;
334345
335346 let dataframe = df_context
@@ -342,7 +353,7 @@ async fn query_trace_table(
342353 let dataframe = filters
343354 . into_iter ( )
344355 . chain ( tags. map_or ( Ok ( vec ! [ ] ) , |t| {
345- tags_filters ( & dataframe, t, is_data_model_v1)
356+ tags_filters ( & dataframe, t, is_data_model_v1, & col_names )
346357 } ) ?)
347358 . try_fold ( dataframe, |df, expr| {
348359 df. filter ( expr) . context ( DataFusionSnafu )
@@ -472,23 +483,73 @@ fn json_tag_filters(
472483 Ok ( filters)
473484}
474485
475- fn flatten_tag_filters ( tags : HashMap < String , JsonValue > ) -> ServerResult < Vec < Expr > > {
486+ /// Helper function to check if span_key or resource_key exists in col_names and create an expression.
487+ /// If neither exists, logs a warning and returns None.
488+ #[ inline]
489+ fn check_col_and_build_expr < F > (
490+ span_key : String ,
491+ resource_key : String ,
492+ key : & str ,
493+ col_names : & HashSet < String > ,
494+ expr_builder : F ,
495+ ) -> Option < Expr >
496+ where
497+ F : FnOnce ( String ) -> Expr ,
498+ {
499+ if col_names. contains ( & span_key) {
500+ return Some ( expr_builder ( span_key) ) ;
501+ }
502+ if col_names. contains ( & resource_key) {
503+ return Some ( expr_builder ( resource_key) ) ;
504+ }
505+ warn ! ( "tag key {} not found in table columns" , key) ;
506+ None
507+ }
508+
509+ fn flatten_tag_filters (
510+ tags : HashMap < String , JsonValue > ,
511+ col_names : & HashSet < String > ,
512+ ) -> ServerResult < Vec < Expr > > {
476513 let filters = tags
477514 . into_iter ( )
478515 . filter_map ( |( key, value) | {
479- let key = format ! ( "\" span_attributes.{}\" " , key) ;
516+ if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue :: Bool ( true ) {
517+ return Some ( col ( SPAN_STATUS_CODE ) . eq ( lit ( SPAN_STATUS_ERROR ) ) ) ;
518+ }
519+
520+ // TODO(shuiyisong): add more precise mapping from key to col name
521+ let span_key = format ! ( "\" span_attributes.{}\" " , key) ;
522+ let resource_key = format ! ( "\" resource_attributes.{}\" " , key) ;
480523 match value {
481- JsonValue :: String ( value) => Some ( col ( key) . eq ( lit ( value) ) ) ,
524+ JsonValue :: String ( value) => {
525+ check_col_and_build_expr ( span_key, resource_key, & key, col_names, |k| {
526+ col ( k) . eq ( lit ( value) )
527+ } )
528+ }
482529 JsonValue :: Number ( value) => {
483530 if value. is_f64 ( ) {
484531 // safe to unwrap as checked previously
485- Some ( col ( key) . eq ( lit ( value. as_f64 ( ) . unwrap ( ) ) ) )
532+ let value = value. as_f64 ( ) . unwrap ( ) ;
533+ check_col_and_build_expr ( span_key, resource_key, & key, col_names, |k| {
534+ col ( k) . eq ( lit ( value) )
535+ } )
486536 } else {
487- Some ( col ( key) . eq ( lit ( value. as_i64 ( ) . unwrap ( ) ) ) )
537+ let value = value. as_i64 ( ) . unwrap ( ) ;
538+ check_col_and_build_expr ( span_key, resource_key, & key, col_names, |k| {
539+ col ( k) . eq ( lit ( value) )
540+ } )
488541 }
489542 }
490- JsonValue :: Bool ( value) => Some ( col ( key) . eq ( lit ( value) ) ) ,
491- JsonValue :: Null => Some ( col ( key) . is_null ( ) ) ,
543+ JsonValue :: Bool ( value) => {
544+ check_col_and_build_expr ( span_key, resource_key, & key, col_names, |k| {
545+ col ( k) . eq ( lit ( value) )
546+ } )
547+ }
548+ JsonValue :: Null => {
549+ check_col_and_build_expr ( span_key, resource_key, & key, col_names, |k| {
550+ col ( k) . is_null ( )
551+ } )
552+ }
492553 // not supported at the moment
493554 JsonValue :: Array ( _value) => None ,
494555 JsonValue :: Object ( _value) => None ,
@@ -502,9 +563,10 @@ fn tags_filters(
502563 dataframe : & DataFrame ,
503564 tags : HashMap < String , JsonValue > ,
504565 is_data_model_v1 : bool ,
566+ col_names : & HashSet < String > ,
505567) -> ServerResult < Vec < Expr > > {
506568 if is_data_model_v1 {
507- flatten_tag_filters ( tags)
569+ flatten_tag_filters ( tags, col_names )
508570 } else {
509571 json_tag_filters ( dataframe, tags)
510572 }
0 commit comments