@@ -97,9 +97,23 @@ use futures::{
97
97
Future , Stream , StreamExt ,
98
98
} ;
99
99
use std:: pin:: Pin ;
100
+ use std:: str:: FromStr ;
100
101
use std:: sync:: { Arc , Mutex } ;
101
102
use std:: time;
102
103
104
+ /// Delay interval between two consecutive exports, default to be 5000.
105
+ const OTEL_BSP_SCHEDULE_DELAY_MILLIS : & str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS" ;
106
+ /// Default delay interval between two consecutive exports.
107
+ const OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT : u64 = 5000 ;
108
+ /// Maximum queue size, default to be 2048
109
+ const OTEL_BSP_MAX_QUEUE_SIZE : & str = "OTEL_BSP_MAX_QUEUE_SIZE" ;
110
+ /// Default maximum queue size
111
+ const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT : usize = 2048 ;
112
+ /// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE, default to be 512
113
+ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE : & str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" ;
114
+ /// Default maximum batch size
115
+ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT : usize = 512 ;
116
+
103
117
/// A [`SpanProcessor`] that exports synchronously when spans are finished.
104
118
///
105
119
/// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
@@ -269,6 +283,57 @@ impl BatchSpanProcessor {
269
283
config : Default :: default ( ) ,
270
284
}
271
285
}
286
+
287
+ /// Create a new batch processor builder and set the config value based on environment variables.
288
+ ///
289
+ /// If the value in environment variables is illegal, will fall back to use default value.
290
+ ///
291
+ /// Note that export batch size should be less than or equals to max queue size.
292
+ /// If export batch size is larger than max queue size, we will lower to be the same as max
293
+ /// queue size
294
+ pub fn from_env < E , S , SO , I , IO > (
295
+ exporter : E ,
296
+ spawn : S ,
297
+ interval : I ,
298
+ ) -> BatchSpanProcessorBuilder < E , S , I >
299
+ where
300
+ E : exporter:: trace:: SpanExporter ,
301
+ S : Fn ( BatchSpanProcessorWorker ) -> SO ,
302
+ I : Fn ( time:: Duration ) -> IO ,
303
+ {
304
+ let mut config = BatchConfig :: default ( ) ;
305
+ let schedule_delay = std:: env:: var ( OTEL_BSP_SCHEDULE_DELAY_MILLIS )
306
+ . map ( |delay| u64:: from_str ( & delay) . unwrap_or ( OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT ) )
307
+ . unwrap_or ( OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT ) ;
308
+ config. scheduled_delay = time:: Duration :: from_millis ( schedule_delay) ;
309
+
310
+ let max_queue_size = std:: env:: var ( OTEL_BSP_MAX_QUEUE_SIZE )
311
+ . map ( |queue_size| {
312
+ usize:: from_str ( & queue_size) . unwrap_or ( OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT )
313
+ } )
314
+ . unwrap_or ( OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT ) ;
315
+ config. max_queue_size = max_queue_size;
316
+
317
+ let max_export_batch_size = std:: env:: var ( OTEL_BSP_MAX_EXPORT_BATCH_SIZE )
318
+ . map ( |batch_size| {
319
+ usize:: from_str ( & batch_size) . unwrap_or ( OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT )
320
+ } )
321
+ . unwrap_or ( OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT ) ;
322
+ // max export batch size must be less or equal to max queue size.
323
+ // we set max export batch size to max queue size if it's larger than max queue size.
324
+ if max_export_batch_size > max_queue_size {
325
+ config. max_export_batch_size = max_queue_size;
326
+ } else {
327
+ config. max_export_batch_size = max_export_batch_size;
328
+ }
329
+
330
+ BatchSpanProcessorBuilder {
331
+ config,
332
+ exporter,
333
+ spawn,
334
+ interval,
335
+ }
336
+ }
272
337
}
273
338
274
339
/// Batch span processor configuration
@@ -292,9 +357,9 @@ pub struct BatchConfig {
292
357
impl Default for BatchConfig {
293
358
fn default ( ) -> Self {
294
359
BatchConfig {
295
- max_queue_size : 2048 ,
296
- scheduled_delay : time:: Duration :: from_secs ( 5 ) ,
297
- max_export_batch_size : 512 ,
360
+ max_queue_size : OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT ,
361
+ scheduled_delay : time:: Duration :: from_millis ( OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT ) ,
362
+ max_export_batch_size : OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT ,
298
363
}
299
364
}
300
365
}
@@ -333,10 +398,16 @@ where
333
398
BatchSpanProcessorBuilder { config, ..self }
334
399
}
335
400
336
- /// Set max export size for batches
401
+ /// Set max export size for batches, should always less than or equals to max queue size.
402
+ ///
403
+ /// If input is larger than max queue size, will lower it to be equal to max queue size
337
404
pub fn with_max_export_batch_size ( self , size : usize ) -> Self {
338
405
let mut config = self . config ;
339
- config. max_export_batch_size = size;
406
+ if size > config. max_queue_size {
407
+ config. max_export_batch_size = config. max_queue_size ;
408
+ } else {
409
+ config. max_export_batch_size = size;
410
+ }
340
411
341
412
BatchSpanProcessorBuilder { config, ..self }
342
413
}
@@ -351,3 +422,47 @@ where
351
422
)
352
423
}
353
424
}
425
+
426
+ #[ cfg( test) ]
427
+ mod tests {
428
+ use crate :: exporter:: trace:: stdout;
429
+ use crate :: sdk:: trace:: span_processor:: {
430
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE , OTEL_BSP_MAX_QUEUE_SIZE ,
431
+ OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT , OTEL_BSP_SCHEDULE_DELAY_MILLIS ,
432
+ OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT ,
433
+ } ;
434
+ use crate :: sdk:: BatchSpanProcessor ;
435
+ use std:: time;
436
+
437
+ #[ test]
438
+ fn test_build_batch_span_processor_from_env ( ) {
439
+ std:: env:: set_var ( OTEL_BSP_MAX_EXPORT_BATCH_SIZE , "500" ) ;
440
+ std:: env:: set_var ( OTEL_BSP_SCHEDULE_DELAY_MILLIS , "I am not number" ) ;
441
+
442
+ let mut builder = BatchSpanProcessor :: from_env (
443
+ stdout:: Exporter :: new ( std:: io:: stdout ( ) , true ) ,
444
+ tokio:: spawn,
445
+ tokio:: time:: interval,
446
+ ) ;
447
+ // export batch size cannot exceed max queue size
448
+ assert_eq ! ( builder. config. max_export_batch_size, 500 ) ;
449
+ assert_eq ! (
450
+ builder. config. scheduled_delay,
451
+ time:: Duration :: from_millis( OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT )
452
+ ) ;
453
+ assert_eq ! (
454
+ builder. config. max_queue_size,
455
+ OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
456
+ ) ;
457
+
458
+ std:: env:: set_var ( OTEL_BSP_MAX_QUEUE_SIZE , "120" ) ;
459
+ builder = BatchSpanProcessor :: from_env (
460
+ stdout:: Exporter :: new ( std:: io:: stdout ( ) , true ) ,
461
+ tokio:: spawn,
462
+ tokio:: time:: interval,
463
+ ) ;
464
+
465
+ assert_eq ! ( builder. config. max_export_batch_size, 120 ) ;
466
+ assert_eq ! ( builder. config. max_queue_size, 120 ) ;
467
+ }
468
+ }
0 commit comments