diff --git a/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs b/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs index 790a655954..b389888152 100644 --- a/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs +++ b/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs @@ -72,6 +72,7 @@ fn generate_dict_keys_attribute_batch( fn bench_transform_attributes(c: &mut Criterion) { // Pre-create AttributesTransform instances to avoid measuring their creation cost let single_replace_no_delete = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "attr24".into(), "attr_24".into(), @@ -80,6 +81,7 @@ fn bench_transform_attributes(c: &mut Criterion) { }; let single_replace_single_delete = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "attr24".into(), "attr_24".into(), @@ -88,11 +90,13 @@ fn bench_transform_attributes(c: &mut Criterion) { }; let no_replace_single_delete = AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(["attr15".into()]))), }; let attr3_replace_no_delete = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "attr3".into(), "attr_3".into(), @@ -101,11 +105,13 @@ fn bench_transform_attributes(c: &mut Criterion) { }; let no_replace_attr9_delete = AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(["attr9".into()]))), }; let attr3_replace_attr9_delete = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "attr3".into(), "attr_3".into(), diff --git a/rust/otap-dataflow/crates/otap/src/attributes_processor.rs b/rust/otap-dataflow/crates/otap/src/attributes_processor.rs index 8cf5d122b8..6d832551c8 100644 --- a/rust/otap-dataflow/crates/otap/src/attributes_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/attributes_processor.rs @@ -10,9 +10,10 @@ //! Supported actions (current subset): //! - `rename`: Renames an attribute key (non-standard deviation from the Go collector). //! - `delete`: Removes an attribute by key. +//! - `insert`: Inserts a new attribute. //! //! Unsupported actions are ignored if present in the config: -//! `insert`, `upsert`, `update` (value update), `hash`, `extract`, `convert`. +//! `upsert`, `update` (value update), `hash`, `extract`, `convert`. //! We may add support for them later. //! //! Example configuration (YAML): @@ -47,7 +48,8 @@ use otap_df_engine::processor::ProcessorWrapper; use otap_df_pdata::otap::{ OtapArrowRecords, transform::{ - AttributesTransform, DeleteTransform, RenameTransform, transform_attributes_with_stats, + AttributesTransform, DeleteTransform, InsertTransform, LiteralValue, RenameTransform, + transform_attributes_with_stats, }, }; use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; @@ -81,6 +83,14 @@ pub enum Action { key: String, }, + /// Insert a new attribute. + Insert { + /// The attribute key to insert. + key: String, + /// The value to insert. + value: LiteralValue, + }, + /// Other actions are accepted for forward-compatibility but ignored. /// These variants allow deserialization of Go-style configs without effect. #[serde(other)] @@ -91,7 +101,7 @@ pub enum Action { /// Configuration for the AttributesProcessor. /// /// Accepts configuration in the same format as the OpenTelemetry Collector's attributes processor. -/// Supported actions: rename (deviation), delete. 
Others are ignored. +/// Supported actions: rename (deviation), delete, insert. Others are ignored. /// /// You can control which attribute domains are transformed via `apply_to`. /// Valid values: "signal" (default), "resource", "scope". @@ -140,6 +150,7 @@ impl AttributesProcessor { fn new(config: Config) -> Result<Self> { let mut renames = BTreeMap::new(); let mut deletes = BTreeSet::new(); + let mut inserts = BTreeMap::new(); for action in config.actions { match action { @@ -152,6 +163,9 @@ impl AttributesProcessor { } => { let _ = renames.insert(source_key, destination_key); } + Action::Insert { key, value } => { + let _ = inserts.insert(key, value); + } // Unsupported actions are ignored for now Action::Unsupported => {} } @@ -182,6 +196,11 @@ impl AttributesProcessor { } else { Some(DeleteTransform::new(deletes)) }, + insert: if inserts.is_empty() { + None + } else { + Some(InsertTransform::new(inserts)) + }, }; transform @@ -201,7 +220,9 @@ impl AttributesProcessor { #[inline] const fn is_noop(&self) -> bool { - self.transform.rename.is_none() && self.transform.delete.is_none() + self.transform.rename.is_none() + && self.transform.delete.is_none() + && self.transform.insert.is_none() } #[inline] @@ -746,6 +767,452 @@ mod tests { .validate(|_| async move {}); } + #[test] + fn test_insert_scoped_to_resource_only_logs() { + // Resource has 'a', scope has 'a', log has 'a' and another key to keep batch non-empty + let input = build_logs_with_attrs( + vec![ + KeyValue::new("a", AnyValue::new_string("rv")), + KeyValue::new("r", AnyValue::new_string("keep")), + ], + vec![KeyValue::new("a", AnyValue::new_string("sv"))], + vec![ + KeyValue::new("a", AnyValue::new_string("lv")), + KeyValue::new("b", AnyValue::new_string("keep")), + ], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "c", "value": "val"}, + ], + "apply_to": ["resource"] + }); + + // Create a proper pipeline context for the test + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("attributes-processor-insert-resource"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let res_attrs = &decoded.resource_logs[0] + .resource + .as_ref() + .unwrap() + .attributes; + + assert!( + res_attrs + .iter() + .any(|kv| kv.key == "c" && kv.value == Some(AnyValue::new_string("val"))) + ); + assert!(res_attrs.iter().any(|kv| kv.key == "r"));
+ // Scope 'c' should not be inserted; 'a' should remain + let scope_attrs = &decoded.resource_logs[0].scope_logs[0] + .scope + .as_ref() + .unwrap() + .attributes; + assert!(!scope_attrs.iter().any(|kv| kv.key == "c")); + assert!(scope_attrs.iter().any(|kv| kv.key == "a")); + + // Log 'c' should not be inserted; 'b' should remain + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + assert!(!log_attrs.iter().any(|kv| kv.key == "c")); + assert!(log_attrs.iter().any(|kv| kv.key == "b")); + }) + .validate(|_| async move {}); + } + + #[test] + fn test_insert_int_value_via_config() { + // Test inserting an integer value via JSON configuration + let input = build_logs_with_attrs( + vec![], + vec![], + vec![KeyValue::new("existing", AnyValue::new_string("val"))], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "count", "value": 42}, + ], + "apply_to": ["signal"] + }); + + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("attributes-processor-insert-int"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Verify the int value was inserted correctly + assert!(log_attrs.iter().any(|kv| kv.key == "count")); + let count_kv = log_attrs.iter().find(|kv| kv.key == "count").unwrap(); + let inner_val = count_kv.value.as_ref().unwrap().value.as_ref().unwrap(); + match inner_val { + otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value::IntValue( + i, + ) => { + assert_eq!(*i, 42); + } + _ => panic!("expected IntValue, got {:?}", inner_val), + } + assert!(log_attrs.iter().any(|kv| kv.key == "existing")); + }) + .validate(|_| async move {}); + } + + #[test] + fn test_insert_double_value_via_config() { + // Test inserting a double value via JSON configuration + let input = build_logs_with_attrs( + vec![], + vec![], + vec![KeyValue::new("existing", AnyValue::new_string("val"))], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "ratio", "value": 1.2345}, + ], + "apply_to": ["signal"] + }); + + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = 
test_node("attributes-processor-insert-double"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Verify the double value was inserted correctly + assert!(log_attrs.iter().any(|kv| kv.key == "ratio")); + let ratio_kv = log_attrs.iter().find(|kv| kv.key == "ratio").unwrap(); + let inner_val = ratio_kv.value.as_ref().unwrap().value.as_ref().unwrap(); + match inner_val { + otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value::DoubleValue(d) => { + assert!((d - 1.2345).abs() < f64::EPSILON); + } + _ => panic!("expected DoubleValue, got {:?}", inner_val), + } + assert!(log_attrs.iter().any(|kv| kv.key == "existing")); + }) + .validate(|_| async move {}); + } + + #[test] + fn test_insert_bool_value_via_config() { + // Test inserting a boolean value via JSON configuration + let input = build_logs_with_attrs( + vec![], + vec![], + vec![KeyValue::new("existing", AnyValue::new_string("val"))], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "enabled", "value": true}, + ], + "apply_to": ["signal"] + }); + + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("attributes-processor-insert-bool"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = 
&decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Verify the bool value was inserted correctly + assert!(log_attrs.iter().any(|kv| kv.key == "enabled")); + let enabled_kv = log_attrs.iter().find(|kv| kv.key == "enabled").unwrap(); + let inner_val = enabled_kv.value.as_ref().unwrap().value.as_ref().unwrap(); + match inner_val { + otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value::BoolValue(b) => { + assert!(*b); + } + _ => panic!("expected BoolValue, got {:?}", inner_val), + } + assert!(log_attrs.iter().any(|kv| kv.key == "existing")); + }) + .validate(|_| async move {}); + } + + #[test] + fn test_insert_does_not_overwrite_existing() { + // Verify that insert action does NOT overwrite existing key (per OTel spec) + let input = build_logs_with_attrs( + vec![], + vec![], + vec![KeyValue::new( + "target_key", + AnyValue::new_string("original_value"), + )], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "target_key", "value": "new_value"}, + ], + "apply_to": ["signal"] + }); + + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("attributes-processor-insert-no-overwrite"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Should have exactly one target_key with original value + let matching_attrs: Vec<_> = log_attrs + .iter() + .filter(|kv| kv.key == "target_key") + .collect(); + assert_eq!(matching_attrs.len(), 1); + + // Value should be original, not overwritten + let value = matching_attrs[0] + .value + .as_ref() + .expect("value") + .value + .as_ref() + .expect("inner value"); + match value { + otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value::StringValue( + s, + ) => { + assert_eq!(s, "original_value"); + } + _ => panic!("expected string value"), + } + }) + .validate(|_| async move {}); + } + + #[test] + fn test_insert_multiple_keys_mixed_existing() { + // Test inserting multiple keys where some exist and some don't + let input = build_logs_with_attrs( + vec![], + vec![], + vec![ + KeyValue::new("existing_a", AnyValue::new_string("val_a")), + KeyValue::new("existing_b", AnyValue::new_string("val_b")), + ], + ); + + let cfg = json!({ + "actions": [ + {"action": "insert", "key": "existing_a", "value": "should_not_replace"}, + {"action": 
"insert", "key": "new_key", "value": "new_value"}, + ], + "apply_to": ["signal"] + }); + + let metrics_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("attributes-processor-insert-mixed"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Should have 3 keys: existing_a (unchanged), existing_b, new_key + assert_eq!(log_attrs.len(), 3); + assert!(log_attrs.iter().any(|kv| kv.key == "existing_a")); + assert!(log_attrs.iter().any(|kv| kv.key == "existing_b")); + assert!( + log_attrs.iter().any(|kv| kv.key == "new_key" + && kv.value == Some(AnyValue::new_string("new_value"))) + ); + }) + .validate(|_| async move {}); + } + #[test] fn test_delete_scoped_to_resource_only_logs() { // Resource has 'a', scope has 'a', log has 'a' and another key to keep batch non-empty diff --git a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs index 2a31e19ead..73436fe5f1 100644 --- a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs +++ b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs @@ -13,12 +13,18 @@ use arrow::array::{ use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::compute::kernels::cmp::eq; use arrow::compute::{SortColumn, and, concat}; -use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType, UInt8Type, UInt16Type}; +use arrow::datatypes::{ + ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, UInt8Type, UInt16Type, +}; use arrow::row::{RowConverter, SortField}; use crate::arrays::{ MaybeDictArrayAccessor, NullableArrayAccessor, get_required_array, get_u8_array, }; +use crate::encode::record::array::{ + ArrayAppend, ArrayAppendNulls, ArrayAppendStr, ArrayOptions, Float64ArrayBuilder, + Int64ArrayBuilder, StringArrayBuilder, dictionary::DictionaryOptions, +}; use crate::error::{Error, Result}; use crate::otlp::attributes::{AttributeValueType, parent_id::ParentId}; use crate::schema::consts::{self, metadata}; @@ -480,6 +486,26 @@ fn create_next_eq_array_for_array(arr: T) -> BooleanArray { eq(&lhs, &rhs).expect("should be able to compare slice with offset of 1") } +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +pub enum LiteralValue { + Bool(bool), + Int(i64), 
+ Double(f64), + Str(String), +} + +pub struct InsertTransform { + pub(super) entries: BTreeMap<String, LiteralValue>, +} + +impl InsertTransform { + #[must_use] + pub fn new(entries: BTreeMap<String, LiteralValue>) -> Self { + Self { entries } + } +} + pub struct RenameTransform { pub(super) map: BTreeMap<String, String>, pub(super) target_bytes: Vec<Vec<u8>>, } @@ -530,6 +556,9 @@ pub struct AttributesTransform { // rows with attribute names in this set will be deleted from the attribute record batch pub delete: Option<DeleteTransform>, + + // rows that will be inserted into the attribute record batch + pub insert: Option<InsertTransform>, } impl AttributesTransform { @@ -547,6 +576,13 @@ impl AttributesTransform { self } + /// Set the insert transform + #[must_use] + pub fn with_insert(mut self, insert: InsertTransform) -> Self { + self.insert = Some(insert); + self + } + /// Validates the attribute transform operation. The current rule is that no key can be /// duplicated in any of the passed values. This is done to avoid any ambiguity about how /// to apply the transformation. For example, if the following passed @@ -585,6 +621,16 @@ impl AttributesTransform { } } + if let Some(insert) = &self.insert { + for key in insert.entries.keys() { + if !all_keys.insert(key) { + return Err(Error::InvalidAttributeTransform { + reason: format!("Duplicate key in insert: {key}"), + }); + } + } + } + Ok(()) } } @@ -609,6 +655,8 @@ pub struct TransformStats { pub deleted_entries: u64, /// Exact number of attribute entries whose key was renamed pub renamed_entries: u64, + /// Exact number of attribute entries added by insert rules + pub inserted_entries: u64, } /// Apply an [`AttributesTransform`] to an attributes [`RecordBatch`] and return both the @@ -641,7 +689,21 @@ pub fn transform_attributes_with_stats( name: consts::ATTRIBUTE_KEY.into(), })?; - match schema.field(key_column_idx).data_type() { + // Check if we need early materialization for insert + // We only need to materialize if insert is present AND we have parent_id column + // This allows us to get the full list of parents before any deletes are applied + let insert_needed = + transform.insert.is_some() && schema.column_with_name(consts::PARENT_ID).is_some(); + let attrs_record_batch_cow = if insert_needed { + let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; + Cow::Owned(rb) + } else { + Cow::Borrowed(attrs_record_batch) + }; + let attrs_record_batch = attrs_record_batch_cow.as_ref(); + let schema = attrs_record_batch.schema(); + + let (rb, mut stats) = match schema.field(key_column_idx).data_type() { DataType::Utf8 => { let keys_arr = attrs_record_batch .column(key_column_idx) @@ -653,6 +715,7 @@ pub fn transform_attributes_with_stats( let stats = TransformStats { renamed_entries: keys_transform_result.replaced_rows as u64, deleted_entries: keys_transform_result.deleted_rows as u64, + inserted_entries: 0, }; let new_keys = Arc::new(keys_transform_result.new_keys); @@ -662,7 +725,7 @@ pub fn transform_attributes_with_stats( let should_materialize_parent_ids = schema.column_with_name(consts::PARENT_ID).is_some(); let (attrs_record_batch, schema) = if should_materialize_parent_ids { - let rb = materialize_parent_id_for_attributes::<u16>(attrs_record_batch)?; + let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; let schema = rb.schema(); (rb, schema) } else { @@ -691,7 +754,7 @@ pub fn transform_attributes_with_stats( // safety: this should only return an error if our schema, or column lengths don't match let rb = RecordBatch::try_new(schema, columns) .expect("can build record batch with same schema and
columns"); - Ok((rb, stats)) + (rb, stats) } DataType::Dictionary(k, _) => { let (new_dict, keep_ranges, stats) = match *k.clone() { @@ -711,6 +774,7 @@ pub fn transform_attributes_with_stats( TransformStats { deleted_entries: dict_imm_result.deleted_rows as u64, renamed_entries: dict_imm_result.renamed_rows as u64, + inserted_entries: 0, }, ) } @@ -730,6 +794,7 @@ pub fn transform_attributes_with_stats( TransformStats { deleted_entries: dict_imm_result.deleted_rows as u64, renamed_entries: dict_imm_result.renamed_rows as u64, + inserted_entries: 0, }, ) } @@ -742,21 +807,23 @@ pub fn transform_attributes_with_stats( }; // if all rows have been removed (all attributes deleted) just return empty RecordBatch - if keep_ranges.as_ref().map(Vec::len) == Some(0) { - return Ok((RecordBatch::new_empty(schema), stats)); - } - - // Materialize the parent ID column if it exists. Renames/replacements can change - // logical adjacency of (type,key,value) runs, which can invalidate quasi-delta decoding - // even when no rows are deleted. - let should_materialize_parent_ids = - schema.column_with_name(consts::PARENT_ID).is_some(); - let (attrs_record_batch, schema) = if should_materialize_parent_ids { - let rb = materialize_parent_id_for_attributes::(attrs_record_batch)?; - let schema = rb.schema(); - (rb, schema) + // NOTE: If we have inserts, we might still return rows even if all existing were deleted. + // But here we just produce the "remaining" batch. Inserts are appended later. + let (attrs_record_batch, schema) = if keep_ranges.as_ref().map(Vec::len) == Some(0) { + (RecordBatch::new_empty(schema.clone()), schema.clone()) } else { - (attrs_record_batch.clone(), schema) + // Materialize the parent ID column if it exists. Renames/replacements can change + // logical adjacency of (type,key,value) runs, which can invalidate quasi-delta decoding + // even when no rows are deleted. + let should_materialize_parent_ids = + schema.column_with_name(consts::PARENT_ID).is_some(); + if should_materialize_parent_ids { + let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; + let schema = rb.schema(); + (rb, schema) + } else { + (attrs_record_batch.clone(), schema) + } }; // TODO if there are any optional columns that now contain only null or default values, @@ -781,17 +848,83 @@ pub fn transform_attributes_with_stats( // safety: this should only return an error if our schema, or column lengths don't match let rb = RecordBatch::try_new(schema, columns) .expect("can build record batch with same schema and columns"); - Ok((rb, stats)) + (rb, stats) + } + data_type => { + return Err(Error::InvalidListArray { + expect_oneof: vec![ + DataType::Utf8, + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + ], + actual: data_type.clone(), + }); + } + }; + + // Handle inserts + // According to OTel collector spec, `insert` only inserts if the key does not already exist. 
+ // We need: + // - Original parent IDs from attrs_record_batch to know which parents exist + // - The transformed batch (rb) to check which (parent_id, key) pairs already exist + if let Some(insert) = &transform.insert { + if let Some(original_parent_ids) = attrs_record_batch.column_by_name(consts::PARENT_ID) { + // Determine which value type columns we need based on what's being inserted + let needs_int = insert + .entries + .iter() + .any(|(_, v)| matches!(v, LiteralValue::Int(_))); + let needs_double = insert + .entries + .iter() + .any(|(_, v)| matches!(v, LiteralValue::Double(_))); + let needs_bool = insert + .entries + .iter() + .any(|(_, v)| matches!(v, LiteralValue::Bool(_))); + let needs_str = insert + .entries + .iter() + .any(|(_, v)| matches!(v, LiteralValue::Str(_))); + + // Extend schema and batch to include missing value columns + let (rb_extended, extended_schema) = + extend_schema_for_inserts(&rb, needs_str, needs_int, needs_double, needs_bool)?; + + let (new_rows, count) = match get_parent_id_value_type(original_parent_ids)? { + DataType::UInt16 => create_inserted_batch::<u16>( + &rb_extended, + original_parent_ids, + insert, + extended_schema.as_ref(), + )?, + DataType::UInt32 => create_inserted_batch::<u32>( + &rb_extended, + original_parent_ids, + insert, + extended_schema.as_ref(), + )?, + data_type => { + return Err(Error::ColumnDataTypeMismatch { + name: consts::PARENT_ID.into(), + expect: DataType::UInt16, // or UInt32 + actual: data_type, + }); + } + }; + if count > 0 { + let combined = + arrow::compute::concat_batches(&extended_schema, &[rb_extended, new_rows]) + .map_err(|e| Error::Format { + error: e.to_string(), + })?; + stats.inserted_entries = count as u64; + return Ok((combined, stats)); + } } - data_type => Err(Error::InvalidListArray { - expect_oneof: vec![ - DataType::Utf8, - DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), - DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), - ], - actual: data_type.clone(), - }), } + + Ok((rb, stats)) } /// This function is used to perform bulk transformations on OTel attributes. @@ -802,9 +935,7 @@ pub fn transform_attributes_with_stats( /// Currently the operations supported are: /// - rename which replaces a given attribute key /// - delete which removes all rows from the record batch for a given key -/// -/// Support for insert will be added in the future (see -/// https://github.com/open-telemetry/otel-arrow/issues/813) +/// - insert which adds new attributes to the record batch /// /// Note that to avoid any ambiguity in how the transformation is applied, this method will /// validate the transform. The caller must ensure the supplied transform is valid.
See @@ -1621,6 +1752,9 @@ fn take_ranges_slice<T>(array: T, ranges: &[(usize, usize)]) -> Result<ArrayRef> where T: Array, { + if ranges.is_empty() { + return Ok(array.slice(0, 0)); + } let slices: Vec<ArrayRef> = ranges .iter() .map(|&(start, end)| array.slice(start, end - start)) @@ -2364,6 +2498,7 @@ mod test { ( // most basic transform AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "b".into(), "B".into(), @@ -2378,6 +2513,7 @@ mod test { ( // test replacements at array boundaries AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "a".into(), "A".into(), @@ -2390,6 +2526,7 @@ mod test { ( // test replacements where replacements longer than target AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "a".into(), "AAA".into(), @@ -2405,6 +2542,7 @@ mod test { ( // test replacements where replacements shorter than target AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "aaa".into(), "a".into(), @@ -2420,6 +2558,7 @@ mod test { ( // test replacing single contiguous block of keys AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "a".into(), "AA".into(), @@ -2438,6 +2577,7 @@ mod test { ( // test multiple replacements AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![ ("a".into(), "AA".into()), ("dd".into(), "D".into()), @@ -2456,6 +2596,7 @@ mod test { ( // test multiple replacements interleaved AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![ ("a".into(), "AA".into()), ("dd".into(), "D".into()), @@ -2474,6 +2615,7 @@ mod test { ( // test deletion at array boundaries without replaces AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec!["a".into()]))), }, @@ -2483,6 +2625,7 @@ mod test { ( // test delete contiguous segment AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec!["a".into()]))), }, @@ -2495,6 +2638,7 @@ mod test { ( // test multiple deletes AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec![ "a".into(), @@ -2510,6 +2654,7 @@ mod test { ( // test adjacent replacement and delete AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "a".into(), "AAA".into(), @@ -2522,6 +2667,7 @@ mod test { ( // test we handle an empty rename AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![]))), delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec!["b".into()]))), }, @@ -2531,6 +2677,7 @@ mod test { ( // test we handle an empty delete AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "a".into(), "AAAA".into(), @@ -2703,6 +2850,7 @@ mod test { let result = transform_attributes( &record_batch, &AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "k2".into(), "K2".into(), @@ -2860,6 +3008,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter(vec![( "b".into(), "B".into(), @@ -2896,6 +3045,7 @@ mod test { ( // basic dict transform AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([(
"a".into(), "AA".into(), @@ -2937,6 +3087,7 @@ mod test { ( // test with some nulls AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "a".into(), "AA".into(), @@ -3004,6 +3155,7 @@ mod test { // test if there's nulls in the dict keys. This would be unusual // but technically it's possible AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "a".into(), "AA".into(), @@ -3103,6 +3255,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "c".into(), "CCCCC".into(), @@ -3146,6 +3299,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(["a".into()].into_iter().collect())), }, @@ -3181,6 +3335,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(["a".into()].into_iter().collect())), }, @@ -3264,6 +3419,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(["b".into()]))), }, @@ -3341,6 +3497,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: None, delete: Some(DeleteTransform::new(BTreeSet::from_iter(["b".into()]))), }, @@ -3407,6 +3564,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "k2".into(), "k1".into(), @@ -3485,6 +3643,7 @@ mod test { let result = transform_attributes( &input, &AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "k2".into(), "k1".into(), @@ -3503,6 +3662,7 @@ mod test { fn test_invalid_attributes_transforms() { let test_cases = vec![ AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "b".into(), "b".into(), @@ -3510,6 +3670,7 @@ mod test { delete: None, }, AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([ ("b".into(), "b".into()), ("a".into(), "b".into()), @@ -3517,6 +3678,7 @@ mod test { delete: None, }, AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "b".into(), "a".into(), @@ -3524,6 +3686,7 @@ mod test { delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec!["b".into()]))), }, AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( "b".into(), "a".into(), @@ -3564,6 +3727,7 @@ mod test { .unwrap(); let tx = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( String::from("a"), String::from("A"), @@ -3606,6 +3770,7 @@ mod test { .unwrap(); let tx = AttributesTransform { + insert: None, rename: Some(RenameTransform::new(BTreeMap::from_iter([( String::from("a"), String::from("A"), @@ -3623,3 +3788,1181 @@ mod test { assert_eq!(with_stats, plain); } } + +/// Extend a RecordBatch's schema to include missing value columns needed for inserts. +/// Returns the extended batch and schema. If no columns need to be added, returns a clone of +/// the original batch with its schema. 
+fn extend_schema_for_inserts( + batch: &RecordBatch, + needs_str: bool, + needs_int: bool, + needs_double: bool, + needs_bool: bool, +) -> Result<(RecordBatch, Arc<arrow::datatypes::Schema>)> { + let schema = batch.schema(); + let num_rows = batch.num_rows(); + + // Check which columns already exist + let has_str = schema.column_with_name(consts::ATTRIBUTE_STR).is_some(); + let has_int = schema.column_with_name(consts::ATTRIBUTE_INT).is_some(); + let has_double = schema.column_with_name(consts::ATTRIBUTE_DOUBLE).is_some(); + let has_bool = schema.column_with_name(consts::ATTRIBUTE_BOOL).is_some(); + + // If all needed columns exist, return unchanged + if (!needs_str || has_str) + && (!needs_int || has_int) + && (!needs_double || has_double) + && (!needs_bool || has_bool) + { + return Ok((batch.clone(), schema)); + } + + // Build new schema with missing columns + let mut new_fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect(); + let mut new_columns: Vec<ArrayRef> = batch.columns().to_vec(); + + // Add missing columns with null arrays + if needs_str && !has_str { + new_fields.push(Field::new( + consts::ATTRIBUTE_STR, + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + true, + )); + new_columns.push(arrow::array::new_null_array( + &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + num_rows, + )); + } + + if needs_int && !has_int { + new_fields.push(Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true)); + new_columns.push(arrow::array::new_null_array(&DataType::Int64, num_rows)); + } + + if needs_double && !has_double { + new_fields.push(Field::new( + consts::ATTRIBUTE_DOUBLE, + DataType::Float64, + true, + )); + new_columns.push(arrow::array::new_null_array(&DataType::Float64, num_rows)); + } + + if needs_bool && !has_bool { + new_fields.push(Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true)); + new_columns.push(arrow::array::new_null_array(&DataType::Boolean, num_rows)); + } + + let new_schema = Arc::new(arrow::datatypes::Schema::new(new_fields)); + let new_batch = + RecordBatch::try_new(new_schema.clone(), new_columns).map_err(|e| Error::Format { + error: e.to_string(), + })?; + + Ok((new_batch, new_schema)) +} + +/// Get the value type for a parent ID column, handling both primitive and dictionary-encoded arrays. +/// Returns the underlying primitive type (UInt16 or UInt32). +fn get_parent_id_value_type(arr: &ArrayRef) -> Result<DataType> { + match arr.data_type() { + DataType::UInt16 | DataType::UInt32 => Ok(arr.data_type().clone()), + DataType::Dictionary(_, v) => match **v { + DataType::UInt16 | DataType::UInt32 => Ok((**v).clone()), + _ => Err(Error::UnsupportedDictionaryValueType { + expect_oneof: vec![DataType::UInt16, DataType::UInt32], + actual: (**v).clone(), + }), + }, + _ => Err(Error::ColumnDataTypeMismatch { + name: consts::PARENT_ID.into(), + expect: DataType::UInt16, // or UInt32 + actual: arr.data_type().clone(), + }), + } +} + +/// Get the value type for a parent ID column from a schema field, handling both primitive and +/// dictionary-encoded types. +/// Returns the underlying primitive type (UInt16 or UInt32).
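+/// For example, a plain `UInt32` field resolves to `UInt32`, and a +/// `Dictionary(UInt8, UInt16)` field resolves to `UInt16`; other types produce an error.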
+fn get_parent_id_value_type_from_schema( + schema: &arrow::datatypes::Schema, +) -> Result<Option<DataType>> { + let Some((_, field)) = schema.column_with_name(consts::PARENT_ID) else { + return Ok(None); + }; + match field.data_type() { + DataType::UInt16 | DataType::UInt32 => Ok(Some(field.data_type().clone())), + DataType::Dictionary(_, v) => match **v { + DataType::UInt16 | DataType::UInt32 => Ok(Some((**v).clone())), + _ => Err(Error::UnsupportedDictionaryValueType { + expect_oneof: vec![DataType::UInt16, DataType::UInt32], + actual: (**v).clone(), + }), + }, + _ => Err(Error::ColumnDataTypeMismatch { + name: consts::PARENT_ID.into(), + expect: DataType::UInt16, // or UInt32 + actual: field.data_type().clone(), + }), + } +} + +/// Materialize parent IDs with automatic type dispatch based on schema. +/// Returns the materialized batch or the original batch if no parent_id column exists. +fn materialize_parent_id_for_attributes_auto(record_batch: &RecordBatch) -> Result<RecordBatch> { + let schema = record_batch.schema(); + match get_parent_id_value_type_from_schema(&schema)? { + Some(DataType::UInt16) => materialize_parent_id_for_attributes::<u16>(record_batch), + Some(DataType::UInt32) => materialize_parent_id_for_attributes::<u32>(record_batch), + Some(other) => Err(Error::ColumnDataTypeMismatch { + name: consts::PARENT_ID.into(), + expect: DataType::UInt16, // or UInt32 + actual: other, + }), + None => Ok(record_batch.clone()), + } +} + +/// Returns `ArrayOptions` configured to match the given `DataType`. +/// For dictionary types, configures the appropriate dictionary options. +/// For native types (Utf8, Int64, Float64), returns options with no dictionary. +fn array_options_for_type(data_type: &DataType) -> ArrayOptions { + match data_type { + DataType::Dictionary(k, _) => match **k { + DataType::UInt8 => ArrayOptions { + dictionary_options: Some(DictionaryOptions::dict8()), + optional: false, + default_values_optional: false, + }, + DataType::UInt16 => ArrayOptions { + dictionary_options: Some(DictionaryOptions::dict16()), + optional: false, + default_values_optional: false, + }, + // Default to dict16 for other key types + _ => ArrayOptions { + dictionary_options: Some(DictionaryOptions::dict16()), + optional: false, + default_values_optional: false, + }, + }, + // Native types - no dictionary + _ => ArrayOptions { + dictionary_options: None, + optional: false, + default_values_optional: false, + }, + } +} + +/// Create a batch of inserted attributes. +/// According to the OTel collector spec, `insert` only inserts if the key does NOT already exist. +/// This function checks existing (parent_id, key) pairs in the current record batch and only +/// inserts new keys. +/// +/// This function is generic over `T: ParentId` to handle different parent ID types (u16, u32) +/// as well as dictionary-encoded parent IDs.
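+/// For example (illustrative): with unique parents {0, 1}, insert entries {"env": "prod"}, and +/// an existing (0, "env") pair, the function emits a single new row (1, "env", "prod").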
+fn create_inserted_batch<T>( + current_batch: &RecordBatch, + parent_ids: &ArrayRef, + insert: &InsertTransform, + schema: &arrow::datatypes::Schema, +) -> Result<(RecordBatch, usize)> +where + T: ParentId, + <T as ParentId>::ArrayType: ArrowPrimitiveType, + <<T as ParentId>::ArrayType as ArrowPrimitiveType>::Native: + Ord + std::hash::Hash + Copy + Default, +{ + // Use MaybeDictArrayAccessor to handle both primitive and dictionary-encoded parent IDs + let parent_ids_accessor = + MaybeDictArrayAccessor::<PrimitiveArray<T::ArrayType>>::try_new(parent_ids)?; + + // Build a set of (parent_id, key) pairs that already exist using StringArrayAccessor + let key_accessor = current_batch + .column_by_name(consts::ATTRIBUTE_KEY) + .map(MaybeDictArrayAccessor::<StringArray>::try_new) + .transpose()?; + + let mut existing_keys: BTreeMap< + <<T as ParentId>::ArrayType as ArrowPrimitiveType>::Native, + BTreeSet<String>, + > = BTreeMap::new(); + if let Some(ref accessor) = key_accessor { + for i in 0..current_batch.num_rows() { + if let Some(parent) = parent_ids_accessor.value_at(i) { + if let Some(key) = accessor.str_at(i) { + let _ = existing_keys + .entry(parent) + .or_default() + .insert(key.to_string()); + } + } + } + } + + // Get unique parents + let mut unique_parents = BTreeSet::new(); + for i in 0..parent_ids_accessor.len() { + if let Some(parent) = parent_ids_accessor.value_at(i) { + let _ = unique_parents.insert(parent); + } + } + + if unique_parents.is_empty() { + return Ok((RecordBatch::new_empty(Arc::new(schema.clone())), 0)); + } + + // Compute which (parent, key, value) tuples to actually insert + // Only insert if the key doesn't already exist for that parent + let mut to_insert: Vec<( + <<T as ParentId>::ArrayType as ArrowPrimitiveType>::Native, + &str, + &LiteralValue, + )> = Vec::new(); + for &parent in &unique_parents { + let parent_existing = existing_keys.get(&parent); + for (key, val) in insert.entries.iter() { + let key_exists = parent_existing + .map(|keys| keys.contains(key)) + .unwrap_or(false); + if !key_exists { + to_insert.push((parent, key.as_str(), val)); + } + } + } + + let total_rows = to_insert.len(); + if total_rows == 0 { + return Ok((RecordBatch::new_empty(Arc::new(schema.clone())), 0)); + } + + // Build Parent ID column using the same primitive type + let mut new_parent_ids = PrimitiveBuilder::<T::ArrayType>::with_capacity(total_rows); + for (parent, _, _) in &to_insert { + new_parent_ids.append_value(*parent); + } + let new_parent_ids = Arc::new(new_parent_ids.finish()) as ArrayRef; + + // Build Attribute Type column + let mut new_types = PrimitiveBuilder::<UInt8Type>::with_capacity(total_rows); + for (_, _, val) in &to_insert { + let type_val = match val { + LiteralValue::Str(_) => AttributeValueType::Str, + LiteralValue::Int(_) => AttributeValueType::Int, + LiteralValue::Double(_) => AttributeValueType::Double, + LiteralValue::Bool(_) => AttributeValueType::Bool, + }; + new_types.append_value(type_val as u8); + } + let new_types = Arc::new(new_types.finish()) as ArrayRef; + + // Build Key column using StringArrayBuilder + let key_col_idx = + schema + .index_of(consts::ATTRIBUTE_KEY) + .map_err(|_| Error::ColumnNotFound { + name: consts::ATTRIBUTE_KEY.into(), + })?; + let key_type = schema.field(key_col_idx).data_type(); + let key_options = array_options_for_type(key_type); + + let mut key_builder = StringArrayBuilder::new(key_options); + for (_, key, _) in &to_insert { + key_builder.append_str(key); + } + let new_keys = key_builder + .finish() + .expect("key builder should produce array since optional=false"); + + // We collect columns into a vec matching schema order.
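+ // RecordBatch::try_new pairs columns with schema fields positionally, so the vec must + // follow the field order exactly or arrays would be attached to the wrong fields.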
+ let mut columns = Vec::with_capacity(schema.fields().len()); + + for field in schema.fields() { + let name = field.name(); + let col: ArrayRef = if name == consts::PARENT_ID { + new_parent_ids.clone() + } else if name == consts::ATTRIBUTE_TYPE { + new_types.clone() + } else if name == consts::ATTRIBUTE_KEY { + new_keys.clone() + } else if name == consts::ATTRIBUTE_STR { + let options = array_options_for_type(field.data_type()); + let mut builder = StringArrayBuilder::new(options); + for (_, _, val) in &to_insert { + if let LiteralValue::Str(s) = val { + builder.append_str(s); + } else { + builder.append_null(); + } + } + builder + .finish() + .expect("str builder should produce array since optional=false") + } else if name == consts::ATTRIBUTE_INT { + let options = array_options_for_type(field.data_type()); + let mut builder = Int64ArrayBuilder::new(options); + for (_, _, val) in &to_insert { + if let LiteralValue::Int(v) = val { + builder.append_value(v); + } else { + builder.append_null(); + } + } + builder + .finish() + .expect("int builder should produce array since optional=false") + } else if name == consts::ATTRIBUTE_DOUBLE { + let options = array_options_for_type(field.data_type()); + let mut builder = Float64ArrayBuilder::new(options); + for (_, _, val) in &to_insert { + if let LiteralValue::Double(v) = val { + builder.append_value(v); + } else { + builder.append_null(); + } + } + builder + .finish() + .expect("double builder should produce array since optional=false") + } else if name == consts::ATTRIBUTE_BOOL { + // Note: Boolean Dictionaries are not standard/supported by simple builders + let mut builder = arrow::array::BooleanBuilder::with_capacity(total_rows); + for (_, _, val) in &to_insert { + if let LiteralValue::Bool(v) = val { + builder.append_value(*v); + } else { + builder.append_null(); + } + } + Arc::new(builder.finish()) + } else { + // Fill with nulls + arrow::array::new_null_array(field.data_type(), total_rows) + }; + columns.push(col); + } + + Ok(( + RecordBatch::try_new(Arc::new(schema.clone()), columns).expect("schema check"), + total_rows, + )) +} + +#[cfg(test)] +mod insert_tests { + use super::*; + use crate::schema::consts; + use arrow::array::*; + use arrow::datatypes::*; + use std::sync::Arc; + + #[test] + fn test_insert_attributes_simple() { + // Schema + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0, 1])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["k1", "k2"])), + Arc::new(StringArray::from_iter_values(vec!["v1", "v2"])), + ], + ) + .unwrap(); + + // Transform: insert "env"="prod" + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([( + "env".into(), + LiteralValue::Str("prod".into()), + )]))), + }; + + let (result, stats) = + transform_attributes_with_stats(&input, &tx).expect("transform failed"); + + assert_eq!(stats.inserted_entries, 2); + + // Expected: 4 rows + assert_eq!(result.num_rows(), 4); + + let keys = result + .column_by_name(consts::ATTRIBUTE_KEY) + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + //
The original rows come first, then inserted rows. + // Original: k1, k2. Inserted: env, env. + let k0 = keys.value(0); + let k1 = keys.value(1); + let k2 = keys.value(2); + let k3 = keys.value(3); + + assert!(k0 == "k1" || k0 == "k2"); + assert!(k1 == "k1" || k1 == "k2"); + assert_eq!(k2, "env"); + assert_eq!(k3, "env"); + + // Check values + let vals = result + .column_by_name(consts::ATTRIBUTE_STR) + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let v2 = vals.value(2); + let v3 = vals.value(3); + assert_eq!(v2, "prod"); + assert_eq!(v3, "prod"); + } + + #[test] + fn test_insert_attributes_with_delete() { + // Schema + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // Input: parent 0 has "del_me". + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["del_me"])), + Arc::new(StringArray::from_iter_values(vec!["val"])), + ], + ) + .unwrap(); + + let tx = AttributesTransform { + rename: None, + delete: Some(DeleteTransform::new(BTreeSet::from_iter(vec![ + "del_me".into(), + ]))), + insert: Some(InsertTransform::new(BTreeMap::from([( + "new".into(), + LiteralValue::Str("val".into()), + )]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + assert_eq!(stats.deleted_entries, 1); + assert_eq!(stats.inserted_entries, 1); + + // Result should contain 1 row: "new"="val" for parent 0. + assert_eq!(result.num_rows(), 1); + let keys = result + .column_by_name(consts::ATTRIBUTE_KEY) + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(keys.value(0), "new"); + } + + #[test] + fn test_insert_does_not_overwrite_existing_key() { + // According to OTel collector spec, `insert` only inserts if the key does not already exist. + // This test verifies that existing keys are NOT overwritten.
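+ // The equivalent processor configuration (illustrative, matching the JSON shape used by + // the OTLP-level tests above) would be: + // {"action": "insert", "key": "existing_key", "value": "new_value"}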
+ let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // Input: parent 0 has "existing_key"="original_value" + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["existing_key"])), + Arc::new(StringArray::from_iter_values(vec!["original_value"])), + ], + ) + .unwrap(); + + // Try to insert "existing_key"="new_value" - should be skipped because key exists + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([( + "existing_key".into(), + LiteralValue::Str("new_value".into()), + )]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + // No inserts should happen because the key already exists + assert_eq!(stats.inserted_entries, 0); + + // Result should still contain 1 row with the original value + assert_eq!(result.num_rows(), 1); + let keys = result + .column_by_name(consts::ATTRIBUTE_KEY) + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(keys.value(0), "existing_key"); + + let vals = result + .column_by_name(consts::ATTRIBUTE_STR) + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(vals.value(0), "original_value"); + } + + #[test] + fn test_insert_mixed_existing_and_new_keys() { + // Test: insert multiple keys where some exist and some don't + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // Input: parent 0 has "a"="av", parent 1 has "b"="bv" + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0, 1])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["a", "b"])), + Arc::new(StringArray::from_iter_values(vec!["av", "bv"])), + ], + ) + .unwrap(); + + // Try to insert: + // - "a"="new_a" - should be skipped for parent 0 (exists), inserted for parent 1 + // - "c"="cv" - should be inserted for both parents + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([ + ("a".into(), LiteralValue::Str("new_a".into())), + ("c".into(), LiteralValue::Str("cv".into())), + ]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + // Should insert: + // - parent 0: "c" (not "a" because it exists) + // - parent 1: "a" and "c" (neither exists) + // Total: 3 inserts + assert_eq!(stats.inserted_entries, 3); + + // Result should have: 2 original + 3 new = 5 rows + assert_eq!(result.num_rows(), 5); + } + + #[test] + fn test_insert_int_value() { + // Test inserting Int values + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8,
true), + Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true), + Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true), + Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true), + ])); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["existing"])), + Arc::new(StringArray::from_iter_values(vec!["value"])), + Arc::new(Int64Array::from(vec![None])), + Arc::new(Float64Array::from(vec![None])), + Arc::new(BooleanArray::from(vec![None])), + ], + ) + .unwrap(); + + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([( + "count".into(), + LiteralValue::Int(42), + )]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + assert_eq!(stats.inserted_entries, 1); + assert_eq!(result.num_rows(), 2); + + // Check the int value was inserted + let int_col = result + .column_by_name(consts::ATTRIBUTE_INT) + .unwrap() + .as_any() + .downcast_ref::<Int64Array>() + .unwrap(); + // First row (existing) should be null, second row (inserted) should be 42 + assert!(int_col.is_null(0)); + assert_eq!(int_col.value(1), 42); + } + + #[test] + fn test_insert_double_value() { + // Test inserting Double values + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true), + Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true), + Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true), + ])); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec!["existing"])), + Arc::new(StringArray::from_iter_values(vec!["value"])), + Arc::new(Int64Array::from(vec![None])), + Arc::new(Float64Array::from(vec![None])), + Arc::new(BooleanArray::from(vec![None])), + ], + ) + .unwrap(); + + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([( + "ratio".into(), + LiteralValue::Double(1.2345), + )]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + assert_eq!(stats.inserted_entries, 1); + assert_eq!(result.num_rows(), 2); + + let double_col = result + .column_by_name(consts::ATTRIBUTE_DOUBLE) + .unwrap() + .as_any() + .downcast_ref::<Float64Array>() + .unwrap(); + assert!(double_col.is_null(0)); + assert!((double_col.value(1) - 1.2345).abs() < f64::EPSILON); + } + + #[test] + fn test_insert_bool_value() { + // Test inserting Bool values + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true), + Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true), + Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true), + ])); + + let input = RecordBatch::try_new( + schema.clone(), + vec![
+ Arc::new(UInt16Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ Arc::new(StringArray::from_iter_values(vec!["existing"])),
+ Arc::new(StringArray::from_iter_values(vec!["value"])),
+ Arc::new(Int64Array::from(vec![None])),
+ Arc::new(Float64Array::from(vec![None])),
+ Arc::new(BooleanArray::from(vec![None])),
+ ],
+ )
+ .unwrap();
+
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "enabled".into(),
+ LiteralValue::Bool(true),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ assert_eq!(stats.inserted_entries, 1);
+ assert_eq!(result.num_rows(), 2);
+
+ let bool_col = result
+ .column_by_name(consts::ATTRIBUTE_BOOL)
+ .unwrap()
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ assert!(bool_col.is_null(0));
+ assert!(bool_col.value(1));
+ }
+
+ #[test]
+ fn test_insert_multiple_value_types() {
+ // Test inserting multiple values of different types at once
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt16, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false),
+ Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
+ Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true),
+ Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true),
+ Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true),
+ ]));
+
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt16Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ Arc::new(StringArray::from_iter_values(vec!["existing"])),
+ Arc::new(StringArray::from_iter_values(vec!["value"])),
+ Arc::new(Int64Array::from(vec![None])),
+ Arc::new(Float64Array::from(vec![None])),
+ Arc::new(BooleanArray::from(vec![None])),
+ ],
+ )
+ .unwrap();
+
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([
+ ("str_key".into(), LiteralValue::Str("str_val".into())),
+ ("int_key".into(), LiteralValue::Int(100)),
+ ("double_key".into(), LiteralValue::Double(2.5)),
+ ("bool_key".into(), LiteralValue::Bool(false)),
+ ]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ assert_eq!(stats.inserted_entries, 4);
+ assert_eq!(result.num_rows(), 5); // 1 original + 4 inserted
+ }
+
+ #[test]
+ fn test_insert_with_dictionary_encoded_keys() {
+ // Test inserting with dictionary-encoded key columns
+ let key_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+ let str_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt16, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, key_type.clone(), false),
+ Field::new(consts::ATTRIBUTE_STR, str_type.clone(), true),
+ ]));
+
+ let mut key_builder = StringDictionaryBuilder::<UInt8Type>::new();
+ key_builder.append_value("existing_key");
+ let keys = Arc::new(key_builder.finish());
+
+ let mut val_builder = StringDictionaryBuilder::<UInt8Type>::new();
+ val_builder.append_value("existing_value");
+ let vals = Arc::new(val_builder.finish());
+
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt16Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ keys,
+ vals,
+ ],
+ )
+ .unwrap();
+
+ // Insert a new key - should work with dictionary-encoded schema
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "new_key".into(),
+ LiteralValue::Str("new_value".into()),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ assert_eq!(stats.inserted_entries, 1);
+ assert_eq!(result.num_rows(), 2);
+ }
+
+ #[test]
+ fn test_insert_with_dictionary_encoded_keys_respects_existing() {
+ // Test that insert respects existing keys when using dictionary-encoded columns
+ let key_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+ let str_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt16, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, key_type.clone(), false),
+ Field::new(consts::ATTRIBUTE_STR, str_type.clone(), true),
+ ]));
+
+ let mut key_builder = StringDictionaryBuilder::<UInt8Type>::new();
+ key_builder.append_value("existing_key");
+ let keys = Arc::new(key_builder.finish());
+
+ let mut val_builder = StringDictionaryBuilder::<UInt8Type>::new();
+ val_builder.append_value("original_value");
+ let vals = Arc::new(val_builder.finish());
+
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt16Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ keys,
+ vals,
+ ],
+ )
+ .unwrap();
+
+ // Try to insert existing_key - should be skipped
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "existing_key".into(),
+ LiteralValue::Str("new_value".into()),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ // No inserts because key already exists
+ assert_eq!(stats.inserted_entries, 0);
+ assert_eq!(result.num_rows(), 1);
+ }
+
+ #[test]
+ fn test_insert_with_uint16_dictionary_keys() {
+ // Test inserting with UInt16 dictionary-encoded key columns
+ let key_type = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
+ let str_type = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt16, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, key_type.clone(), false),
+ Field::new(consts::ATTRIBUTE_STR, str_type.clone(), true),
+ ]));
+
+ let mut key_builder = StringDictionaryBuilder::<UInt16Type>::new();
+ key_builder.append_value("existing_key");
+ let keys = Arc::new(key_builder.finish());
+
+ let mut val_builder = StringDictionaryBuilder::<UInt16Type>::new();
+ val_builder.append_value("existing_value");
+ let vals = Arc::new(val_builder.finish());
+
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt16Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ keys,
+ vals,
+ ],
+ )
+ .unwrap();
+
+ // Insert new key and try to insert existing key
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([
+ (
+ "existing_key".into(),
LiteralValue::Str("should_skip".into()), + ), + ("new_key".into(), LiteralValue::Str("new_value".into())), + ]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + // Only 1 insert (new_key), existing_key should be skipped + assert_eq!(stats.inserted_entries, 1); + assert_eq!(result.num_rows(), 2); + } + + #[test] + fn test_insert_with_empty_batch() { + // Test inserting when input batch is empty + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + let input = RecordBatch::new_empty(schema.clone()); + + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([( + "key".into(), + LiteralValue::Str("value".into()), + )]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + // No parents to insert into + assert_eq!(stats.inserted_entries, 0); + assert_eq!(result.num_rows(), 0); + } + + #[test] + fn test_insert_with_multiple_parents() { + // Test insert correctly handles multiple different parent IDs + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // 3 parents: 0, 1, 2 with different existing keys + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0, 0, 1, 2, 2, 2])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + AttributeValueType::Str as u8, + ])), + Arc::new(StringArray::from_iter_values(vec![ + "a", "b", "a", "x", "y", "z", + ])), + Arc::new(StringArray::from_iter_values(vec![ + "v0", "v1", "v2", "v3", "v4", "v5", + ])), + ], + ) + .unwrap(); + + // Insert "a" and "new_key" + // Parent 0: has "a" -> skip "a", insert "new_key" + // Parent 1: has "a" -> skip "a", insert "new_key" + // Parent 2: no "a" -> insert both "a" and "new_key" + let tx = AttributesTransform { + rename: None, + delete: None, + insert: Some(InsertTransform::new(BTreeMap::from([ + ("a".into(), LiteralValue::Str("inserted_a".into())), + ("new_key".into(), LiteralValue::Str("new_val".into())), + ]))), + }; + + let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap(); + + // 1 + 1 + 2 = 4 inserts + assert_eq!(stats.inserted_entries, 4); + // 6 original + 4 inserted = 10 + assert_eq!(result.num_rows(), 10); + } + + #[test] + fn test_insert_with_null_in_key_column() { + // Test that null keys in original data are handled properly + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, true), // nullable + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // Parent 0 has a null key + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![0])), + Arc::new(UInt8Array::from_iter_values(vec![ + AttributeValueType::Str as u8, + ])), + 
+ Arc::new(StringArray::from(vec![None::<&str>])),
+ Arc::new(StringArray::from_iter_values(vec!["value"])),
+ ],
+ )
+ .unwrap();
+
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "new_key".into(),
+ LiteralValue::Str("new_value".into()),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ // Should insert new_key for parent 0
+ assert_eq!(stats.inserted_entries, 1);
+ assert_eq!(result.num_rows(), 2);
+ }
+
+ #[test]
+ fn test_insert_with_u32_parent_ids() {
+ // Test that insert works with u32 parent IDs (used for metrics datapoint attributes,
+ // span link attributes, span event attributes).
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt32, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false),
+ Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
+ ]));
+
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from_iter_values(vec![0, 1])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ AttributeValueType::Str as u8,
+ ])),
+ Arc::new(StringArray::from_iter_values(vec!["k1", "k2"])),
+ Arc::new(StringArray::from_iter_values(vec!["v1", "v2"])),
+ ],
+ )
+ .unwrap();
+
+ // Insert "env"="prod" - should be inserted for both parents
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "env".into(),
+ LiteralValue::Str("prod".into()),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ assert_eq!(stats.inserted_entries, 2);
+ assert_eq!(result.num_rows(), 4);
+
+ // Verify parent ID column is still u32
+ let parent_col = result.column_by_name(consts::PARENT_ID).unwrap();
+ assert_eq!(parent_col.data_type(), &DataType::UInt32);
+ }
+
+ #[test]
+ fn test_insert_with_u32_parent_ids_respects_existing() {
+ // Test that insert with u32 parent IDs doesn't overwrite existing keys
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(consts::PARENT_ID, DataType::UInt32, false),
+ Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+ Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false),
+ Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
+ ]));
+
+ // Parent 0 has "existing"
+ let input = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from_iter_values(vec![0])),
+ Arc::new(UInt8Array::from_iter_values(vec![
+ AttributeValueType::Str as u8,
+ ])),
+ Arc::new(StringArray::from_iter_values(vec!["existing"])),
+ Arc::new(StringArray::from_iter_values(vec!["original"])),
+ ],
+ )
+ .unwrap();
+
+ // Try to insert "existing" with a different value - should be skipped
+ let tx = AttributesTransform {
+ rename: None,
+ delete: None,
+ insert: Some(InsertTransform::new(BTreeMap::from([(
+ "existing".into(),
+ LiteralValue::Str("should_not_overwrite".into()),
+ )]))),
+ };
+
+ let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();
+
+ // No inserts because key already exists
+ assert_eq!(stats.inserted_entries, 0);
+ assert_eq!(result.num_rows(), 1);
+
+ // Verify original value is preserved
+ let vals = result
+ .column_by_name(consts::ATTRIBUTE_STR)
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(vals.value(0), "original");
+ }
+}
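The tests above exercise each value type and encoding through `insert` on its own. For orientation, here is a minimal sketch (illustrative only, not part of the patch; the `input` batch, imports, and key/value literals are assumed to match the test module above) of how `rename`, `delete`, and `insert` compose in a single `transform_attributes_with_stats` pass:

    // Sketch: all three transforms applied in one AttributesTransform pass.
    // Assumes the same test scaffolding as above; "old_key", "drop_me", and
    // "env" are hypothetical attribute names used only for illustration.
    let tx = AttributesTransform {
        rename: Some(RenameTransform::new(BTreeMap::from([(
            "old_key".into(),
            "renamed_key".into(),
        )]))),
        delete: Some(DeleteTransform::new(BTreeSet::from_iter(["drop_me".into()]))),
        insert: Some(InsertTransform::new(BTreeMap::from([(
            "env".into(),
            LiteralValue::Str("prod".into()),
        )]))),
    };
    // Insert still applies per parent ID and skips keys that already exist,
    // exactly as the tests above verify for the insert-only case.
    let (result, stats) = transform_attributes_with_stats(&input, &tx).unwrap();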