Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming reads and writes experiments #10

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 96 additions & 13 deletions lib/oneFFS.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ module Make(B : Mirage_block.S) = struct
info : Mirage_block.info;
mutable f : Header.t option;
empty_header : Cstruct.t;
mutex : Lwt_mutex.t;
}

type error = [ `Block of B.error | `Bad_checksum ]
Expand All @@ -82,6 +83,7 @@ module Make(B : Mirage_block.S) = struct

let write t s =
let (let*?) = Lwt_result.bind in
Lwt_mutex.with_lock t.mutex @@ fun () ->
(* First invalidate current file *)
t.f <- None;
let*? () =
Expand All @@ -103,7 +105,65 @@ module Make(B : Mirage_block.S) = struct
t.f <- Some header;
Lwt_result.return ()

let stream_write t f =
let (let*?) = Lwt_result.bind in
Lwt_mutex.with_lock t.mutex @@ fun () ->
(* First invalidate current file *)
t.f <- None;
let*? () =
B.write t.b 0L [t.empty_header]
in
let mutex = Lwt_mutex.create () in
let sector_size = t.info.sector_size in
let buf = Cstruct.create sector_size
and written = ref 0
and crc = ref Checkseum.Crc32.default
and pending = ref 0 in
let rec append data off =
if String.length data = off then
Lwt.return (Ok ())
else
let l = min (sector_size - !pending) (String.length data - off) in
Cstruct.blit_from_string data 0 buf !pending l;
pending := !pending + l;
if !pending = sector_size then
let sector = Int64.of_int (!written / sector_size + 1) in
let*? () = B.write t.b sector [ buf ] in
pending := 0;
written := !written + sector_size;
crc := Checkseum.Crc32.digest_bigstring buf.buffer buf.off sector_size !crc;
append data (off + l)
else
Lwt.return (Ok ())
in
let append data =
Lwt_mutex.with_lock mutex @@ fun () ->
append data 0
in
let*? v = f append in
let*? () =
if !pending > 0 then
let sector = Int64.of_int (!written / sector_size + 1) in
Cstruct.memset (Cstruct.shift buf !pending) 0;
let*? () = B.write t.b sector [ buf ] in
written := !written + !pending;
crc := Checkseum.Crc32.digest_bigstring buf.buffer buf.off sector_size !crc;
Lwt.return (Ok ())
else
Lwt.return (Ok ())
in
let hdr = { Header.length = !written; file_crc = !crc } in
Header.marshal hdr buf;
let*? () = B.write t.b 0L [ buf ] in
t.f <- Some hdr;
Lwt.return (Ok v)

let b_read b sector_start bufs =
Lwt_result.map_error (fun e -> `Block e)
(B.read b sector_start bufs)

let read t =
let (let*?) = Lwt_result.bind in
match t.f with
| None -> Lwt_result.return None
| Some { Header.length; file_crc } ->
Expand All @@ -117,19 +177,41 @@ module Make(B : Mirage_block.S) = struct
List.init sectors
(fun i -> Cstruct.sub buf (i * sector_size) sector_size)
in
let* r = B.read t.b 1L bufs in
match r with
| Error e -> Lwt_result.fail (`Block e)
| Ok () ->
let crc =
Checkseum.Crc32.digest_bigstring buf.buffer buf.off length
Checkseum.Crc32.default
in
if Optint.equal crc file_crc then
let s = Cstruct.to_string ~len:length buf in
Lwt_result.return (Some s)
let*? () = b_read t.b 1L bufs in
let crc =
Checkseum.Crc32.digest_bigstring buf.buffer buf.off length
Checkseum.Crc32.default
in
if Optint.equal crc file_crc then
let s = Cstruct.to_string ~len:length buf in
Lwt_result.return (Some s)
else
Lwt_result.fail `Bad_checksum

let stream_read t =
let (let*?) = Lwt_result.bind in
match t.f with
| None -> None
| Some { Header.length; file_crc } ->
let off = ref 0
and crc = ref Checkseum.Crc32.default in
let buf = Cstruct.create t.info.sector_size in
let read () =
if !off = length then
if Checkseum.Crc32.equal file_crc !crc then
Lwt.return (Ok `End_of_file)
else
Lwt.return (Ok `Bad_checksum)
else
Lwt_result.fail `Bad_checksum
let sector = Int64.of_int (!off / t.info.sector_size) in
let len = min t.info.sector_size (length - !off) in
off := !off + len;
let*? () = b_read t.b sector [ buf ] in
let data = Cstruct.to_string ~off:0 ~len buf in
crc := Checkseum.Crc32.digest_string data 0 (String.length data) !crc;
Lwt.return (Ok (`Data data))
in
Some read

let format b =
let* info = B.get_info b in
Expand Down Expand Up @@ -161,5 +243,6 @@ module Make(B : Mirage_block.S) = struct
(* Reuse the buffer for the empty header *)
Cstruct.memset buf 0;
Cstruct.blit_from_string Header.empty 0 buf 0 (String.length Header.empty);
Lwt.return { b; info; f; empty_header = buf; }
let mutex = Lwt_mutex.create () in
Lwt.return { b; info; f; empty_header = buf; mutex }
end
22 changes: 22 additions & 0 deletions lib/oneFFS.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ module Make(B : Mirage_block.S) : sig
(** [read fs] reads the data stored if any. An error is returned if the
checksum is bad or if the read fails. *)

val stream_write : t -> ((string -> (unit, write_error) result Lwt.t) ->
('a, write_error) result Lwt.t) ->
('a, write_error) result Lwt.t
(** [stream_write t f] calls [f append] where [append] is a function that
appends data to a fresh file. Once the [f append] task is resolved the file
is committed and the result is returned. *)

val stream_read : t ->
(unit ->
([> `Bad_checksum | `Data of string | `End_of_file ],
[> `Block of B.error ])
Lwt_result.t)
option
(** [stream_read t] is [None] if [is_set t] is false. Otherwise, it is
[Some read] where [read] can be called repeatedly to read sectors of the
file at a time. [read ()] returns [Ok (`Data data)] when there is more
data to be read. At the end of the file either [Ok `End_of_file] or [Ok
`Bad_checksum] is returned depending on whether the checksum for the data
was as expected.

It is an error to write while reading. *)

val is_set : t -> bool
(** [is_set fs] is true if [fs] has any data. *)

Expand Down