Skip to content

Commit 2ee5645

Browse files
committed
Deflatemod: Use Ocsigen_response instead of Ocsigen_stream
Rewrite 'Deflatemod' to operate on 'Ocsigen_response.Body' directly instead of on 'Ocsigen_stream'. 'Ocsigen_stream' will no longer be compatible with Cohttp-eio's response type.
1 parent b84a161 commit 2ee5645

File tree

1 file changed

+53
-107
lines changed

1 file changed

+53
-107
lines changed

src/extensions/deflatemod.ml

Lines changed: 53 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -59,131 +59,77 @@ let gzip_header =
5959
type output_buffer =
6060
{ stream : Zlib.stream
6161
; buf : bytes
62-
; mutable pos : int
63-
; mutable avail : int
62+
; flush : string -> unit Lwt.t
6463
; mutable size : int32
65-
; mutable crc : int32
66-
; mutable add_trailer : bool }
64+
; mutable crc : int32 }
6765

68-
let write_int32 oz n =
66+
let write_int32 buf offset n =
6967
for i = 0 to 3 do
70-
Bytes.set oz.buf (oz.pos + i)
68+
Bytes.set buf (offset + i)
7169
(Char.chr (Int32.to_int (Int32.shift_right_logical n (8 * i)) land 0xff))
72-
done;
73-
oz.pos <- oz.pos + 4;
74-
oz.avail <- oz.avail - 4;
75-
assert (oz.avail >= 0)
70+
done
7671

77-
(* puts in oz the content of buf, from pos to pos + len ;
78-
* f is the continuation of the current stream *)
79-
let rec output oz f buf pos len =
80-
assert (pos >= 0 && len >= 0 && pos + len <= String.length buf);
81-
if oz.avail = 0
82-
then (
83-
let cont () = output oz f buf pos len in
84-
Logs.info ~src:section (fun fmt ->
85-
fmt "Flushing because output buffer is full");
86-
flush oz cont)
87-
else if len = 0
88-
then next_cont oz f
72+
let compress_flush oz used_out = oz.flush (Bytes.sub_string oz.buf 0 used_out)
73+
74+
(* gzip trailer *)
75+
let write_trailer oz =
76+
write_int32 oz.buf 0 oz.crc;
77+
write_int32 oz.buf 4 oz.size;
78+
compress_flush oz 8
79+
80+
(* puts in oz the content of buf, from pos to pos + len ; *)
81+
let rec compress_output oz inbuf pos len =
82+
if len = 0
83+
then Lwt.return_unit
8984
else
9085
let (_ : bool), used_in, used_out =
9186
try
92-
Zlib.deflate oz.stream
93-
(Bytes.unsafe_of_string buf)
94-
pos len oz.buf oz.pos oz.avail Zlib.Z_NO_FLUSH
87+
Zlib.deflate_string oz.stream inbuf pos len oz.buf 0
88+
(Bytes.length oz.buf) Zlib.Z_NO_FLUSH
9589
with Zlib.Error (s, s') ->
9690
raise
9791
(Ocsigen_stream.Stream_error
9892
("Error during compression: " ^ s ^ " " ^ s'))
9993
in
100-
oz.pos <- oz.pos + used_out;
101-
oz.avail <- oz.avail - used_out;
102-
oz.size <- Int32.add oz.size (Int32.of_int used_in);
103-
oz.crc <- Zlib.update_crc_string oz.crc buf pos used_in;
104-
output oz f buf (pos + used_in) (len - used_in)
94+
compress_flush oz used_out >>= fun () ->
95+
compress_output oz inbuf (pos + used_in) (len - used_in)
10596

106-
(* Flush oz, ie. produces a new_stream with the content of oz, cleans it
107-
* and returns the continuation of the stream *)
108-
and flush oz cont =
109-
let len = oz.pos in
110-
if len = 0
111-
then cont ()
112-
else
113-
let buf_len = Bytes.length oz.buf in
114-
let s =
115-
if len = buf_len
116-
then Bytes.to_string oz.buf
117-
else Bytes.sub_string oz.buf 0 len
118-
in
119-
Logs.info ~src:section (fun fmt -> fmt "Flushing!");
120-
oz.pos <- 0;
121-
oz.avail <- buf_len;
122-
Ocsigen_stream.cont s cont
123-
124-
and next_cont oz stream =
125-
Ocsigen_stream.next (stream : string Ocsigen_stream.stream) >>= fun e ->
126-
match e with
127-
| Ocsigen_stream.Finished None ->
128-
Logs.info ~src:section (fun fmt ->
129-
fmt "End of stream: big cleaning for zlib");
130-
(* loop until there is nothing left to compress and flush *)
131-
let rec finish () =
132-
(* buffer full *)
133-
if oz.avail = 0
134-
then flush oz finish
135-
else
136-
(* no more input, deflates only what were left because output buffer
137-
* was full *)
138-
let finished, (_ : int), used_out =
139-
Zlib.deflate oz.stream oz.buf 0 0 oz.buf oz.pos oz.avail
140-
Zlib.Z_FINISH
141-
in
142-
oz.pos <- oz.pos + used_out;
143-
oz.avail <- oz.avail - used_out;
144-
if not finished then finish () else write_trailer ()
145-
and write_trailer () =
146-
if oz.add_trailer && oz.avail < 8
147-
then flush oz write_trailer
148-
else (
149-
if oz.add_trailer then (write_int32 oz oz.crc; write_int32 oz oz.size);
150-
Logs.info ~src:section (fun fmt ->
151-
fmt "Zlib.deflate finished, last flush");
152-
flush oz (fun () -> Ocsigen_stream.empty None))
153-
in
154-
finish ()
155-
| Ocsigen_stream.Finished (Some s) -> next_cont oz s
156-
| Ocsigen_stream.Cont (s, f) -> output oz f s 0 (String.length s)
97+
let rec compress_finish oz =
98+
(* loop until there is nothing left to compress and flush *)
99+
let finished, (_ : int), used_out =
100+
Zlib.deflate oz.stream oz.buf 0 0 oz.buf 0 (Bytes.length oz.buf)
101+
Zlib.Z_FINISH
102+
in
103+
compress_flush oz used_out >>= fun () ->
104+
if not finished then compress_finish oz else Lwt.return_unit
157105

158106
(* deflate param : true = deflate ; false = gzip (no header in this case) *)
159-
let compress deflate stream : string Ocsigen_stream.t =
107+
let compress_body deflate body =
108+
fun flush ->
160109
let zstream = Zlib.deflate_init !compress_level deflate in
161-
let finalize status =
162-
Ocsigen_stream.finalize stream status >>= fun _e ->
163-
(try Zlib.deflate_end zstream
164-
with
165-
(* ignore errors, deflate_end cleans everything anyway *)
166-
| Zlib.Error _ ->
167-
());
168-
Lwt.return (Logs.info ~src:section (fun fmt -> fmt "Zlib stream closed"))
169-
in
170110
let oz =
171111
let buffer_size = !buffer_size in
172112
{ stream = zstream
173113
; buf = Bytes.create buffer_size
174-
; pos = 0
175-
; avail = buffer_size
114+
; flush
176115
; size = 0l
177-
; crc = 0l
178-
; add_trailer = not deflate }
116+
; crc = 0l }
179117
in
180-
let new_stream () = next_cont oz (Ocsigen_stream.get stream) in
181-
Logs.info ~src:section (fun fmt -> fmt "Zlib stream initialized");
182-
if deflate
183-
then Ocsigen_stream.make ~finalize new_stream
184-
else
185-
Ocsigen_stream.make ~finalize (fun () ->
186-
Ocsigen_stream.cont gzip_header new_stream)
118+
(if deflate then Lwt.return_unit else flush gzip_header) >>= fun () ->
119+
body (fun inbuf ->
120+
let len = String.length inbuf in
121+
oz.size <- Int32.add oz.size (Int32.of_int len);
122+
oz.crc <- Zlib.update_crc_string oz.crc inbuf 0 len;
123+
compress_output oz inbuf 0 len)
124+
>>= fun () ->
125+
compress_finish oz >>= fun () ->
126+
(if deflate then Lwt.return_unit else write_trailer oz) >>= fun () ->
127+
(try Zlib.deflate_end zstream
128+
with
129+
(* ignore errors, deflate_end cleans everything anyway *)
130+
| Zlib.Error _ ->
131+
());
132+
Lwt.return_unit
187133

188134
(* We implement Content-Encoding, not Transfer-Encoding *)
189135
type encoding = Deflate | Gzip | Id | Star | Not_acceptable
@@ -252,8 +198,8 @@ let stream_filter contentencoding url deflate choice res =
252198
match Ocsigen_header.Mime_type.parse contenttype with
253199
| None, _ | _, None -> Lwt.return res
254200
| Some a, Some b when should_compress (a, b) url choice ->
255-
let response, body = Ocsigen_response.to_cohttp res in
256201
let response =
202+
let response = Ocsigen_response.response res in
257203
let headers = Cohttp.Response.headers response in
258204
let headers =
259205
let name = Ocsigen_header.Name.(to_string etag) in
@@ -273,10 +219,10 @@ let stream_filter contentencoding url deflate choice res =
273219
Cohttp.Response.headers
274220
; Cohttp.Response.encoding = Cohttp.Transfer.Chunked }
275221
and body =
276-
Cohttp_lwt.Body.to_stream body
277-
|> Ocsigen_stream.of_lwt_stream |> compress deflate
278-
|> Ocsigen_stream.to_lwt_stream
279-
|> Cohttp_lwt.Body.of_stream
222+
Ocsigen_response.Body.make Cohttp.Transfer.Chunked
223+
(compress_body deflate
224+
(Ocsigen_response.Body.write
225+
(Ocsigen_response.body res)))
280226
in
281227
Lwt.return (Ocsigen_response.update res ~body ~response)
282228
| _ -> Lwt.return res)

0 commit comments

Comments
 (0)