From 55977b13d810f93591495fa543660eb56b5b7047 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 17 Oct 2024 15:06:45 -0400 Subject: [PATCH] fix: wait for cleanup in cohttp client in `Opentelemetry_client_cohttp_lwt.with_setup` we should now wait for the cleanup to be done, by sneaking in a `unit Lwt.u` that is only resolved after the cleanup is done. close #41 --- .../opentelemetry_client_cohttp_lwt.ml | 50 ++++++++++++++----- .../opentelemetry_client_cohttp_lwt.mli | 7 ++- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 5894f8b7..1cf7eeb9 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -258,7 +258,8 @@ end exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = +let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t) + () : (module EMITTER) = let open Proto in let open Lwt.Syntax in (* local helpers *) @@ -448,6 +449,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt.async (fun () -> let* () = emit_all_force httpc encoder in Httpc.cleanup httpc; + (* resolve [after_cleanup], if provided *) + Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup; Lwt.return ()) end in (module M) @@ -457,9 +460,13 @@ module Backend val stop : bool Atomic.t val config : Config.t + + val after_cleanup : unit Lwt.u option end) () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) + include + (val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop + ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -551,7 +558,8 @@ module Backend } end -let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = +let create_backend ?after_cleanup ?(stop = Atomic.make false) + ?(config = Config.make ()) () = debug_ := config.debug; let module B = @@ -560,25 +568,43 @@ let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = let stop = stop let config = config + + let after_cleanup = after_cleanup end) () in (module B : OT.Collector.BACKEND) -let setup_ ?stop ?config () = - let backend = create_backend ?stop ?config () in +let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t = + let cleanup_done, cleanup_done_prom = Lwt.wait () in + let backend = + create_backend ~after_cleanup:cleanup_done_prom ?stop ?config () + in OT.Collector.set_backend backend; - OT.Collector.remove_backend + + OT.Collector.remove_backend, cleanup_done let setup ?stop ?config ?(enable = true) () = if enable then ( - let cleanup = setup_ ?stop ?config () in + let cleanup, _lwt = setup_ ?stop ?config () in at_exit cleanup ) -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f = - if enable then ( - let cleanup = setup_ ?stop ~config () in - Fun.protect ~finally:cleanup f - ) else +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t + = + if enable then + let open Lwt.Syntax in + let cleanup, cleanup_done = setup_ ?stop ~config () in + + Lwt.catch + (fun () -> + let* res = f () in + cleanup (); + let+ () = cleanup_done in + res) + (fun exn -> + cleanup (); + let* () = cleanup_done in + Lwt.reraise exn) + else f () diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index 09f64e7a..c57332ab 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -13,10 +13,13 @@ val set_headers : (string * string) list -> unit module Config = Config val create_backend : + ?after_cleanup:unit Lwt.u -> ?stop:bool Atomic.t -> ?config:Config.t -> unit -> (module Opentelemetry.Collector.BACKEND) +(** Create a new backend using lwt and cohttp + @param after_cleanup if provided, this is resolved into [()] after cleanup is done (since NEXT_RELEASE) *) val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit @@ -34,8 +37,8 @@ val with_setup : ?config:Config.t -> ?enable:bool -> unit -> - (unit -> 'a) -> - 'a + (unit -> 'a Lwt.t) -> + 'a Lwt.t (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *)