From d689dfc8fbd22f82db83cf695bf3c456ccd93dfa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 12:06:51 -0400 Subject: [PATCH 01/20] feat(ocurl): simpler, cleaner backend implementation we now only have a single representation of a batch, with its own internal state to handle timeouts. It handles its own locking, and there are no callbacks anymore. --- dune | 2 +- src/client/FQueue.ml | 27 -- src/client/FQueue.mli | 11 - src/client/opentelemetry_client_ocurl.ml | 467 ++++++++++++----------- 4 files changed, 240 insertions(+), 267 deletions(-) delete mode 100644 src/client/FQueue.ml delete mode 100644 src/client/FQueue.mli diff --git a/dune b/dune index f2eef302..815c0f49 100644 --- a/dune +++ b/dune @@ -1,3 +1,3 @@ (env (_ - (flags :standard -warn-error -a+8))) + (flags :standard -warn-error -a+8 -strict-sequence))) diff --git a/src/client/FQueue.ml b/src/client/FQueue.ml deleted file mode 100644 index ce04dad3..00000000 --- a/src/client/FQueue.ml +++ /dev/null @@ -1,27 +0,0 @@ -type 'a t = { - arr: 'a array; - mutable i: int; -} - -let create ~dummy n : _ t = - assert (n >= 1); - { arr = Array.make n dummy; i = 0 } - -let[@inline] size self = self.i - -let[@inline] is_full self = self.i = Array.length self.arr - -let push (self : _ t) x : bool = - if is_full self then - false - else ( - self.arr.(self.i) <- x; - self.i <- 1 + self.i; - true - ) - -let pop_iter_all (self : _ t) f = - for j = 0 to self.i - 1 do - f self.arr.(j) - done; - self.i <- 0 diff --git a/src/client/FQueue.mli b/src/client/FQueue.mli deleted file mode 100644 index 0a03b00d..00000000 --- a/src/client/FQueue.mli +++ /dev/null @@ -1,11 +0,0 @@ -(** queue of fixed size *) - -type 'a t - -val create : dummy:'a -> int -> 'a t - -val size : _ t -> int - -val push : 'a t -> 'a -> bool (* true iff it could write element *) - -val pop_iter_all : 'a t -> ('a -> unit) -> unit diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 
c69c4245..9ea214bf 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -33,6 +33,7 @@ let _init_curl = type error = [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string + | `Sysbreak ] let n_errors = Atomic.make 0 @@ -40,6 +41,7 @@ let n_errors = Atomic.make 0 let n_dropped = Atomic.make 0 let report_err_ = function + | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg | `Status (code, status) -> @@ -112,6 +114,7 @@ module Curl () : CURL = struct let status = Status.decode_status dec in Error (`Status (code, status)) ) + | exception Sys.Break -> Error `Sysbreak | exception Curl.CurlException (_, code, msg) -> let status = Status.default_status ~code:(Int32.of_int code) @@ -119,19 +122,95 @@ module Curl () : CURL = struct () in Error (`Status (code, status)) - with e -> Error (`Failure (Printexc.to_string e)) + with + | Sys.Break -> Error `Sysbreak + | e -> Error (`Failure (Printexc.to_string e)) end -module type PUSH = sig - type elt +module type BATCH = sig end - val push : elt -> unit +module Batch : sig + type 'a t - val is_empty : unit -> bool + val push : 'a t -> 'a -> bool + (** [push batch x] pushes [x] into the batch, and heuristically + returns [true] if the batch is ready to be emitted (to know if we should + wake up the sending thread, if any) *) - val is_big_enough : unit -> bool + val push' : 'a t -> 'a -> unit - val pop_iter_all : (elt -> unit) -> unit + val is_ready : now:Mtime.t -> _ t -> bool + (** is the batch ready to be sent? This is heuristic. *) + + val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option + (** Pop the content of the batch if it is ready. If batching is disabled, + it is ready as soon as it is non-empty; if a timeout is provided, it is + ready once an element has been in it for at least the timeout. + @param force if true, pop whenever the batch is non-empty (default false)
+ @param now passed to implement timeout *) + + val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t +end = struct + type 'a t = { + lock: Mutex.t; + mutable size: int; + mutable q: 'a list; + batch: int option; + timeout: Mtime.span option; + mutable start: Mtime.t; (* reset by [push] when an element enters an empty batch that has a timeout *) + } + + let make ?batch ?timeout () : _ t = + { + lock = Mutex.create (); + size = 0; + start = Mtime_clock.now (); + q = []; + batch; + timeout; + } + + let is_empty_ self = self.size = 0 + + let timeout_expired_ ~now self : bool = + match self.timeout with + | Some t -> + let elapsed = Mtime.span now self.start in + Mtime.Span.compare elapsed t >= 0 + | None -> false + + let is_full_ self : bool = + match self.batch with + | None -> self.size > 0 + | Some b -> self.size >= b + + let is_ready ~now self : bool = + let@ () = with_mutex_ self.lock in + (* an empty batch is never ready, even once its timeout has lapsed *) (not (is_empty_ self)) && (is_full_ self || timeout_expired_ ~now self) + + let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + let@ () = with_mutex_ self.lock in + if (* never pop an empty batch: [start] is only reset by [push], so an idle batch would otherwise be popped as [Some []] on every call *) + (not (is_empty_ self)) + && (force || is_full_ self || timeout_expired_ ~now self) + then ( + let l = self.q in + self.q <- []; + self.size <- 0; + Some l + ) else + None + + let push (self : _ t) x : bool = + let@ () = with_mutex_ self.lock in + if self.size = 0 && Option.is_some self.timeout then + self.start <- Mtime_clock.now (); + self.size <- 1 + self.size; + self.q <- x :: self.q; + let ready = is_full_ self in + ready + + let push' self x = ignore (push self x : bool) end (** An emitter. 
This is used by {!Backend} below to forward traces/metrics/… @@ -152,57 +231,6 @@ module type EMITTER = sig val cleanup : unit -> unit end -type 'a push = (module PUSH with type elt = 'a) - -type on_full_cb = unit -> unit - -(* make a "push" object, along with a setter for a callback to call when - it's ready to emit a batch *) -let mk_push (type a) ?batch () : - (module PUSH with type elt = a) * (on_full_cb -> unit) = - let on_full : on_full_cb ref = ref ignore in - let push = - match batch with - | None -> - let r = ref None in - let module M = struct - type elt = a - - let is_empty () = !r == None - - let is_big_enough () = !r != None - - let push x = - r := Some x; - !on_full () - - let pop_iter_all f = - Option.iter f !r; - r := None - end in - (module M : PUSH with type elt = a) - | Some n -> - let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in - let module M = struct - type elt = a - - let is_empty () = FQueue.size q = 0 - - let is_big_enough () = FQueue.size q >= n - - let push x = - if (not (FQueue.push q x)) || FQueue.size q > n then ( - !on_full (); - if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *) - ) - - let pop_iter_all f = FQueue.pop_iter_all q f - end in - (module M : PUSH with type elt = a) - in - - push, ( := ) on_full - (* start a thread in the background, running [f()] *) let start_bg_thread (f : unit -> unit) : unit = let run () = @@ -224,169 +252,145 @@ let batch_is_empty = List.for_all l_is_empty https://opentelemetry.io/docs/reference/specification/error-handling/ *) let mk_emitter ~(config : Config.t) () : (module EMITTER) = let open Proto in - let continue = ref true in - - let ((module E_trace) : Trace.resource_spans list push), on_trace_full = - mk_push ?batch:config.batch_traces () - in - let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full - = - mk_push ?batch:config.batch_metrics () - in - let ((module E_logs) : Logs.resource_logs list push), on_logs_full = - mk_push 
?batch:config.batch_logs () - in - - let encoder = Pbrt.Encoder.create () in - - let ((module C) as curl) = (module Curl () : CURL) in - - let on_tick_cbs_ = ref (ref []) in - let set_on_tick_callbacks = ( := ) on_tick_cbs_ in - - let send_http_ ~path ~encode x : unit = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in - match C.send ~path ~decode:(fun _ -> ()) data with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - in - - let send_metrics_http (l : Metrics.resource_metrics list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request ~resource_metrics:l - () - in - send_http_ ~path:"/v1/metrics" - ~encode:Metrics_service.encode_export_metrics_service_request x - in - - let send_traces_http (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ ~path:"/v1/traces" - ~encode:Trace_service.encode_export_trace_service_request x - in - - let send_logs_http (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - send_http_ ~path:"/v1/logs" - ~encode:Logs_service.encode_export_logs_service_request x - in - - let last_wakeup = Atomic.make (Mtime_clock.now ()) in - let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - let batch_timeout () : bool = - let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in - Mtime.Span.compare elapsed timeout >= 0 - in - - let emit_metrics ?(force = false) () : bool = - if force || ((not force) && E_metrics.is_big_enough ()) then ( - let batch = ref [ AList.pop_all gc_metrics ] in - E_metrics.pop_iter_all (fun l -> batch := l :: 
!batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_metrics_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in - let emit_traces ?(force = false) () : bool = - if force || ((not force) && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_traces_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in - let emit_logs ?(force = false) () : bool = - if force || ((not force) && E_logs.is_big_enough ()) then ( - let batch = ref [] in - E_logs.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_logs_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in - - let[@inline] guard f = - try f () - with e -> - Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" 
- (Printexc.to_string e) - in + (* local helpers *) + let open struct + let continue = Atomic.make true + + let timeout = + if config.batch_timeout_ms > 0 then + Some Mtime.Span.(config.batch_timeout_ms * ms) + else + None + + let batch_traces : Trace.resource_spans list Batch.t = + Batch.make ?batch:config.batch_traces ?timeout () + + let batch_metrics : Metrics.resource_metrics list Batch.t = + Batch.make ?batch:config.batch_metrics ?timeout () + + let batch_logs : Logs.resource_logs list Batch.t = + Batch.make ?batch:config.batch_logs ?timeout () + + let encoder = Pbrt.Encoder.create () + + let curl = (module Curl () : CURL) + + module C = (val curl) + + let on_tick_cbs_ = ref (ref []) + + let set_on_tick_callbacks = ( := ) on_tick_cbs_ + + let send_http_ ~path ~encode x : unit = + Pbrt.Encoder.reset encoder; + encode x encoder; + let data = Pbrt.Encoder.to_string encoder in + match C.send ~path ~decode:(fun _ -> ()) data with + | Ok () -> () + | Error `Sysbreak -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + Atomic.set continue false + | Error err -> + (* TODO: log error _via_ otel? 
*) + Atomic.incr n_errors; + report_err_ err + + let send_metrics_http (l : Metrics.resource_metrics list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request + ~resource_metrics:l () + in + send_http_ ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x - let emit_all_force () = - ignore (emit_traces ~force:true () : bool); - ignore (emit_logs ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool) - in + let send_traces_http (l : Trace.resource_spans list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request ~resource_spans:l () + in + send_http_ ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + let send_logs_http (l : Logs.resource_logs list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request ~resource_logs:l () + in + send_http_ ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x + + (* emit metrics, if the batch is full or timeout lapsed *) + let emit_metrics_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_metrics with + | None -> false + | Some l -> + let batch = AList.pop_all gc_metrics :: l in + send_metrics_http batch; + true + + let emit_traces_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_traces with + | None -> false + | Some l -> + send_traces_http l; + true + + let emit_logs_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_logs with + | None -> false + | Some l -> + send_logs_http l; + true + + let[@inline] guard_exn_ f = + try f () + with e -> + Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" 
+ (Printexc.to_string e) + + let emit_all_force () = + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now ~force:true () : bool); + ignore (emit_logs_maybe ~now ~force:true () : bool); + ignore (emit_metrics_maybe ~now ~force:true () : bool) + end in if config.thread then ( (let m = Mutex.create () in Lock.set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m)); - let ((module C) as curl) = (module Curl () : CURL) in - let m = Mutex.create () in let cond = Condition.create () in (* loop for the thread that processes events and sends them to collector *) let bg_thread () = - while !continue do - let@ () = guard in - let timeout = batch_timeout () in + while Atomic.get continue do + let@ () = guard_exn_ in - let do_metrics = emit_metrics ~force:timeout () in - let do_traces = emit_traces ~force:timeout () in - let do_logs = emit_logs ~force:timeout () in + let now = Mtime_clock.now () in + let do_metrics = emit_metrics_maybe ~now () in + let do_traces = emit_traces_maybe ~now () in + let do_logs = emit_logs_maybe ~now () in if (not do_metrics) && (not do_traces) && not do_logs then - (* wait *) + (* wait for something to happen *) let@ () = with_mutex_ m in Condition.wait cond m done; - (* flush remaining events *) - let@ () = guard in - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); - ignore (emit_logs ~force:true () : bool); + (* flush remaining events once we exit *) + let@ () = guard_exn_ in + emit_all_force (); C.cleanup () in start_bg_thread bg_thread; + (* if the bg thread waits, this will wake it up so it can send batches *) let wakeup () = - with_mutex_ m (fun () -> Condition.signal cond); + with_mutex_ m (fun () -> Condition.broadcast cond); Thread.yield () in - (* wake up if a batch is full *) - on_metrics_full wakeup; - on_trace_full wakeup; - let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); List.iter @@ -396,50 +400,47 @@ let mk_emitter ~(config : 
Config.t) () : (module EMITTER) = Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) !(!on_tick_cbs_); - if batch_timeout () then wakeup () + + let now = Mtime_clock.now () in + if + (not (Atomic.get continue)) + || Batch.is_ready ~now batch_metrics + || Batch.is_ready ~now batch_traces + || Batch.is_ready ~now batch_logs + then + wakeup () in if config.ticker_thread then ( (* thread that calls [tick()] regularly, to help enforce timeouts *) let tick_thread () = - while true do + while Atomic.get continue do Thread.delay 0.5; tick () - done + done; + wakeup () in start_bg_thread tick_thread ); let module M = struct - let push_trace e = - E_trace.push e; - if batch_timeout () then wakeup () + let push_trace e = if Batch.push batch_traces e then wakeup () - let push_metrics e = - E_metrics.push e; - if batch_timeout () then wakeup () + let push_metrics e = if Batch.push batch_metrics e then wakeup () - let push_logs e = - E_logs.push e; - if batch_timeout () then wakeup () + let push_logs e = if Batch.push batch_logs e then wakeup () let set_on_tick_callbacks = set_on_tick_callbacks let tick = tick let cleanup () = - continue := false; + Atomic.set continue false; with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - on_metrics_full (fun () -> - if Atomic.get needs_gc_metrics then sample_gc_metrics (); - ignore (emit_metrics () : bool)); - on_trace_full (fun () -> ignore (emit_traces () : bool)); - on_logs_full (fun () -> ignore (emit_logs () : bool)); - let cleanup () = emit_all_force (); C.cleanup () @@ -447,25 +448,33 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = let module M = struct let push_trace e = - let@ () = guard in - E_trace.push e; - if batch_timeout () then emit_all_force () + let@ () = guard_exn_ in + Batch.push' batch_traces e; + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now () : bool) let push_metrics e = - let@ () = guard in - E_metrics.push e; - if batch_timeout () 
then emit_all_force () + let@ () = guard_exn_ in + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + Batch.push' batch_metrics e; + let now = Mtime_clock.now () in + ignore (emit_metrics_maybe ~now () : bool) let push_logs e = - let@ () = guard in - E_logs.push e; - if batch_timeout () then emit_all_force () + let@ () = guard_exn_ in + Batch.push' batch_logs e; + let now = Mtime_clock.now () in + ignore (emit_logs_maybe ~now () : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); - if batch_timeout () then emit_all_force () + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now () : bool); + ignore (emit_metrics_maybe ~now () : bool); + ignore (emit_logs_maybe ~now () : bool); + () let cleanup = cleanup end in @@ -516,13 +525,13 @@ end) [ make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true + sum ~name:"otel.export.dropped" ~is_monotonic:true [ int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); ]; - sum ~name:"otel-export.errors" ~is_monotonic:true + sum ~name:"otel.export.errors" ~is_monotonic:true [ int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) @@ -537,12 +546,13 @@ end) { send = (fun m ~ret -> - let@ () = Lock.with_lock in if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + let@ () = Lock.with_lock in + let m = List.rev_append (additional_metrics ()) m in push_metrics m; ret ()); @@ -552,11 +562,12 @@ end) { send = (fun m ~ret -> - let@ () = Lock.with_lock in if !debug_ then Format.eprintf "send logs %a@." 
(Format.pp_print_list Logs.pp_resource_logs) m; + + let@ () = Lock.with_lock in push_logs m; ret ()); } From 331ae9454701afbdfa23505d428262f728db0fa6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 12:52:03 -0400 Subject: [PATCH 02/20] detail --- src/client/opentelemetry_client_ocurl.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 9ea214bf..9c535877 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -161,6 +161,7 @@ end = struct } let make ?batch ?timeout () : _ t = + Option.iter (fun b -> assert (b > 0)) batch; { lock = Mutex.create (); size = 0; From 9c3e2a7076e954055f52de0d3123c3850063cf1f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 12:53:00 -0400 Subject: [PATCH 03/20] feat: pass `?stop` atomic; allow for multiple background threads --- src/client/config.ml | 22 ++- src/client/config.mli | 13 +- src/client/opentelemetry_client_ocurl.ml | 191 ++++++++++++---------- src/client/opentelemetry_client_ocurl.mli | 21 ++- tests/bin/emit1.ml | 13 +- 5 files changed, 161 insertions(+), 99 deletions(-) diff --git a/src/client/config.ml b/src/client/config.ml index 3c204946..b86e6305 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -8,7 +8,7 @@ type t = { batch_metrics: int option; batch_logs: int option; batch_timeout_ms: int; - thread: bool; + bg_threads: int; ticker_thread: bool; } @@ -24,20 +24,30 @@ let pp out self = batch_metrics; batch_logs; batch_timeout_ms; - thread; + bg_threads; ticker_thread; } = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ - batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" + batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}" debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt - batch_logs batch_timeout_ms thread 
ticker_thread + batch_logs batch_timeout_ms bg_threads ticker_thread let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) - ?(batch_timeout_ms = 500) ?(thread = true) ?(ticker_thread = true) () : t = + ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads + ?(ticker_thread = true) () : t = + let bg_threads = + match bg_threads with + | Some n -> max n 0 + | None -> + if thread then + 4 + else + 0 + in { debug; url; @@ -46,6 +56,6 @@ let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) batch_metrics; batch_timeout_ms; batch_logs; - thread; + bg_threads; ticker_thread; } diff --git a/src/client/config.mli b/src/client/config.mli index 3682b9cf..957dba13 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -28,12 +28,13 @@ type t = private { incomplete. Note that the batch might take longer than that, because this is only checked when a new event occurs. Default 500. *) - thread: bool; (** Is there a background thread? Default [true] *) + bg_threads: int; + (** Are there background threads, and how many? Default [4] *) ticker_thread: bool; (** Is there a ticker thread? Default [true]. This thread will regularly call [tick()] on the backend, to make sure it makes progress, and regularly send events to the collector. - This option is ignored if [thread=false]. *) + This option is ignored if [bg_threads=0]. *) } (** Configuration. @@ -49,9 +50,15 @@ val make : ?batch_logs:int option -> ?batch_timeout_ms:int -> ?thread:bool -> + ?bg_threads:int -> ?ticker_thread:bool -> unit -> t -(** Make a configuration *) +(** Make a configuration. + + @param thread if true and [bg_threads] is not provided, we will pick a number + of bg threads. Otherwise the number of [bg_threads] supersedes this option. 
+ + *) val pp : Format.formatter -> t -> unit diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 9c535877..6d7671c2 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -44,37 +44,51 @@ let report_err_ = function | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, status) -> + | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details }) + -> + let pp_details out l = + List.iter + (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) + l + in Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code - Proto.Status.pp_status status + "@[<2>opentelemetry: export failed with@ http code=%d@ status \ + {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." + code scode + (Bytes.unsafe_to_string message) + pp_details details -module type CURL = sig - val send : - path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result +module Httpc : sig + type t - val cleanup : unit -> unit -end + val create : unit -> t -(* create a curl client *) -module Curl () : CURL = struct + val send : + t -> + path:string -> + decode:(Pbrt.Decoder.t -> 'a) -> + string -> + ('a, error) result + + val cleanup : t -> unit +end = struct open Opentelemetry.Proto let () = Lazy.force _init_curl - let buf_res = Buffer.create 256 - (* TODO: use Curl.Multi, etc. instead? 
*) + type t = { + buf_res: Buffer.t; + curl: Curl.t; + } - (* http client *) - let curl : Curl.t = Curl.init () - - let cleanup () = Curl.cleanup curl + let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () } - (* TODO: use Curl multi *) + let cleanup self = Curl.cleanup self.curl (* send the content to the remote endpoint/path *) - let send ~path ~decode (bod : string) : ('a, error) result = + let send (self : t) ~path ~decode (bod : string) : ('a, error) result = + let { curl; buf_res } = self in Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); @@ -251,12 +265,10 @@ let batch_is_empty = List.for_all l_is_empty exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~(config : Config.t) () : (module EMITTER) = +let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in (* local helpers *) let open struct - let continue = Atomic.make true - let timeout = if config.batch_timeout_ms > 0 then Some Mtime.Span.(config.batch_timeout_ms * ms) @@ -272,76 +284,71 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = let batch_logs : Logs.resource_logs list Batch.t = Batch.make ?batch:config.batch_logs ?timeout () - let encoder = Pbrt.Encoder.create () - - let curl = (module Curl () : CURL) - - module C = (val curl) - let on_tick_cbs_ = ref (ref []) let set_on_tick_callbacks = ( := ) on_tick_cbs_ - let send_http_ ~path ~encode x : unit = + let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in - match C.send ~path ~decode:(fun _ -> ()) data with + match Httpc.send httpc ~path ~decode:(fun _ -> ()) data with | Ok () -> () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set continue false + Atomic.set stop true | Error err -> (* TODO: log error _via_ otel? 
*) Atomic.incr n_errors; report_err_ err - let send_metrics_http (l : Metrics.resource_metrics list list) = + let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) + = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Metrics_service.default_export_metrics_service_request ~resource_metrics:l () in - send_http_ ~path:"/v1/metrics" + send_http_ curl encoder ~path:"/v1/metrics" ~encode:Metrics_service.encode_export_metrics_service_request x - let send_traces_http (l : Trace.resource_spans list list) = + let send_traces_http curl encoder (l : Trace.resource_spans list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Trace_service.default_export_trace_service_request ~resource_spans:l () in - send_http_ ~path:"/v1/traces" + send_http_ curl encoder ~path:"/v1/traces" ~encode:Trace_service.encode_export_trace_service_request x - let send_logs_http (l : Logs.resource_logs list list) = + let send_logs_http curl encoder (l : Logs.resource_logs list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Logs_service.default_export_logs_service_request ~resource_logs:l () in - send_http_ ~path:"/v1/logs" + send_http_ curl encoder ~path:"/v1/logs" ~encode:Logs_service.encode_export_logs_service_request x (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force () : bool = + let emit_metrics_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_metrics with | None -> false | Some l -> let batch = AList.pop_all gc_metrics :: l in - send_metrics_http batch; + send_metrics_http httpc encoder batch; true - let emit_traces_maybe ~now ?force () : bool = + let emit_traces_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_traces with | None -> false | Some l -> - send_traces_http l; + send_traces_http httpc encoder l; true - let emit_logs_maybe ~now ?force () : bool = + let 
emit_logs_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_logs with | None -> false | Some l -> - send_logs_http l; + send_logs_http httpc encoder l; true let[@inline] guard_exn_ f = @@ -350,13 +357,13 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" (Printexc.to_string e) - let emit_all_force () = + let emit_all_force (httpc : Httpc.t) encoder = let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now ~force:true () : bool); - ignore (emit_logs_maybe ~now ~force:true () : bool); - ignore (emit_metrics_maybe ~now ~force:true () : bool) + ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); + ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); + ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) end in - if config.thread then ( + if config.bg_threads > 0 then ( (let m = Mutex.create () in Lock.set_mutex ~lock:(fun () -> Mutex.lock m) @@ -367,13 +374,15 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = (* loop for the thread that processes events and sends them to collector *) let bg_thread () = - while Atomic.get continue do + let httpc = Httpc.create () in + let encoder = Pbrt.Encoder.create () in + while not @@ Atomic.get stop do let@ () = guard_exn_ in let now = Mtime_clock.now () in - let do_metrics = emit_metrics_maybe ~now () in - let do_traces = emit_traces_maybe ~now () in - let do_logs = emit_logs_maybe ~now () in + let do_metrics = emit_metrics_maybe ~now httpc encoder in + let do_traces = emit_traces_maybe ~now httpc encoder in + let do_logs = emit_logs_maybe ~now httpc encoder in if (not do_metrics) && (not do_traces) && not do_logs then (* wait for something to happen *) let@ () = with_mutex_ m in @@ -381,14 +390,21 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = done; (* flush remaining events once we exit *) let@ () = guard_exn_ in - emit_all_force (); - C.cleanup 
() + emit_all_force httpc encoder; + Httpc.cleanup httpc in - start_bg_thread bg_thread; + + for _i = 1 to config.bg_threads do + start_bg_thread bg_thread + done; (* if the bg thread waits, this will wake it up so it can send batches *) - let wakeup () = - with_mutex_ m (fun () -> Condition.broadcast cond); + let wakeup ~all () = + with_mutex_ m (fun () -> + if all then + Condition.broadcast cond + else + Condition.signal cond); Thread.yield () in @@ -403,90 +419,97 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = !(!on_tick_cbs_); let now = Mtime_clock.now () in - if - (not (Atomic.get continue)) - || Batch.is_ready ~now batch_metrics + if Atomic.get stop then + wakeup ~all:true () + else if + Batch.is_ready ~now batch_metrics || Batch.is_ready ~now batch_traces || Batch.is_ready ~now batch_logs then - wakeup () + wakeup ~all:false () in if config.ticker_thread then ( (* thread that calls [tick()] regularly, to help enforce timeouts *) let tick_thread () = - while Atomic.get continue do + while not @@ Atomic.get stop do Thread.delay 0.5; tick () done; - wakeup () + wakeup ~all:true () in start_bg_thread tick_thread ); let module M = struct - let push_trace e = if Batch.push batch_traces e then wakeup () + let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () - let push_metrics e = if Batch.push batch_metrics e then wakeup () + let push_metrics e = + if Batch.push batch_metrics e then wakeup ~all:false () - let push_logs e = if Batch.push batch_logs e then wakeup () + let push_logs e = if Batch.push batch_logs e then wakeup ~all:false () let set_on_tick_callbacks = set_on_tick_callbacks let tick = tick let cleanup () = - Atomic.set continue false; + Atomic.set stop true; + if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; + (* wakeup everyone *) with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - let cleanup () = - emit_all_force (); - C.cleanup () - in + let httpc = Httpc.create () in + let 
encoder = Pbrt.Encoder.create () in let module M = struct let push_trace e = let@ () = guard_exn_ in Batch.push' batch_traces e; let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now () : bool) + ignore (emit_traces_maybe ~now httpc encoder : bool) let push_metrics e = let@ () = guard_exn_ in if Atomic.get needs_gc_metrics then sample_gc_metrics (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in - ignore (emit_metrics_maybe ~now () : bool) + ignore (emit_metrics_maybe ~now httpc encoder : bool) let push_logs e = let@ () = guard_exn_ in Batch.push' batch_logs e; let now = Mtime_clock.now () in - ignore (emit_logs_maybe ~now () : bool) + ignore (emit_logs_maybe ~now httpc encoder : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now () : bool); - ignore (emit_metrics_maybe ~now () : bool); - ignore (emit_logs_maybe ~now () : bool); + ignore (emit_traces_maybe ~now httpc encoder : bool); + ignore (emit_metrics_maybe ~now httpc encoder : bool); + ignore (emit_logs_maybe ~now httpc encoder : bool); () - let cleanup = cleanup + let cleanup () = + if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; + emit_all_force httpc encoder; + Httpc.cleanup httpc end in (module M) ) module Backend (Arg : sig + val stop : bool Atomic.t + val config : Config.t end) () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~config:Arg.config ()) + include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -574,11 +597,13 @@ end) } end -let setup_ ~(config : Config.t) () = +let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () = debug_ := config.debug; let module B = Backend (struct + let stop = stop + let config = config end) () @@ -586,15 +611,15 @@ let setup_ ~(config : Config.t) () = Opentelemetry.Collector.set_backend (module 
B); B.cleanup -let setup ?(config = Config.make ()) ?(enable = true) () = +let setup ?stop ?(config = Config.make ()) ?(enable = true) () = if enable then ( - let cleanup = setup_ ~config () in + let cleanup = setup_ ?stop ~config () in at_exit cleanup ) -let with_setup ?(config = Config.make ()) ?(enable = true) () f = +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f = if enable then ( - let cleanup = setup_ ~config () in + let cleanup = setup_ ?stop ~config () in Fun.protect ~finally:cleanup f ) else f () diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 9f8bc053..250767d9 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -20,13 +20,24 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit module Config = Config -val setup : ?config:Config.t -> ?enable:bool -> unit -> unit +val setup : + ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to enable/disable the setup depending on CLI arguments or environment. - @param config configuration to use *) - -val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a + @param config configuration to use + @param stop an atomic boolean. When it becomes true, background threads + will all stop after a little while. +*) + +val with_setup : + ?stop:bool Atomic.t -> + ?config:Config.t -> + ?enable:bool -> + unit -> + (unit -> 'a) -> + 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up - after [f()] returns. *) + after [f()] returns + See {!setup} for more details. 
*) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 385dae36..d5196413 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -98,6 +98,7 @@ let () = let debug = ref false in let thread = ref true in + let n_bg_threads = ref 0 in let batch_traces = ref 400 in let batch_metrics = ref 3 in let opts = @@ -114,6 +115,7 @@ let () = "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; + "--bg-threads", Arg.Set_int n_bg_threads, " number of background threads"; ] |> Arg.align in @@ -130,7 +132,14 @@ let () = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - ~thread:!thread () + ~thread:!thread + ?bg_threads: + (let n = !n_bg_threads in + if n = 0 then + None + else + Some n) + () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; @@ -142,4 +151,4 @@ let () = Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" 
(Atomic.get num_tr) elapsed n_per_sec) in - Opentelemetry_client_ocurl.with_setup ~config () run + Opentelemetry_client_ocurl.with_setup ~stop ~config () run From 6179c97e9958b4862a148aa3151f7cfc1d9bbfc5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 13:41:03 -0400 Subject: [PATCH 04/20] add high watermark to ocurl client --- src/client/opentelemetry_client_ocurl.ml | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 6d7671c2..4591c2eb 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -170,12 +170,14 @@ end = struct mutable size: int; mutable q: 'a list; batch: int option; + high_watermark: int; timeout: Mtime.span option; mutable start: Mtime.t; } let make ?batch ?timeout () : _ t = Option.iter (fun b -> assert (b > 0)) batch; + let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in { lock = Mutex.create (); size = 0; @@ -183,6 +185,7 @@ end = struct q = []; batch; timeout; + high_watermark; } let is_empty_ self = self.size = 0 @@ -219,11 +222,19 @@ end = struct let push (self : _ t) x : bool = let@ () = with_mutex_ self.lock in if self.size = 0 && Option.is_some self.timeout then + (* current batch starts now *) self.start <- Mtime_clock.now (); - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready + if self.size >= self.high_watermark then ( + (* drop this to prevent queue from growing too fast *) + Atomic.incr n_dropped; + true + ) else ( + (* add to queue *) + self.size <- 1 + self.size; + self.q <- x :: self.q; + let ready = is_full_ self in + ready + ) let push' self x = ignore (push self x : bool) end From 6d72a6fac0b84ae972e4b590e3e150d48d659840 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 13:42:52 -0400 Subject: [PATCH 05/20] fix error on 4.08 --- 
src/client/opentelemetry_client_ocurl.mli | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 250767d9..4292586f 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -18,6 +18,7 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit (** Set a lock/unlock pair to protect the critical sections of {!Opentelemetry.Collector.BACKEND} *) +module Atomic = Opentelemetry_atomic.Atomic module Config = Config val setup : From 053493db8be66789cb7749e144601f8b1179b10f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 14:22:53 -0400 Subject: [PATCH 06/20] limit scope of locks --- src/client/opentelemetry_client_ocurl.ml | 109 +++++++++++++---------- 1 file changed, 64 insertions(+), 45 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 4591c2eb..9ce2fd75 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -143,6 +143,9 @@ end module type BATCH = sig end +(** Batch of resources to be pushed later. + + This type is thread-safe. *) module Batch : sig type 'a t @@ -164,6 +167,7 @@ module Batch : sig @param now passed to implement timeout *) val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t + (** Create a new batch *) end = struct type 'a t = { lock: Mutex.t; @@ -266,12 +270,6 @@ let start_bg_thread (f : unit -> unit) : unit = in ignore (Thread.create run () : Thread.t) -let l_is_empty = function - | [] -> true - | _ :: _ -> false - -let batch_is_empty = List.for_all l_is_empty - (* make an emitter. 
exceptions inside should be caught, see @@ -295,9 +293,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let batch_logs : Logs.resource_logs list Batch.t = Batch.make ?batch:config.batch_logs ?timeout () - let on_tick_cbs_ = ref (ref []) + let on_tick_cbs_ = Atomic.make (ref []) - let set_on_tick_callbacks = ( := ) on_tick_cbs_ + let set_on_tick_callbacks = Atomic.set on_tick_cbs_ let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = Pbrt.Encoder.reset encoder; @@ -373,13 +371,39 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) + + let tick_common_ () = + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + List.iter + (fun f -> + try f () + with e -> + Printf.eprintf "on tick callback raised: %s\n" + (Printexc.to_string e)) + !(Atomic.get on_tick_cbs_); + () + + let setup_ticker_thread ~tick ~finally () = + (* thread that calls [tick()] regularly, to help enforce timeouts *) + let tick_thread () = + let@ () = + Fun.protect ~finally:(fun () -> + Atomic.set stop true; + finally ()) + in + while not @@ Atomic.get stop do + Thread.delay 0.5; + tick () + done + in + start_bg_thread tick_thread end in - if config.bg_threads > 0 then ( - (let m = Mutex.create () in - Lock.set_mutex - ~lock:(fun () -> Mutex.lock m) - ~unlock:(fun () -> Mutex.unlock m)); + (let m = Mutex.create () in + Lock.set_mutex + ~lock:(fun () -> Mutex.lock m) + ~unlock:(fun () -> Mutex.unlock m)); + if config.bg_threads > 0 then ( let m = Mutex.create () in let cond = Condition.create () in @@ -420,14 +444,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = in let tick () = - if Atomic.get needs_gc_metrics then sample_gc_metrics (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - 
(Printexc.to_string e)) - !(!on_tick_cbs_); + tick_common_ (); let now = Mtime_clock.now () in if Atomic.get stop then @@ -440,18 +457,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = wakeup ~all:false () in - if config.ticker_thread then ( - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let tick_thread () = - while not @@ Atomic.get stop do - Thread.delay 0.5; - tick () - done; - wakeup ~all:true () - in - - start_bg_thread tick_thread - ); + if config.ticker_thread then + setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) (); let module M = struct let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () @@ -468,8 +475,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let cleanup () = Atomic.set stop true; if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - (* wakeup everyone *) - with_mutex_ m (fun () -> Condition.broadcast cond) + wakeup ~all:true () end in (module M) ) else ( @@ -477,10 +483,16 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let encoder = Pbrt.Encoder.create () in let module M = struct + (* we make sure that this is thread-safe, even though we don't have a + background thread. There can still be a ticker thread, and there + can also be several user threads that produce spans and call + the emit functions. 
*) + let push_trace e = let@ () = guard_exn_ in Batch.push' batch_traces e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_traces_maybe ~now httpc encoder : bool) let push_metrics e = @@ -488,24 +500,32 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = if Atomic.get needs_gc_metrics then sample_gc_metrics (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_metrics_maybe ~now httpc encoder : bool) let push_logs e = let@ () = guard_exn_ in Batch.push' batch_logs e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_logs_maybe ~now httpc encoder : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); + let@ () = Lock.with_lock in let now = Mtime_clock.now () in ignore (emit_traces_maybe ~now httpc encoder : bool); ignore (emit_metrics_maybe ~now httpc encoder : bool); ignore (emit_logs_maybe ~now httpc encoder : bool); () + (* make sure we have a ticker thread, if required *) + let () = + if config.ticker_thread then + setup_ticker_thread ~tick ~finally:ignore () + let cleanup () = if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; emit_all_force httpc encoder; @@ -529,11 +549,11 @@ end) { send = (fun l ~ret -> - let@ () = Lock.with_lock in - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) - l; + l); push_trace l; ret ()); } @@ -581,12 +601,11 @@ end) { send = (fun m ~ret -> - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send metrics %a@." 
(Format.pp_print_list Metrics.pp_resource_metrics) - m; - - let@ () = Lock.with_lock in + m); let m = List.rev_append (additional_metrics ()) m in push_metrics m; @@ -597,12 +616,12 @@ end) { send = (fun m ~ret -> - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) - m; + m); - let@ () = Lock.with_lock in push_logs m; ret ()); } From 84be273b76f17a74c665720df97e3ee0443d0376 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 14:27:09 -0400 Subject: [PATCH 07/20] small refactor --- src/client/opentelemetry_client_ocurl.ml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 9ce2fd75..0d8b9b95 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -225,14 +225,15 @@ end = struct let push (self : _ t) x : bool = let@ () = with_mutex_ self.lock in - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); if self.size >= self.high_watermark then ( (* drop this to prevent queue from growing too fast *) Atomic.incr n_dropped; true ) else ( + if self.size = 0 && Option.is_some self.timeout then + (* current batch starts now *) + self.start <- Mtime_clock.now (); + (* add to queue *) self.size <- 1 + self.size; self.q <- x :: self.q; From 0a8e60ba460646c0ac0f361518f7519a73e86449 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 14:56:27 -0400 Subject: [PATCH 08/20] better debug --- src/client/opentelemetry_client_ocurl.ml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 0d8b9b95..a1efb407 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -361,11 +361,11 
@@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = send_logs_http httpc encoder l; true - let[@inline] guard_exn_ f = + let[@inline] guard_exn_ where f = try f () with e -> - Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" - (Printexc.to_string e) + Printf.eprintf "opentelemetry-curl: uncaught exception in %s: %s\n%!" + where (Printexc.to_string e) let emit_all_force (httpc : Httpc.t) encoder = let now = Mtime_clock.now () in @@ -413,7 +413,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let httpc = Httpc.create () in let encoder = Pbrt.Encoder.create () in while not @@ Atomic.get stop do - let@ () = guard_exn_ in + let@ () = guard_exn_ "bg thread (main loop)" in let now = Mtime_clock.now () in let do_metrics = emit_metrics_maybe ~now httpc encoder in @@ -425,7 +425,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Condition.wait cond m done; (* flush remaining events once we exit *) - let@ () = guard_exn_ in + let@ () = guard_exn_ "bg thread (cleanup)" in emit_all_force httpc encoder; Httpc.cleanup httpc in @@ -490,14 +490,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = the emit functions. 
*) let push_trace e = - let@ () = guard_exn_ in + let@ () = guard_exn_ "push trace" in Batch.push' batch_traces e; let now = Mtime_clock.now () in let@ () = Lock.with_lock in ignore (emit_traces_maybe ~now httpc encoder : bool) let push_metrics e = - let@ () = guard_exn_ in + let@ () = guard_exn_ "push metrics" in if Atomic.get needs_gc_metrics then sample_gc_metrics (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in @@ -505,7 +505,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ignore (emit_metrics_maybe ~now httpc encoder : bool) let push_logs e = - let@ () = guard_exn_ in + let@ () = guard_exn_ "push logs" in Batch.push' batch_logs e; let now = Mtime_clock.now () in let@ () = Lock.with_lock in @@ -575,6 +575,8 @@ end) Mtime.Span.compare elapsed timeout_sent_metrics > 0 in + (* there is a possible race condition here, as several threads might update + metrics at the same time. But that's harmless. *) if add_own_metrics then ( let open OT.Metrics in Atomic.set last_sent_metrics now; From 88ce296d2195c20d80492ffaf5ad80d358798ce7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 15:27:32 -0400 Subject: [PATCH 09/20] small refactor --- src/client/opentelemetry_client_ocurl.ml | 26 ++++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index a1efb407..cbcc93f1 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -364,8 +364,10 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let[@inline] guard_exn_ where f = try f () with e -> - Printf.eprintf "opentelemetry-curl: uncaught exception in %s: %s\n%!" - where (Printexc.to_string e) + let bt = Printexc.get_backtrace () in + Printf.eprintf + "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" 
where + (Printexc.to_string e) bt let emit_all_force (httpc : Httpc.t) encoder = let now = Mtime_clock.now () in @@ -399,12 +401,15 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = in start_bg_thread tick_thread end in - (let m = Mutex.create () in + (* setup a global lock *) + (let global_lock_ = Mutex.create () in Lock.set_mutex - ~lock:(fun () -> Mutex.lock m) - ~unlock:(fun () -> Mutex.unlock m)); + ~lock:(fun () -> Mutex.lock global_lock_) + ~unlock:(fun () -> Mutex.unlock global_lock_)); if config.bg_threads > 0 then ( + (* lock+condition used for background threads to wait, and be woken up + when a batch is ready *) let m = Mutex.create () in let cond = Condition.create () in @@ -419,10 +424,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let do_metrics = emit_metrics_maybe ~now httpc encoder in let do_traces = emit_traces_maybe ~now httpc encoder in let do_logs = emit_logs_maybe ~now httpc encoder in - if (not do_metrics) && (not do_traces) && not do_logs then + if (not do_metrics) && (not do_traces) && not do_logs then ( + let@ () = guard_exn_ "bg thread (waiting)" in (* wait for something to happen *) - let@ () = with_mutex_ m in - Condition.wait cond m + Mutex.lock m; + Condition.wait cond m; + Mutex.unlock m + ) done; (* flush remaining events once we exit *) let@ () = guard_exn_ "bg thread (cleanup)" in @@ -578,8 +586,8 @@ end) (* there is a possible race condition here, as several threads might update metrics at the same time. But that's harmless. 
*) if add_own_metrics then ( - let open OT.Metrics in Atomic.set last_sent_metrics now; + let open OT.Metrics in [ make_resource_metrics [ From 9125eb5b65678b06e32f402b5cc5747afa787c8a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 15:48:46 -0400 Subject: [PATCH 10/20] better logging; improve GC sample collection --- src/client/common_.ml | 4 +++ src/client/opentelemetry_client_ocurl.ml | 35 ++++++++++++++---------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/client/common_.ml b/src/client/common_.ml index 4ad77233..3b7c8089 100644 --- a/src/client/common_.ml +++ b/src/client/common_.ml @@ -3,6 +3,10 @@ include Opentelemetry.Lock let[@inline] ( let@ ) f x = f x +let spf = Printf.sprintf + +let tid () = Thread.id @@ Thread.self () + let debug_ = ref (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index cbcc93f1..8280baef 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -12,16 +12,18 @@ let needs_gc_metrics = Atomic.make false let gc_metrics = AList.make () (* side channel for GC, appended to {!E_metrics}'s data *) -(* capture current GC metrics and push them into {!gc_metrics} for later +(* capture current GC metrics if {!needs_gc_metrics} is true, + and push them into {!gc_metrics} for later collection *) -let sample_gc_metrics () = - Atomic.set needs_gc_metrics false; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - AList.add gc_metrics l +let sample_gc_metrics_if_needed () = + if Atomic.compare_and_set needs_gc_metrics true false then ( + let l = + OT.Metrics.make_resource_metrics + ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) + @@ Opentelemetry.GC_metrics.get_metrics () + in + AList.add gc_metrics l + ) module Config = Config @@ -376,7 +378,7 @@ let 
mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) let tick_common_ () = - if Atomic.get needs_gc_metrics then sample_gc_metrics (); + sample_gc_metrics_if_needed (); List.iter (fun f -> try f () @@ -418,14 +420,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let httpc = Httpc.create () in let encoder = Pbrt.Encoder.create () in while not @@ Atomic.get stop do - let@ () = guard_exn_ "bg thread (main loop)" in + let@ () = guard_exn_ (spf "bg thread[%d] (main loop)" @@ tid ()) in let now = Mtime_clock.now () in let do_metrics = emit_metrics_maybe ~now httpc encoder in let do_traces = emit_traces_maybe ~now httpc encoder in let do_logs = emit_logs_maybe ~now httpc encoder in if (not do_metrics) && (not do_traces) && not do_logs then ( - let@ () = guard_exn_ "bg thread (waiting)" in + let@ () = guard_exn_ (spf "bg thread[%d] (waiting)" @@ tid ()) in (* wait for something to happen *) Mutex.lock m; Condition.wait cond m; @@ -506,7 +508,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let push_metrics e = let@ () = guard_exn_ "push metrics" in - if Atomic.get needs_gc_metrics then sample_gc_metrics (); + sample_gc_metrics_if_needed (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in let@ () = Lock.with_lock in @@ -522,7 +524,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let set_on_tick_callbacks = set_on_tick_callbacks let tick () = - if Atomic.get needs_gc_metrics then sample_gc_metrics (); + sample_gc_metrics_if_needed (); let@ () = Lock.with_lock in let now = Mtime_clock.now () in ignore (emit_traces_maybe ~now httpc encoder : bool); @@ -572,7 +574,10 @@ end) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) - let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true + let signal_emit_gc_metrics () = + if !debug_ then + Printf.eprintf "opentelemetry: emit GC metrics 
requested\n%!"; + Atomic.set needs_gc_metrics true let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) From 2d97dd07055d06391d902475a46229e53b8c22fa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 16:02:49 -0400 Subject: [PATCH 11/20] debug --- src/client/opentelemetry_client_ocurl.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 8280baef..6117c68a 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -378,6 +378,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) let tick_common_ () = + if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ()); sample_gc_metrics_if_needed (); List.iter (fun f -> From 77ebf0cced5976b39401146c21d3492d8acb5c29 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 16:21:12 -0400 Subject: [PATCH 12/20] docs --- src/opentelemetry.ml | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 71cf4669..7117fe63 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -600,7 +600,13 @@ module Trace = struct default_resource_spans ~resource:(Some resource) ~instrumentation_library_spans:[ ils ] () - (** Sync emitter *) + (** Sync emitter. + + This instructs the collector to forward + the spans to some backend at a later point. + + {b NOTE} be careful not too call this inside a Gc alarm, as it can + cause deadlocks. 
*) let emit ?service_name ?attrs (spans : span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in Collector.send_trace [ rs ] ~ret:(fun () -> ()) @@ -629,7 +635,10 @@ module Trace = struct if Collector.has_backend () then scope.attrs <- List.rev_append (attrs ()) scope.attrs - (** Sync span guard *) + (** Sync span guard. + + {b NOTE} be careful not too call this inside a Gc alarm, as it can + cause deadlocks. *) let with_ ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope ?links name (f : scope -> 'a) : 'a = @@ -771,7 +780,11 @@ module Metrics = struct (** Emit some metrics to the collector (sync). This blocks until the backend has pushed the metrics into some internal queue, or - discarded them. *) + discarded them. + + {b NOTE} be careful not too call this inside a Gc alarm, as it can + cause deadlocks. + *) let emit ?attrs (l : t list) : unit = let rm = make_resource_metrics ?attrs l in Collector.send_metrics [ rm ] ~ret:ignore @@ -851,6 +864,12 @@ module Logs = struct ?trace_id ?span_id bod) fmt + (** Emit logs. + + This instructs the collector to send the logs to some backend at + a later date. + {b NOTE} be careful not too call this inside a Gc alarm, as it can + cause deadlocks. *) let emit ?service_name ?attrs (l : t list) : unit = let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in @@ -880,8 +899,10 @@ module Metrics_callbacks = struct end (** [register f] adds the callback [f] to the list. - [f] will be called at unspecified times and is expected to return - a list of metrics. *) + + [f] will be called at unspecified times and is expected to return + a list of metrics. It might be called regularly by the backend, + in particular (but not only) when {!Collector.tick} is called. 
*) let register f : unit = if !cbs_ = [] then (* make sure we call [f] (and others) at each tick *) From 12b5f6c76536e622557434603f8fbece259f8dbb Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 16:29:02 -0400 Subject: [PATCH 13/20] heinous typo --- src/opentelemetry.ml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 7117fe63..a23244fa 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -605,7 +605,7 @@ module Trace = struct This instructs the collector to forward the spans to some backend at a later point. - {b NOTE} be careful not too call this inside a Gc alarm, as it can + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. *) let emit ?service_name ?attrs (spans : span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in @@ -637,7 +637,7 @@ module Trace = struct (** Sync span guard. - {b NOTE} be careful not too call this inside a Gc alarm, as it can + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. *) let with_ ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope @@ -782,7 +782,7 @@ module Metrics = struct the backend has pushed the metrics into some internal queue, or discarded them. - {b NOTE} be careful not too call this inside a Gc alarm, as it can + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. *) let emit ?attrs (l : t list) : unit = @@ -868,7 +868,7 @@ module Logs = struct This instructs the collector to send the logs to some backend at a later date. - {b NOTE} be careful not too call this inside a Gc alarm, as it can + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. 
*) let emit ?service_name ?attrs (l : t list) : unit = let attributes = Globals.mk_attributes ?service_name ?attrs () in From 50743d3097984e504d46b4bd30b1bf8034290534 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:11:26 -0400 Subject: [PATCH 14/20] do not decode result of http post --- src/client/opentelemetry_client_ocurl.ml | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 6117c68a..61cc8b76 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -68,7 +68,7 @@ module Httpc : sig val send : t -> path:string -> - decode:(Pbrt.Decoder.t -> 'a) -> + decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> string -> ('a, error) result @@ -122,11 +122,17 @@ end = struct let code = Curl.get_responsecode curl in if !debug_ then Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); - let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in if code >= 200 && code < 300 then ( - let res = decode dec in - Ok res + match decode with + | `Ret x -> Ok x + | `Dec f -> + let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in + (try Ok (f dec) + with e -> + Error + (`Failure ("decoding failed with:\n" ^ Printexc.to_string e))) ) else ( + let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in let status = Status.decode_status dec in Error (`Status (code, status)) ) @@ -304,7 +310,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in - match Httpc.send httpc ~path ~decode:(fun _ -> ()) data with + match Httpc.send httpc ~path ~decode:(`Ret ()) data with | Ok () -> () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; From 4c9876443250d5f576a44e550d930f84849d6580 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:14:40 
-0400 Subject: [PATCH 15/20] reduce impact of error --- src/client/opentelemetry_client_ocurl.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 61cc8b76..3979a317 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -318,7 +318,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = | Error err -> (* TODO: log error _via_ otel? *) Atomic.incr n_errors; - report_err_ err + report_err_ err; + (* avoid crazy error loop *) + Thread.delay 3. let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) = From 7eb75d66d66b2e8b256d4313d1cfe8f8e154d4e2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:23:32 -0400 Subject: [PATCH 16/20] better error reporting --- src/client/opentelemetry_client_ocurl.ml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 3979a317..3117db24 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -129,8 +129,11 @@ end = struct let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in (try Ok (f dec) with e -> + let bt = Printexc.get_backtrace () in Error - (`Failure ("decoding failed with:\n" ^ Printexc.to_string e))) + (`Failure + (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) + bt))) ) else ( let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in let status = Status.decode_status dec in @@ -146,7 +149,10 @@ end = struct Error (`Status (code, status)) with | Sys.Break -> Error `Sysbreak - | e -> Error (`Failure (Printexc.to_string e)) + | e -> + let bt = Printexc.get_backtrace () in + Error + (`Failure (spf "httpc: failed with:\n%s\n%s" (Printexc.to_string e) bt)) end module type BATCH = sig end From 8495d469ee50ddca2af7b559eec189ac22102d84 Mon Sep 
17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:24:24 -0400 Subject: [PATCH 17/20] improve error reporting --- src/client/opentelemetry_client_ocurl.ml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 3117db24..358c7386 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -136,8 +136,15 @@ end = struct bt))) ) else ( let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in - let status = Status.decode_status dec in - Error (`Status (code, status)) + try + let status = Status.decode_status dec in + Error (`Status (code, status)) + with e -> + let bt = Printexc.get_backtrace () in + Error + (`Failure + (spf "decoding of status failed with:\n%s\n%s" + (Printexc.to_string e) bt)) ) | exception Sys.Break -> Error `Sysbreak | exception Curl.CurlException (_, code, msg) -> From d3b366f04bf4b2066d184d6880523b1e8f692fb0 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:38:36 -0400 Subject: [PATCH 18/20] print received status when httpclient fails --- src/client/opentelemetry_client_ocurl.ml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 358c7386..eb1ddaf8 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -135,7 +135,9 @@ end = struct (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))) ) else ( - let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in + let str = Buffer.contents buf_res in + let dec = Pbrt.Decoder.of_string str in + try let status = Status.decode_status dec in Error (`Status (code, status)) @@ -143,8 +145,12 @@ end = struct let bt = Printexc.get_backtrace () in Error (`Failure - (spf "decoding of status failed with:\n%s\n%s" - (Printexc.to_string e) bt)) + (spf 
+ "decoding of status (code=%d) failed with:\n\ + %s\n\ + status: %S\n\ + %s" + code (Printexc.to_string e) str bt)) ) | exception Sys.Break -> Error `Sysbreak | exception Curl.CurlException (_, code, msg) -> From 16667a3fcf9683cea6ede1cf809a348926f9c2fb Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:49:12 -0400 Subject: [PATCH 19/20] improved logging for ocurl exporter --- src/client/opentelemetry_client_ocurl.ml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index eb1ddaf8..1ee1a918 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -93,7 +93,8 @@ end = struct let { curl; buf_res } = self in Curl.reset curl; if !debug_ then Curl.set_verbose curl true; - Curl.set_url curl (!url ^ path); + let full_url = !url ^ path in + Curl.set_url curl full_url; Curl.set_httppost curl []; let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in let http_headers = List.map to_http_header !headers in @@ -146,11 +147,11 @@ end = struct Error (`Failure (spf - "decoding of status (code=%d) failed with:\n\ + "httpc: decoding of status (url=%S, code=%d) failed with:\n\ %s\n\ status: %S\n\ %s" - code (Printexc.to_string e) str bt)) + full_url code (Printexc.to_string e) str bt)) ) | exception Sys.Break -> Error `Sysbreak | exception Curl.CurlException (_, code, msg) -> @@ -165,11 +166,11 @@ end = struct | e -> let bt = Printexc.get_backtrace () in Error - (`Failure (spf "httpc: failed with:\n%s\n%s" (Printexc.to_string e) bt)) + (`Failure + (spf "httpc: post on url=%S failed with:\n%s\n%s" full_url + (Printexc.to_string e) bt)) end -module type BATCH = sig end - (** Batch of resources to be pushed later. This type is thread-safe. 
*) From 41e9962c088327061bc10866710bbb25b7f211fe Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 19:55:06 -0400 Subject: [PATCH 20/20] fix: do not emit empty batches --- src/client/opentelemetry_client_ocurl.ml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 1ee1a918..d0f5f62e 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -240,13 +240,12 @@ end = struct let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let@ () = with_mutex_ self.lock in - if - (force && not (is_empty_ self)) - || is_full_ self || timeout_expired_ ~now self + if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) then ( let l = self.q in self.q <- []; self.size <- 0; + assert (l <> []); Some l ) else None