diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 4db6796a..104c673c 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -90,8 +90,29 @@ let read fd buffer = open Httpaf module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + module Upgrade = struct + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit Deferred.t) + + let to_handler = function + | Ignore -> (fun socket _request _response -> Fd.close (Socket.fd socket)) + | Raise -> + (fun socket _request _response -> + don't_wait_for (Fd.close (Socket.fd socket)); + failwith "Upgrades not supported by server") + | Handle handler -> handler + end + + let create_connection_handler + ?(config=Config.default) + ~upgrade_handler + ~request_handler + ~error_handler + = fun client_addr socket -> + let upgrade_handler = Upgrade.to_handler upgrade_handler in let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in let request_handler = request_handler client_addr in @@ -101,6 +122,7 @@ module Server = struct let buffer = Buffer.create config.read_buffer_size in let rec reader_thread () = match Server_connection.next_read_operation conn with + | `Upgrade -> () | `Read -> (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) read fd buffer @@ -136,6 +158,8 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; + | `Upgrade(request, response) -> + upgrade_handler socket request response >>> Ivar.fill write_complete | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..a4767e82 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -1,11 +1,17 @@ -open! Core open Async - open Httpaf module Server : sig + module Upgrade : sig + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit Deferred.t) + end + val create_connection_handler : ?config : Config.t + -> upgrade_handler : 'a Upgrade.t -> request_handler : ('a -> Server_connection.request_handler) -> error_handler : ('a -> Server_connection.error_handler) -> ([< Socket.Address.t] as 'a) diff --git a/examples/async/async_echo_post.ml b/examples/async/async_echo_post.ml index 0f524a2a..2787c469 100644 --- a/examples/async/async_echo_post.ml +++ b/examples/async/async_echo_post.ml @@ -10,7 +10,7 @@ let main port max_accepts_per_batch () = let where_to_listen = Tcp.Where_to_listen.of_port port in Tcp.(Server.create_sock ~on_handler_error:`Raise ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler) >>= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; diff --git a/examples/lwt/lwt_echo_post.ml b/examples/lwt/lwt_echo_post.ml index 18307107..603011f7 100644 --- a/examples/lwt/lwt_echo_post.ml +++ b/examples/lwt/lwt_echo_post.ml @@ -12,7 +12,7 @@ let main port = Lwt.async (fun () -> Lwt_io.establish_server_with_client_socket listen_address - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler) >|= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; diff --git a/lib/httpaf.mli b/lib/httpaf.mli index b2c341f2..69e86ec7 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -637,6 +637,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> [`write] Body.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -678,7 +680,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -705,6 +707,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade of Request.t * Response.t | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) diff --git a/lib/reqd.ml b/lib/reqd.ml index c74b4f95..f240fde4 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -34,10 +34,29 @@ type error = [ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ] -type response_state = - | Waiting of Optional_thunk.t ref - | Complete of Response.t - | Streaming of Response.t * [`write] Body.t +module Response_state = struct + type t = + | Waiting of Optional_thunk.t ref + | Upgrade of Response.t + | Complete of Response.t + | Streaming of Response.t * [`write] Body.t +end + +module Input_state = struct + type t = + | Provide + | Wait + | Complete + | Upgrade +end + +module Output_state = struct + type t = + | Ready + | Wait + | Complete + | Upgrade +end type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit @@ -74,7 +93,7 @@ type t = ; response_body_buffer : Bigstringaf.t ; error_handler : error_handler ; mutable persistent : bool - ; mutable response_state : response_state + ; mutable response_state : Response_state.t ; mutable error_code : [`Ok | error ] } @@ -101,13 +120,15 @@ let response { response_state; _ } = match response_state with | Waiting _ -> None | Streaming(response, _) - | Complete (response) -> Some response + | Upgrade response + | Complete response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming(response, _) - | Complete (response) -> response + | Upgrade response + | Complete response -> response let respond_with_string t response str = if t.error_code <> `Ok then @@ -115,7 +136,7 @@ let respond_with_string t response str = match t.response_state with | Waiting when_done_waiting -> (* XXX(seliopou): check response body length *) - Writer.write_response t.writer response; + Writer.write_response t.writer response; Writer.write_string t.writer str; if t.persistent then t.persistent <- Response.persistent_connection response; @@ -123,6 +144,7 @@ let respond_with_string t response str = done_waiting when_done_waiting | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -140,6 +162,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = done_waiting when_done_waiting | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -156,6 +179,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -164,6 +188,19 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting when_done_waiting -> + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + done_waiting when_done_waiting + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ + | Complete _ -> + failwith "httpaf.Reqd.respond_with_streaming: response already complete" + let report_error t error = t.persistent <- false; Body.close_reader t.request_body; @@ -187,7 +224,7 @@ let report_error t error = | Streaming(_response, response_body), `Exn _ -> Body.close_writer response_body; Writer.close_and_drain t.writer - | (Complete _ | Streaming _ | Waiting _) , _ -> + | (Complete _ | Upgrade _ | Streaming _ | Waiting _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -216,25 +253,36 @@ let on_more_output_available t f = when_done_waiting := Optional_thunk.some f | Streaming(_, response_body) -> Body.when_ready_to_write response_body f + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.on_more_output_available: response already complete" let persistent_connection t = t.persistent -let requires_input { request_body; _ } = - not (Body.is_closed request_body) +let input_state t : Input_state.t = + match t.response_state with + | Upgrade _ -> Upgrade + | Waiting _ + | Complete _ + | Streaming _ -> + if Body.is_closed t.request_body + then Complete + else Provide +;; -let requires_output { response_state; _ } = - match response_state with - | Complete _ -> false - | Streaming (_, response_body) -> - not (Body.is_closed response_body) - || Body.has_pending_output response_body - | Waiting _ -> true - -let is_complete t = - not (requires_input t || requires_output t) +let output_state t : Output_state.t = + match t.response_state with + | Complete _ -> Complete + | Upgrade _ -> Upgrade + | Waiting _ -> Wait + | Streaming(_, response_body) -> + if Body.has_pending_output response_body + then Ready + else if Body.is_closed response_body + then Complete + else Wait +;; let flush_request_body t = let request_body = request_body t in diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 7fb9d935..0fbf6dfe 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -197,48 +197,48 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin + match Reqd.input_state reqd with + | Provide -> Reader.next t.reader + | Wait -> `Yield + | Complete -> + (match Reqd.output_state reqd with + | Complete -> ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - wakeup_writer t; - if Reqd.is_complete reqd - then shutdown t - else if not (Reqd.requires_input reqd) - then shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.requires_input reqd then Reader.next t.reader - else if Reqd.persistent_connection reqd then `Yield - else begin - shutdown_reader t; - Reader.next t.reader - end - end else - Reader.next t.reader + if Reqd.persistent_connection reqd then ( + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); + wakeup_reader t; + _next_read_operation t; + ) else ( + shutdown t; + Reader.next t.reader + ) + | Wait + | Ready -> + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader + ) else `Yield + | Upgrade -> assert false) + | Upgrade -> + assert (Reqd.output_state reqd = Upgrade); + shutdown t; + `Upgrade + ) +;; let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -269,9 +269,39 @@ let flush_response_body t = ;; let next_write_operation t = - advance_request_queue_if_necessary t; - flush_response_body t; - Writer.next t.writer + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( + let reqd = current_reqd_exn t in + match Reqd.output_state reqd with + | Wait -> `Yield + | Ready -> + assert (Reqd.input_state reqd <> Upgrade); + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> + (match Reqd.input_state reqd with + | Complete -> + ignore (Queue.take t.request_queue); + if Reqd.persistent_connection reqd then ( + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); + wakeup_reader t + ) else ( + shutdown t + ); + Writer.next t.writer + | Provide + | Wait -> `Yield + | Upgrade -> assert false) + | Upgrade -> + assert (Reqd.input_state reqd = Upgrade); + shutdown t; + `Upgrade(Reqd.request reqd, Reqd.response_exn reqd) + ) +;; let report_write_result t result = Writer.report_result t.writer result @@ -279,11 +309,17 @@ let report_write_result t result = let yield_writer t k = if is_active t then begin let reqd = current_reqd_exn t in - if Reqd.requires_output reqd - then Reqd.on_more_output_available reqd k - else if Reqd.persistent_connection reqd - then on_wakeup_writer t k - else begin shutdown t; k () end + match Reqd.output_state reqd with + | Complete -> + if Reqd.persistent_connection reqd + then on_wakeup_writer t k + else begin + shutdown t; + k () + end + | Wait -> Reqd.on_more_output_available reqd k + | Ready + | Upgrade -> k () end else if Writer.is_closed t.writer then k () else begin on_wakeup_writer t k end diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index 333565b9..e9a467f6 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -272,46 +272,52 @@ let response_to_string ?body r = Faraday.serialize_to_string f module Read_operation = struct - type t = [ `Read | `Yield | `Close ] + type t = [ `Read | `Yield | `Close | `Upgrade ] + + let pp_hum fmt (t:t) = + let str = + match t with + | `Read -> "Read" + | `Yield -> "Yield" + | `Close -> "Close" + | `Upgrade -> "Upgrade" + in + Format.pp_print_string fmt str + ;; + end - let pp_hum fmt t = - let str = + module Write_operation = struct + type t = + [ `Write of Bigstringaf.t IOVec.t list + | `Upgrade of Request.t * Response.t + | `Yield + | `Close of int ] + + let iovecs_to_string iovecs = + let len = IOVec.lengthv iovecs in + let bytes = Bytes.create len in + let dst_off = ref 0 in + List.iter (fun { IOVec.buffer; off = src_off; len } -> + Bigstringaf.unsafe_blit_to_bytes buffer ~src_off bytes ~dst_off:!dst_off ~len; + dst_off := !dst_off + len) + iovecs; + Bytes.unsafe_to_string bytes + ;; + + let pp_hum fmt t = match t with - | `Read -> "Read" - | `Yield -> "Yield" - | `Close -> "Close" - in - Format.pp_print_string fmt str - ;; -end + | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) + | `Yield -> Format.pp_print_string fmt "Yield" + | `Close len -> Format.fprintf fmt "Close %i" len + | `Upgrade _ -> Format.pp_print_string fmt "Upgrade" + ;; -module Write_operation = struct - type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int ] - - let iovecs_to_string iovecs = - let len = IOVec.lengthv iovecs in - let bytes = Bytes.create len in - let dst_off = ref 0 in - List.iter (fun { IOVec.buffer; off = src_off; len } -> - Bigstringaf.unsafe_blit_to_bytes buffer ~src_off bytes ~dst_off:!dst_off ~len; - dst_off := !dst_off + len) - iovecs; - Bytes.unsafe_to_string bytes - ;; - - let pp_hum fmt t = - match t with - | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) - | `Yield -> Format.pp_print_string fmt "Yield" - | `Close len -> Format.fprintf fmt "Close %i" len - ;; - - let to_write_as_string t = - match t with - | `Write iovecs -> Some (iovecs_to_string iovecs) - | `Close _ | `Yield -> None - ;; -end + let to_write_as_string t = + match t with + | `Write iovecs -> Some (iovecs_to_string iovecs) + | `Close _ | `Yield | `Upgrade _ -> None + ;; + end let write_operation = Alcotest.of_pp Write_operation.pp_hum let read_operation = Alcotest.of_pp Read_operation.pp_hum @@ -339,6 +345,11 @@ module Server_connection = struct `Read (next_read_operation t); ;; + let read_upgrade t = + Alcotest.check read_operation "Reader is requesting an upgrade" + `Upgrade (next_read_operation t); + ;; + let reader_yielded t = Alcotest.check read_operation "Reader is in a yield state" `Yield (next_read_operation t); @@ -361,6 +372,12 @@ module Server_connection = struct report_write_result t `Closed; ;; + let write_upgrade ?(msg="upgrade written") t request response = + Alcotest.check write_operation msg + (`Upgrade(request, response)) + (next_write_operation t); + ;; + let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" `Yield (next_write_operation t); @@ -423,6 +440,13 @@ module Server_connection = struct Body.close_writer resp_body ;; + let read_eof_empty t = + let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in + Alcotest.(check int) "read_eof with no input returns 0" 0 c; + Alcotest.check read_operation "Shutting down a reader closes it" + `Close (next_read_operation t); + ;; + let test_initial_reader_state () = let t = create default_request_handler in Alcotest.check read_operation "A new reader wants input" @@ -431,15 +455,13 @@ module Server_connection = struct let test_reader_is_closed_after_eof () = let t = create default_request_handler in - let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in - Alcotest.(check int) "read_eof with no input returns 0" 0 c; + read_eof_empty t; connection_is_shutdown t; let t = create default_request_handler in let c = read t Bigstringaf.empty ~off:0 ~len:0 in Alcotest.(check int) "read with no input returns 0" 0 c; - let c = read_eof t Bigstringaf.empty ~off:0 ~len:0; in - Alcotest.(check int) "read_eof with no input returns 0" 0 c; + read_eof_empty t; connection_is_shutdown t; ;; @@ -784,6 +806,18 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; writer_closed t; ;; + let test_upgrade () = + let request_handler reqd = + Reqd.respond_with_upgrade reqd Headers.empty + in + let t = create ~error_handler request_handler in + let request = Request.create `GET "/" ~headers:(Headers.of_list [ "Connection", "upgrade" ]) in + let response = Response.create `Switching_protocols in + read_request t request; + write_upgrade t request response; + read_upgrade t; + ;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -802,6 +836,7 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; ; "blocked write on chunked encoding", `Quick, test_blocked_write_on_chunked_encoding ; "writer unexpected eof", `Quick, test_unexpected_eof ; "input shrunk", `Quick, test_input_shrunk + ; "upgrade", `Quick, test_upgrade ] end @@ -832,7 +867,7 @@ module Client_connection = struct let reader_ready t = Alcotest.check read_operation "Reader is ready" - `Read (next_read_operation t :> [`Close | `Read | `Yield]); + `Read (next_read_operation t :> Read_operation.t); ;; let write_string ?(msg="output written") t str = @@ -850,17 +885,17 @@ module Client_connection = struct let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + `Yield (next_write_operation t :> Write_operation.t); ;; let writer_closed t = Alcotest.check write_operation "Writer is closed" - (`Close 0) (next_write_operation t); + (`Close 0) (next_write_operation t :> Write_operation.t); ;; let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]); + `Close (next_read_operation t :> Read_operation.t); writer_closed t; ;; diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index 545fbc6a..30ff18c4 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -34,6 +34,7 @@ open Lwt.Infix +(* Based on the Buffer module in httpaf_async.ml. *) module Buffer : sig type t @@ -103,10 +104,29 @@ let shutdown socket command = try Lwt_unix.shutdown socket command with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () -module Config = Httpaf.Config - module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + module Upgrade = struct + type t = + | Ignore + | Raise + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit Lwt.t) + + let to_handler = function + | Ignore -> (fun socket _request _response -> Lwt_unix.close socket) + | Raise -> + (fun socket _request _response -> + Lwt.async (fun () -> Lwt_unix.close socket); + failwith "Upgrades not supported by server") + | Handle handler -> handler + end + + let create_connection_handler + ?(config=Httpaf.Config.default) + ~upgrade_handler + ~request_handler + ~error_handler + = + let upgrade_handler = Upgrade.to_handler upgrade_handler in fun client_addr socket -> let module Server_connection = Httpaf.Server_connection in let connection = @@ -139,7 +159,7 @@ module Server = struct | `Yield -> Server_connection.yield_reader connection read_loop; Lwt.return_unit - + | `Upgrade -> Lwt.return_unit | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -167,11 +187,11 @@ module Server = struct writev io_vectors >>= fun result -> Server_connection.report_write_result connection result; write_loop_step () - + | `Upgrade(request, response) -> + upgrade_handler socket request response | `Yield -> Server_connection.yield_writer connection write_loop; Lwt.return_unit - | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -201,10 +221,8 @@ module Server = struct Lwt.return_unit end - - module Client = struct - let request ?(config=Config.default) socket request ~error_handler ~response_handler = + let request ?(config=Httpaf.Config.default) socket request ~error_handler ~response_handler = let module Client_connection = Httpaf.Client_connection in let request_body, connection = Client_connection.request ~config request ~error_handler ~response_handler in diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index 6625002c..24d5f7b7 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -39,8 +39,16 @@ open Httpaf to [Lwt_io.establish_server_with_client_socket]. For an example, see [examples/lwt_echo_server.ml]. *) module Server : sig + module Upgrade : sig + type t = + | Ignore + | Raise + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit Lwt.t) + end + val create_connection_handler : ?config : Config.t + -> upgrade_handler : Upgrade.t -> request_handler : (Unix.sockaddr -> Server_connection.request_handler) -> error_handler : (Unix.sockaddr -> Server_connection.error_handler) -> Unix.sockaddr