Skip to content

Commit

Permalink
irmin-pack: Add read_bytes_exn in Dispatcher and use it instead of
Browse files Browse the repository at this point in the history
`transfer_append_exn` in GC.
  • Loading branch information
Ngoguey42 committed Oct 7, 2022
1 parent 46e6a58 commit d1b74ab
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 64 deletions.
71 changes: 52 additions & 19 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,57 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
| Prefix -> Io.read_exn (get_prefix t) ~off:poff ~len buf
| Suffix -> Suffix.read_exn (Fm.suffix t.fm) ~off:poff ~len buf

let read_in_prefix_and_suffix_exn t ~off ~len buf =
let ( -- ) a b = a - b in
let read_bytes_exn t ~f ~off ~len =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
if off < suffix_start_offset && off + Int63.of_int len > suffix_start_offset
then (
let read_in_prefix = suffix_start_offset - off |> Int63.to_int in
let accessor = Accessor.v_exn t ~off ~len:read_in_prefix in
read_exn t accessor buf;
let read_in_suffix = len -- read_in_prefix in
let buf_suffix = Bytes.create read_in_suffix in
let accessor =
Accessor.v_exn t ~off:suffix_start_offset ~len:read_in_suffix
in
read_exn t accessor buf_suffix;
Bytes.blit buf_suffix 0 buf read_in_prefix read_in_suffix)
else read_exn t (Accessor.v_exn t ~off ~len) buf
let bytes_in_prefix =
let prefix_bytes_after_off = suffix_start_offset t - off in
if prefix_bytes_after_off <= Int63.zero then Int63.zero
else min len prefix_bytes_after_off
in
let bytes_in_suffix =
if bytes_in_prefix < len then len - bytes_in_prefix else Int63.zero
in
assert (bytes_in_prefix + bytes_in_suffix = len);
let prefix_accessor_opt =
if bytes_in_prefix > Int63.zero then
Some (Accessor.v_exn t ~off ~len:bytes_in_prefix)
else None
in
let suffix_accessor_opt =
if bytes_in_suffix > Int63.zero then
let off = off + bytes_in_prefix in
Some (Accessor.v_exn t ~off ~len:bytes_in_suffix)
else None
in

(* Now that we have the accessor(s), we're sure the range is valid:
- it doesn't include dead data from the prefix,
- it doesn't go after the end of the suffix.
Go for read. *)
let max_read_size = 8192 in
let buffer = Bytes.create max_read_size in
let max_read_size = Int63.of_int max_read_size in
let rec aux accessor =
if accessor.len = Int63.zero then ()
else if accessor.len < max_read_size then (
read_exn t accessor buffer;
f (Bytes.sub_string buffer 0 (Int63.to_int accessor.len)))
else
let left, right =
( { accessor with len = max_read_size },
{
accessor with
poff = accessor.poff + max_read_size;
len = accessor.len - max_read_size;
} )
in
read_exn t left buffer;
f (Bytes.to_string buffer);
aux right
in
Option.iter aux prefix_accessor_opt;
Option.iter aux suffix_accessor_opt

let create_accessor_exn t ~off ~len =
let len = Int63.of_int len in
Expand All @@ -250,10 +284,9 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
let max_len = Int63.of_int max_len in
Accessor.v_range_exn t ~off ~min_len ~max_len

let create_accessor_to_prefix_exn t ~off ~len =
let create_accessor_to_prefix_exn t ~off ~len =
let len = Int63.of_int len in
Accessor.v_exn t ~off ~len
Accessor.v_in_prefix_exn
Accessor.v_in_prefix_exn t ~off ~len

let shrink_accessor_exn a ~new_len =
let open Int63.Syntax in
Expand Down
31 changes: 23 additions & 8 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module type S = sig
type t
type location = private Prefix | Suffix [@@deriving irmin]

type accessor = private { poff : int63; len : int; location : location }
type accessor = private { poff : int63; len : int63; location : location }
[@@deriving irmin]
(** An [accessor] designates a valid readable area in one of the pack files.
Expand All @@ -48,6 +48,11 @@ module type S = sig
except that the precise length of the span will be decided during the
call. *)

val create_accessor_to_prefix_exn :
Mapping_file.t -> off:int63 -> len:int -> accessor
(** [create_accessor_to_prefix_exn mapping ~off ~len] returns an accessor for
the prefix file associated with [mapping]. *)

val shrink_accessor_exn : accessor -> new_len:int -> accessor
(** [shrink_accessor_exn a ~new_len] is [a] where the length is smaller than
in [a].*)
Expand Down Expand Up @@ -83,14 +88,24 @@ module type S = sig
(** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a
(global) offset. *)

val read_in_prefix_and_suffix_exn : t -> off:int63 -> len:int -> bytes -> unit
(** Similar to [read_exn] but if [off + len] is greater than the end of the
prefix, it will read the remaining bytes in the suffix. *)
val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
and [len].
val create_accessor_to_prefix_exn :
Mapping_file.t -> off:int63 -> len:int -> accessor
(** [create_accessor_to_prefix_exn mapping ~off ~len] returns an accessor for
the prefix file associated with [mapping]. *)
The calls to [f] ignore the objects boundaries (i.e. the string passed to
[f] will most of the time not be the beginning of an object).
The strings passed to [f] are safe. They can be kept around, they are not
the result of an [unsafe_to_string] conversion.
The call will fail if the [(off, len)] range is invalid. It will succeed
in these cases:
- If the range designates a slice of the suffix.
- If the range designates a slice of contiguous live bytes in the prefix.
- If the range designates a slice of contiguous live bytes that starts in
the prefix and ends in the suffix. This implies that the last chunk of
the prefix is contiguous to the start of the suffix. *)
end

module type Sigs = sig
Expand Down
43 changes: 6 additions & 37 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -323,23 +323,13 @@ end
module Worker = struct
module Payload = Control_file.Latest_payload

let buffer_size = 8192

exception Pack_error = Errors.Pack_error

module type S = sig
module Args : Args

val run_and_output_result : generation:int -> string -> Args.key -> unit

val transfer_append_exn :
dispatcher:Args.Dispatcher.t ->
append_exn:(string -> unit) ->
off:int63 ->
len:int63 ->
bytes ->
unit

type gc_output = (Stats.Latest_gc.worker, Args.Errs.t) result
[@@deriving irmin]
end
Expand Down Expand Up @@ -370,25 +360,6 @@ module Worker = struct

let string_of_key = Irmin.Type.to_string key_t

(** [transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer] copies the
[len] bytes that start at offset [off] by repeatedly reading them through
[Dispatcher.read_in_prefix_and_suffix_exn dispatcher] into [buffer] and
handing each chunk, converted to a fresh string, to [append_exn].

Each chunk is at most [Bytes.length buffer] bytes long; [buffer] is reused
as scratch space across iterations and is overwritten by every read.

Exceptions are not handled here: whatever [read_exn] or [append_exn] raise
propagates to the caller.

NOTE(review): one read is always issued, even when [len] is zero, and a
zero-length [buffer] combined with a positive [len] would never make
progress. The call sites visible in this file pass a non-empty buffer
([Bytes.create buffer_size] / [Bytes.create 8192]). *)
let transfer_append_exn ~dispatcher ~append_exn ~(off : int63)
~(len : int63) buffer =
let read_exn = Dispatcher.read_in_prefix_and_suffix_exn dispatcher in
(* Capacity of the scratch buffer, as an [int63] so that it can be compared
with the remaining length. *)
let buffer_size = Bytes.length buffer |> Int63.of_int in
(* [aux off len_remaining] transfers [len_remaining] bytes starting at
[off], one chunk per call. *)
let rec aux off len_remaining =
let open Int63.Syntax in
(* Local [min]: it compares with whichever [<] is in scope after the open
above (presumably the [Int63] one; confirm against [Int63.Syntax]). *)
let min a b = if a < b then a else b in
(* Size of this chunk: a full buffer, or what is left if that is smaller. *)
let len = min buffer_size len_remaining in
let len' = Int63.to_int len in
read_exn ~off ~len:len' buffer;
let () =
(* A full chunk is copied whole; a partial (last) chunk only copies the
[len'] bytes that were just read, leaving out the stale tail of
[buffer]. *)
if len = buffer_size then append_exn (Bytes.to_string buffer)
else append_exn (Bytes.sub_string buffer 0 len')
in
let len_remaining = len_remaining - len in
(* Continue right after the bytes just transferred until nothing is left. *)
if len_remaining > Int63.zero then aux (off + len) len_remaining
in
aux off len

(** [iter_from_node_key node_key _ _ ~f] calls [f] with the key of the node
and iterates over its children.
Expand Down Expand Up @@ -567,12 +538,11 @@ module Worker = struct
(* Step 5. Transfer to the new prefix, flush and close. *)
[%log.debug "GC: transfering to the new prefix"];
stats := Worker_stats.finish_current_step !stats "prefix: transfer";
let buffer = Bytes.create buffer_size in
(* Step 5.1. Transfer all. *)
let append_exn = Ao.append_exn prefix in
let f ~off ~len =
let len = Int63.of_int len in
transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
let () = Mapping_file.iter_exn mapping f in
Ao.flush prefix |> Errs.raise_if_error
Expand Down Expand Up @@ -606,7 +576,6 @@ module Worker = struct
(* Step 6. Create the new suffix and prepare 2 functions for read and write
operations. *)
stats := Worker_stats.finish_current_step !stats "suffix: start";
let buffer = Bytes.create buffer_size in
[%log.debug "GC: creating new suffix"];
let suffix = create_new_suffix ~root ~generation in
Errors.finalise_exn (fun _outcome ->
Expand All @@ -615,7 +584,6 @@ module Worker = struct
|> Errs.log_if_error "GC: Close suffix")
@@ fun () ->
let append_exn = Ao.append_exn suffix in
let transfer_exn = transfer_append_exn ~dispatcher ~append_exn buffer in

(* Step 7. Transfer to the next suffix. *)
[%log.debug "GC: transfering to the new suffix"];
Expand All @@ -637,7 +605,9 @@ module Worker = struct
(num_iterations - i + 1)
Int63.pp off Int63.pp len];
stats := Worker_stats.add_suffix_transfer !stats len;
let () = transfer_exn ~off ~len in
let () =
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
(* Check how many bytes are left, [4096*5] is selected because it is roughly the
number of bytes that requires a read from the block device on ext4 *)
if Int63.to_int len < 4096 * 5 then end_offset
Expand Down Expand Up @@ -788,13 +758,12 @@ module Make (Args : Args) = struct
Ao.close new_suffix
|> Errs.log_if_error "GC: Close suffix after copy latest newies")
@@ fun () ->
let buffer = Bytes.create 8192 in
let append_exn = Ao.append_exn new_suffix in
let flush_and_raise () = Ao.flush new_suffix |> Errs.raise_if_error in
let* () =
Errs.catch (fun () ->
Worker.transfer_append_exn ~dispatcher:t.dispatcher ~append_exn
~off:new_suffix_end_offset ~len:remaining buffer;
Dispatcher.read_bytes_exn t.dispatcher ~f:append_exn
~off:new_suffix_end_offset ~len:remaining;
flush_and_raise ())
in
Ok old_suffix_end_offset
Expand Down

0 comments on commit d1b74ab

Please sign in to comment.