1
1
mod aggregate;
2
2
mod exponential_histogram;
3
+ mod hashed;
3
4
mod histogram;
4
5
mod last_value;
5
6
mod precomputed_sum;
@@ -15,13 +16,12 @@ use std::sync::{Arc, RwLock};
15
16
use aggregate:: is_under_cardinality_limit;
16
17
pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
17
18
pub ( crate ) use exponential_histogram:: { EXPO_MAX_SCALE , EXPO_MIN_SCALE } ;
19
+ use hashed:: { Hashed , HashedNoOpBuilder } ;
18
20
use once_cell:: sync:: Lazy ;
19
21
use opentelemetry:: { otel_warn, KeyValue } ;
20
22
21
- use crate :: metrics:: AttributeSet ;
22
-
23
- pub ( crate ) static STREAM_OVERFLOW_ATTRIBUTES : Lazy < Vec < KeyValue > > =
24
- Lazy :: new ( || vec ! [ KeyValue :: new( "otel.metric.overflow" , "true" ) ] ) ;
23
+ pub ( crate ) static STREAM_OVERFLOW_ATTRIBUTES : Lazy < Hashed < ' static , [ KeyValue ] > > =
24
+ Lazy :: new ( || Hashed :: from_owned ( vec ! [ KeyValue :: new( "otel.metric.overflow" , "true" ) ] ) ) ;
25
25
26
26
pub ( crate ) trait Aggregator {
27
27
/// A static configuration that is needed in order to initialize aggregator.
52
52
A : Aggregator ,
53
53
{
54
54
/// Trackers store the values associated with different attribute sets.
55
- trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
55
+ trackers : RwLock < HashMap < Hashed < ' static , [ KeyValue ] > , Arc < A > , HashedNoOpBuilder > > ,
56
56
/// Number of different attribute set stored in the `trackers` map.
57
57
count : AtomicUsize ,
58
58
/// Indicates whether a value with no attributes has been stored.
69
69
{
70
70
fn new ( config : A :: InitConfig ) -> Self {
71
71
ValueMap {
72
- trackers : RwLock :: new ( HashMap :: new ( ) ) ,
72
+ trackers : RwLock :: new ( HashMap :: default ( ) ) ,
73
73
has_no_attribute_value : AtomicBool :: new ( false ) ,
74
74
no_attribute_tracker : A :: create ( & config) ,
75
75
count : AtomicUsize :: new ( 0 ) ,
@@ -84,19 +84,25 @@ where
84
84
return ;
85
85
}
86
86
87
+ let attributes = Hashed :: from_borrowed ( attributes) ;
88
+
87
89
let Ok ( trackers) = self . trackers . read ( ) else {
88
90
return ;
89
91
} ;
90
92
91
93
// Try to retrieve and update the tracker with the attributes in the provided order first
92
- if let Some ( tracker) = trackers. get ( attributes) {
94
+ if let Some ( tracker) = trackers. get ( & attributes) {
93
95
tracker. update ( value) ;
94
96
return ;
95
97
}
96
98
97
99
// Try to retrieve and update the tracker with the attributes sorted.
98
- let sorted_attrs = AttributeSet :: from ( attributes) . into_vec ( ) ;
99
- if let Some ( tracker) = trackers. get ( sorted_attrs. as_slice ( ) ) {
100
+ let sorted_attrs = attributes. clone ( ) . mutate ( |list| {
101
+ // use stable sort
102
+ list. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
103
+ dedup_remove_first ( list, |a, b| a. key == b. key ) ;
104
+ } ) ;
105
+ if let Some ( tracker) = trackers. get ( & sorted_attrs) {
100
106
tracker. update ( value) ;
101
107
return ;
102
108
}
@@ -110,20 +116,20 @@ where
110
116
111
117
// Recheck both the provided and sorted orders after acquiring the write lock
112
118
// in case another thread has pushed an update in the meantime.
113
- if let Some ( tracker) = trackers. get ( attributes) {
119
+ if let Some ( tracker) = trackers. get ( & attributes) {
114
120
tracker. update ( value) ;
115
- } else if let Some ( tracker) = trackers. get ( sorted_attrs. as_slice ( ) ) {
121
+ } else if let Some ( tracker) = trackers. get ( & sorted_attrs) {
116
122
tracker. update ( value) ;
117
123
} else if is_under_cardinality_limit ( self . count . load ( Ordering :: SeqCst ) ) {
118
124
let new_tracker = Arc :: new ( A :: create ( & self . config ) ) ;
119
125
new_tracker. update ( value) ;
120
126
121
127
// Insert tracker with the attributes in the provided and sorted orders
122
- trackers. insert ( attributes. to_vec ( ) , new_tracker. clone ( ) ) ;
128
+ trackers. insert ( attributes. into_owned ( ) , new_tracker. clone ( ) ) ;
123
129
trackers. insert ( sorted_attrs, new_tracker) ;
124
130
125
131
self . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
126
- } else if let Some ( overflow_value) = trackers. get ( STREAM_OVERFLOW_ATTRIBUTES . as_slice ( ) ) {
132
+ } else if let Some ( overflow_value) = trackers. get ( & STREAM_OVERFLOW_ATTRIBUTES ) {
127
133
overflow_value. update ( value) ;
128
134
} else {
129
135
let new_tracker = A :: create ( & self . config ) ;
@@ -153,7 +159,7 @@ where
153
159
let mut seen = HashSet :: new ( ) ;
154
160
for ( attrs, tracker) in trackers. iter ( ) {
155
161
if seen. insert ( Arc :: as_ptr ( tracker) ) {
156
- dest. push ( map_fn ( attrs. clone ( ) , tracker) ) ;
162
+ dest. push ( map_fn ( attrs. clone ( ) . into_inner_owned ( ) , tracker) ) ;
157
163
}
158
164
}
159
165
}
@@ -183,8 +189,25 @@ where
183
189
let mut seen = HashSet :: new ( ) ;
184
190
for ( attrs, tracker) in trackers. into_iter ( ) {
185
191
if seen. insert ( Arc :: as_ptr ( & tracker) ) {
186
- dest. push ( map_fn ( attrs, tracker. clone_and_reset ( & self . config ) ) ) ;
192
+ dest. push ( map_fn (
193
+ attrs. into_inner_owned ( ) ,
194
+ tracker. clone_and_reset ( & self . config ) ,
195
+ ) ) ;
196
+ }
197
+ }
198
+ }
199
+ }
200
+
201
+ fn dedup_remove_first < T > ( values : & mut Vec < T > , is_eq : impl Fn ( & T , & T ) -> bool ) {
202
+ // we cannot use vec.dedup_by because it will remove last duplicate not first
203
+ if values. len ( ) > 1 {
204
+ let mut i = values. len ( ) - 1 ;
205
+ while i != 0 {
206
+ let is_same = unsafe { is_eq ( values. get_unchecked ( i - 1 ) , values. get_unchecked ( i) ) } ;
207
+ if is_same {
208
+ values. remove ( i - 1 ) ;
187
209
}
210
+ i -= 1 ;
188
211
}
189
212
}
190
213
}
@@ -392,8 +415,45 @@ impl AtomicallyUpdate<f64> for f64 {
392
415
393
416
#[ cfg( test) ]
394
417
mod tests {
418
+ use std:: usize;
419
+
395
420
use super :: * ;
396
421
422
+ fn assert_deduped < const N : usize , const M : usize > (
423
+ input : [ ( i32 , bool ) ; N ] ,
424
+ expect : [ ( i32 , bool ) ; M ] ,
425
+ ) {
426
+ let mut list: Vec < ( i32 , bool ) > = Vec :: from ( input) ;
427
+ dedup_remove_first ( & mut list, |a, b| a. 0 == b. 0 ) ;
428
+ assert_eq ! ( list, expect) ;
429
+ }
430
+
431
+ #[ test]
432
+ fn deduplicate_by_removing_first_element_from_sorted_array ( ) {
433
+ assert_deduped ( [ ] , [ ] ) ;
434
+ assert_deduped ( [ ( 1 , true ) ] , [ ( 1 , true ) ] ) ;
435
+ assert_deduped ( [ ( 1 , false ) , ( 1 , false ) , ( 1 , true ) ] , [ ( 1 , true ) ] ) ;
436
+ assert_deduped (
437
+ [ ( 1 , true ) , ( 2 , false ) , ( 2 , false ) , ( 2 , true ) ] ,
438
+ [ ( 1 , true ) , ( 2 , true ) ] ,
439
+ ) ;
440
+ assert_deduped (
441
+ [ ( 1 , true ) , ( 1 , false ) , ( 1 , true ) , ( 2 , true ) ] ,
442
+ [ ( 1 , true ) , ( 2 , true ) ] ,
443
+ ) ;
444
+ assert_deduped (
445
+ [
446
+ ( 1 , false ) ,
447
+ ( 1 , true ) ,
448
+ ( 2 , false ) ,
449
+ ( 2 , true ) ,
450
+ ( 3 , false ) ,
451
+ ( 3 , true ) ,
452
+ ] ,
453
+ [ ( 1 , true ) , ( 2 , true ) , ( 3 , true ) ] ,
454
+ ) ;
455
+ }
456
+
397
457
#[ test]
398
458
fn can_store_u64_atomic_value ( ) {
399
459
let atomic = u64:: new_atomic_tracker ( 0 ) ;
0 commit comments