feat: support attribute insertion in OTAP transform #1737
base: main
Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #1737      +/-  ##
==========================================
+ Coverage    84.40%    84.45%   +0.05%
==========================================
  Files          496       496
  Lines       145393    146578    +1185
==========================================
+ Hits        122716    123796    +1080
- Misses       22143     22248     +105
  Partials       534       534
```
```rust
        .unwrap()
        .attributes;

    println!("Resource attrs: {:?}", res_attrs);
```
I think we can get rid of this println, right? Assuming it's leftover debugging code.
Suggested change:

```diff
- println!("Resource attrs: {:?}", res_attrs);
```
```rust
assert!(res_attrs.iter().any(|kv| kv.key == "c"));
assert!(res_attrs.iter().any(|kv| kv.key == "r"));

// Scope 'c' should remain
```
I'm not sure I understand this comment. Something like this might make it more clear:
Suggested change:

```diff
- // Scope 'c' should remain
+ // Scope 'c' should not be inserted; 'a' should remain
```
```rust
assert!(!scope_attrs.iter().any(|kv| kv.key == "c"));
assert!(scope_attrs.iter().any(|kv| kv.key == "a"));

// Log 'c' should be deleted; 'b' should remain
```
I think it would be more accurate to say that the attribute 'c' was not inserted rather than that it was deleted.
Suggested change:

```diff
- // Log 'c' should be deleted; 'b' should remain
+ // Log 'c' should not be inserted; 'b' should remain
```
```rust
let lit_val = match value {
    Value::String(s) => Some(LiteralValue::Str(s)),
    Value::Number(n) => {
        if let Some(i) = n.as_i64() {
            Some(LiteralValue::Int(i))
        } else {
            n.as_f64().map(LiteralValue::Double)
        }
    }
    Value::Bool(b) => Some(LiteralValue::Bool(b)),
    // Ignore null, object, array as not supported by LiteralValue
    _ => None,
};
```
We should probably return an error here rather than silently ignoring it if the user supplies a type that's not supported.
We might be able to get this behaviour automatically if we define the Insert variant of Action as:
```rust
/// Insert a new attribute.
Insert {
    /// The attribute key to insert.
    key: String,
    /// The value to insert.
    value: LiteralValue,
},
```

And define `otap_df_pdata::otap::transform::LiteralValue` as:
```rust
use serde::Deserialize;

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(untagged)]
pub enum LiteralValue {
    Str(String),
    Int(i64),
    Double(f64),
    Bool(bool),
}
```
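As a rough illustration of the behaviour this buys us (a sketch assuming `serde_json` in a test, plus the `LiteralValue` definition above), unsupported config values would fail at deserialization time instead of being silently dropped:

```rust
// Sketch only: serde_json stands in for whatever format the transform config uses;
// the point is that #[serde(untagged)] tries each variant in order and returns an
// error when none of them matches.
fn main() {
    let v: LiteralValue = serde_json::from_str("42").unwrap();
    assert_eq!(v, LiteralValue::Int(42));

    let v: LiteralValue = serde_json::from_str("\"hello\"").unwrap();
    assert_eq!(v, LiteralValue::Str("hello".to_string()));

    // null, arrays and objects match no variant, so deserialization returns an error
    // rather than a silently ignored value.
    assert!(serde_json::from_str::<LiteralValue>("null").is_err());
    assert!(serde_json::from_str::<LiteralValue>("[1, 2]").is_err());
}
```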
```rust
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");

let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes;
assert!(log_attrs.iter().any(|kv| kv.key == "count"));
```
In this test (and the others that have been added in this module), should we also assert on the value of the attribute that was inserted?
Suggested change:

```diff
- assert!(log_attrs.iter().any(|kv| kv.key == "count"));
+ assert!(
+     log_attrs
+         .iter()
+         .any(|kv| kv.key == "count" && kv.value == Some(AnyValue::new_int(42)))
+ );
```
```rust
pub struct InsertTransform {
    pub(super) entries: Vec<(String, LiteralValue)>,
}

impl InsertTransform {
    #[must_use]
    pub fn new(entries: Vec<(String, LiteralValue)>) -> Self {
        Self { entries }
    }
}
```
Instead of using `Vec<(String, LiteralValue)>` here, would it make sense to use `BTreeMap<String, LiteralValue>` like we do for `RenameTransform`? That way it wouldn't be possible to create an `InsertTransform` with invalid duplicate entries.
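Something like this, roughly (a sketch that just mirrors the names in the current code, not a drop-in patch):

```rust
use std::collections::BTreeMap;

// Duplicate keys become impossible by construction: a BTreeMap holds at most one
// value per key, so the insert-specific duplicate-key validation below would no
// longer be needed.
pub struct InsertTransform {
    pub(super) entries: BTreeMap<String, LiteralValue>,
}

impl InsertTransform {
    #[must_use]
    pub fn new(entries: BTreeMap<String, LiteralValue>) -> Self {
        Self { entries }
    }
}
```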
```rust
if let Some(insert) = &self.insert {
    let mut insert_keys = BTreeSet::new();
    for (key, _) in &insert.entries {
        if !insert_keys.insert(key) {
            return Err(Error::InvalidAttributeTransform {
                reason: format!("Duplicate key in insert: {key}"),
            });
        }
    }
}
```
Definitely it makes sense to validate that there are no duplicate keys being inserted, but I think it also makes sense to validate that the keys of the attributes we're inserting are not the same keys as the attributes being deleted or replaced.

For example, if we had:

```
replace: [("x", "y")]
insert: [("x", String("a"))]
```

there's an ambiguity about what the final result will be. If rename gets applied first, attribute "x" will have the value "a". But if insert gets applied before rename, "x" will have the original value. To avoid all this ambiguity, currently the logic in this method says that no keys can be duplicated.
Suggested change:

```diff
 if let Some(insert) = &self.insert {
-    let mut insert_keys = BTreeSet::new();
     for (key, _) in &insert.entries {
-        if !insert_keys.insert(key) {
+        if !all_keys.insert(key) {
             return Err(Error::InvalidAttributeTransform {
                 reason: format!("Duplicate key in insert: {key}"),
             });
         }
     }
 }
```
```rust
/// Helper to extract string key from a record batch key column at a given index.
/// Handles both plain Utf8 and Dictionary-encoded columns.
fn get_key_at_index(key_col: &ArrayRef, idx: usize) -> Option<String> {
```
There's a helper type that can encapsulate this logic: `otap_df_pdata::arrays::StringArrayAccessor`

```rust
pub(crate) type StringArrayAccessor<'a> = MaybeDictArrayAccessor<'a, StringArray>;
```

Instead of having a method defined here specially for this, below in `create_inserted_batch` you could do:

```rust
let key_col = current_batch
    .column_by_name(consts::ATTRIBUTE_KEY)
    .map(StringArrayAccessor::try_new)
    .transpose()?;
```

Then you could probably return an error if the `key_col` ends up being `None`.

This type provides the `str_at` method, which returns an `Option<&str>`.
```rust
})?;
let key_type = schema.field(key_col_idx).data_type();

let new_keys: ArrayRef = match key_type {
```
Instead of using the regular arrow builders and having to write these match statements to append values to arrays that could be dict/native type, we have a set of helper types that can encapsulate this logic.

For example, `StringArrayBuilder`:

otel-arrow/rust/otap-dataflow/crates/pdata/src/encode/record/array.rs, lines 650 to 656 in 2c3976c

```rust
pub type StringArrayBuilder = AdaptiveArrayBuilder<
    String,
    NoArgs,
    StringBuilder,
    StringDictionaryBuilder<UInt8Type>,
    StringDictionaryBuilder<UInt16Type>,
>;
```

This type also exposes an `append_str_n` method, which can be used to append the same string multiple times. This is usually faster than appending the values one at a time, so an optimization we could make here, when we're appending the same key multiple times, would be to use this method.

There are similar builders for the other types we're inserting below as values (int, double, bool).
```rust
})?;

// Build a set of (parent_id, key) pairs that already exist
let mut existing_keys: BTreeMap<u16, BTreeSet<String>> = BTreeMap::new();
```
I'm wondering if there are any ways we could optimize building up the set of attributes we're going to insert.

For example, using a BTreeMap here, and a BTreeSet below for unique_parents, means every parent_id gets looked up in a tree multiple times. It might be faster to use a RoaringBitmap for the unique_parent_ids, and maybe we could have a RoaringBitmap for each insert entry indicating whether the row with some parent_id contains the attribute.
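A rough sketch of that bitmap shape (using the roaring crate; the function and variable names here are made up for illustration):

```rust
use roaring::RoaringBitmap;
use std::collections::BTreeMap;

// For each insert key, track which parent_ids already carry that attribute, plus one
// bitmap of every parent_id seen. Bitmap insert/contains checks replace the per-row
// BTreeSet<String> lookups.
fn collect_presence(
    rows: &[(u32, &str)],   // (parent_id, key) pairs scanned from the attribute batch
    insert_keys: &[&str],   // keys we intend to insert
) -> (RoaringBitmap, BTreeMap<String, RoaringBitmap>) {
    let mut unique_parents = RoaringBitmap::new();
    let mut present: BTreeMap<String, RoaringBitmap> = insert_keys
        .iter()
        .map(|k| ((*k).to_string(), RoaringBitmap::new()))
        .collect();

    for &(parent_id, key) in rows {
        let _ = unique_parents.insert(parent_id);
        if let Some(bitmap) = present.get_mut(key) {
            let _ = bitmap.insert(parent_id);
        }
    }
    (unique_parents, present)
}
```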
```rust
let parent_ids_arr = parent_ids
    .as_any()
    .downcast_ref::<PrimitiveArray<UInt16Type>>()
    .ok_or_else(|| Error::ColumnDataTypeMismatch {
        name: consts::PARENT_ID.into(),
        expect: DataType::UInt16,
        actual: parent_ids.data_type().clone(),
    })?;
```
The parent_ids won't always be u16. For example, for metrics datapoint attributes and attributes for span links and span events, the parent ID type can be u32. Also, for u32 IDs, the parent_id_arr can be dictionary encoded (i.e. it's not always a PrimitiveArray).

To handle this, we might need to make the function generic over T where:

```rust
T: ParentId,
<T as ParentId>::ArrayType: ArrowPrimitiveType,
```

(You'll see we do something similar in this file for materialize_parent_id_for_attributes.)

Then we can get the parent_ids as:

Suggested change:

```diff
-let parent_ids_arr = parent_ids
-    .as_any()
-    .downcast_ref::<PrimitiveArray<UInt16Type>>()
-    .ok_or_else(|| Error::ColumnDataTypeMismatch {
-        name: consts::PARENT_ID.into(),
-        expect: DataType::UInt16,
-        actual: parent_ids.data_type().clone(),
-    })?;
+let parent_ids_arr = MaybeDictArrayAccessor::<PrimitiveArray<T::ArrayType>>::try_new(
+    get_required_array(record_batch, consts::PARENT_ID)?,
+)?;
```
To ensure we handle u32 parent IDs correctly, it probably also makes sense to add a test for this somewhere.
```rust
// We collect columns into a map or vec matching schema order.
let mut columns = Vec::with_capacity(schema.fields().len());

for field in schema.fields() {
```
There's a case that the logic here might not handle correctly, which is if we're inserting a type of attribute and the original schema did not previously contain any attributes of this type.

All the ATTRIBUTE_* columns are optional in OTAP. For example, if some attribute RecordBatch had no values of type int, the ATTRIBUTE_INT column would be omitted. If we encountered such a record batch and we were inserting an integer attribute, the new value would not be included in the output batch with this logic.

We should add a test for this and handle it. We might need to write some custom batch concatenation logic for this, rather than relying on arrow's `concat_batches` compute kernel, or make some modifications to the original batch before we invoke this function.
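One possible shape for the "modify the original batch first" option (a hypothetical helper, not code from this PR): append an all-null column for the missing optional value type so both batches share a schema before `concat_batches` runs. Field ordering would still need to line up with the batch of inserted rows.

```rust
use std::sync::Arc;
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Append an all-null, nullable column `name` of `data_type` to `rb`.
fn with_null_column(
    rb: &RecordBatch,
    name: &str,
    data_type: DataType,
) -> Result<RecordBatch, arrow::error::ArrowError> {
    let mut fields: Vec<Field> = rb
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new(name, data_type.clone(), true));

    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
    columns.push(new_null_array(&data_type, rb.num_rows()));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```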
```rust
let combined = arrow::compute::concat_batches(&rb.schema(), &[rb, new_rows])
    .map_err(|e| Error::Format {
        error: e.to_string(),
    })?;
```
I mentioned in https://github.com/open-telemetry/otel-arrow/pull/1737/files#r2704902249 that we might need to be careful about how we invoke this concat_batches function due to the case where we're inserting a column that was not contained in the original batch.
There's another case we probably need to handle as well, which is: if the column is dictionary encoded and inserting the new value would cause the dictionary keys to overflow, then we need to either expand the key type (e.g. convert from a dictionary with a smaller key type to one with a larger key type) or convert from a dictionary-encoded array to a non-dictionary-encoded array.
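As an illustration of the "expand the key type" option (a sketch that leans on arrow's generic cast kernel rather than anything already in this crate):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, UInt8Type};

fn main() -> Result<(), arrow::error::ArrowError> {
    // A Dictionary<UInt8, Utf8> column that could exhaust its 256 possible keys.
    let dict: DictionaryArray<UInt8Type> = vec!["a", "b", "a"].into_iter().collect();
    let col: ArrayRef = Arc::new(dict);

    // Widen the key type before appending new values; alternatively, cast to plain
    // Utf8 to drop dictionary encoding entirely.
    let widened = cast(
        &col,
        &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
    )?;
    assert_eq!(
        widened.data_type(),
        &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
    );
    Ok(())
}
```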
albertlockett left a comment
@ThomsonTan thanks for taking this! I'm really excited to see this desperately needed feature getting implemented and this looks like a great start!
I left a few comments, mostly around where we could use different types to simplify the code, some edge cases in the OTAP protocol and suggestions for optimizations.
WRT optimization, I have a few general comments:

- We could probably do them in a followup PR after adding new benchmarks to the existing suite in `benchmarks/benches/attribute_transform/main.rs`. Having benchmarks will give us confidence that the optimizations we're adding are actually effective.
- Another optimization we could consider in the future: currently we're materializing the `RecordBatch` for the rename & deletes, then materializing another for the inserts, and concatenating them together. This means that for each column, we create two `Arc<dyn Array>`, and afterwards discard them while concatenating them into a new `Arc<dyn Array>` for the final result. We might be able to avoid this by:
  a) inserting the new keys while we're doing the rename/delete
  b) inserting the new values while taking the values/parent_id columns

  I realize that this makes the implementation significantly more complex, so it's fine if we want to just document this as a future optimization. The only reason I'm calling it out ahead of time is that some of the code we write to handle OTAP edge cases (see https://github.com/open-telemetry/otel-arrow/pull/1737/files#r2704924006) would be different with this optimization in place.
Fix #1035