Skip to content

Commit

Permalink
Add ZSTD compression (#430)
Browse files Browse the repository at this point in the history
* Add support for zstd and split compression

Add support for using zstd as an alternative to native, lz4.

Upgrade lz4 to v1.9.4 (with ARM enhancements).

Allow for split compression algorithms - i.e. use native on journal, but lz4 on ledger.

* Switch to AdRoll zstd

Development appears to be active and ongoing.  No issues running on different linux flavours.

* Use realistic bucket name

* Update README.md

* Switch branch

* Add comment following review
  • Loading branch information
martinsumner authored Jan 23, 2024
1 parent c294570 commit 999ce8b
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 73 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ In order to contribute to leveled, fork the repository, make a branch for your c

To have rebar3 execute the full set of tests, run:

```rebar3 as test do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose```
```./rebar3 do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose```

For those with a Quickcheck license, property-based tests can also be run using:

```rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc```
```./rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc```
16 changes: 13 additions & 3 deletions priv/leveled.schema
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@
]}.

%% @doc Compression method
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
%% Can be lz4, zstd or native (which will use the Erlang native zlib
%% compression) within term_to_binary
{mapping, "leveled.compression_method", "leveled.compression_method", [
{datatype, {enum, [native, lz4, none]}},
{datatype, {enum, [native, lz4, zstd, none]}},
{default, native}
]}.

%% @doc Ledger compression
%% If an alternative compression method is preferred specifically for the
%% ledger, it can be specified here. Default is as_store - use whatever method
%% has been defined in leveled.compression.method. Alternatives are native,
%% lz4, ztsd and none
{mapping, "leveled.ledger_compression", "leveled.ledger_compression", [
{datatype, {enum, [as_store, native, lz4, zstd, none]}},
{default, as_store}
]}.

%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
Expand Down
12 changes: 11 additions & 1 deletion priv/leveled_multi.schema
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,21 @@
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [
{datatype, {enum, [native, lz4, none]}},
{datatype, {enum, [native, lz4, zstd, none]}},
{default, native},
hidden
]}.

%% @doc Ledger compression
%% If an alternative compression method is preferred specifically for the
%% ledger, it can be specified here. Default is as_store - use whatever method
%% has been defined in leveled.compression.method. Alternatives are native,
%% lz4, ztsd and none
{mapping, "multi_backend.$name.ledger_compression", "riak_kv.multi_backend", [
{datatype, {enum, [as_store, native, lz4, zstd, none]}},
{default, as_store}
]}.

%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
]}.

{deps, [
{lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {tag, "0.2.5"}}}
{lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {branch, "develop-3.1"}}},
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}}
]}.

{ct_opts, [{dir, ["test/end_to_end"]}]}.
1 change: 1 addition & 0 deletions src/leveled.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{registered, []},
{applications, [
lz4,
zstd,
kernel,
stdlib
]},
Expand Down
23 changes: 17 additions & 6 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{ledger_compression, as_store},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
Expand Down Expand Up @@ -292,13 +293,15 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|none} |
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4). To disable compression use none. This will disable in
% the ledger as well as the journla (both on_receipt and
% on_compact).
% (lz4 or zstd).
% Defaults to ?COMPRESSION_METHOD
{ledger_compression, as_store|native|lz4|zstd|none} |
% Define an alternative to the compression method to be used by the
% ledger only. Default is as_store - use the method defined as
% compression_method for the whole store
{compression_point, on_compact|on_receipt} |
% The =compression point can be changed between on_receipt (all
% values are compressed as they are received), to on_compact where
Expand Down Expand Up @@ -1812,6 +1815,14 @@ set_options(Opts, Monitor) ->
true = SFL_CompPerc >= 0.0,

CompressionMethod = proplists:get_value(compression_method, Opts),
JournalCompression = CompressionMethod,
LedgerCompression =
case proplists:get_value(ledger_compression, Opts) of
as_store ->
CompressionMethod;
AltMethod ->
AltMethod
end,
CompressOnReceipt =
case proplists:get_value(compression_point, Opts) of
on_receipt ->
Expand All @@ -1835,7 +1846,7 @@ set_options(Opts, Monitor) ->
maxrunlength_compactionperc = MRL_CompPerc,
waste_retention_period = WRP,
snaptimeout_long = SnapTimeoutLong,
compression_method = CompressionMethod,
compression_method = JournalCompression,
compress_on_receipt = CompressOnReceipt,
score_onein = ScoreOneIn,
cdb_options =
Expand All @@ -1854,7 +1865,7 @@ set_options(Opts, Monitor) ->
snaptimeout_long = SnapTimeoutLong,
sst_options =
#sst_options{
press_method = CompressionMethod,
press_method = LedgerCompression,
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
Expand Down
59 changes: 36 additions & 23 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
accumulate_index/2,
count_tombs/2]).

-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").

-type tag() ::
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom().
-type key() ::
Expand Down Expand Up @@ -108,7 +105,7 @@
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native|none.
lz4|native|zstd|none.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
Expand Down Expand Up @@ -489,7 +486,6 @@ get_tagstrategy(Tag, Strategy) ->
to_inkerkey(LedgerKey, SQN) ->
{SQN, ?INKT_STND, LedgerKey}.


-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), journal_keychanges(),
compression_method(), boolean()) -> {journal_key(), any()}.
%% @doc
Expand Down Expand Up @@ -524,7 +520,6 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
Object
end.


-spec create_value_for_journal({any(), journal_keychanges()|binary()},
boolean(), compression_method()) -> binary().
%% @doc
Expand All @@ -549,14 +544,14 @@ maybe_compress(JournalBin, PressMethod) ->
<<JBin0:Length0/binary,
KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
{IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type),
case IsCompressed of
true ->
JournalBin;
false ->
Length1 = Length0 - KeyChangeLength,
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
binary_to_term(KCBin2)},
create_value_for_journal(V0, true, PressMethod)
end.
Expand All @@ -568,6 +563,8 @@ serialise_object(Object, true, Method) when is_binary(Object) ->
lz4 ->
{ok, Bin} = lz4:pack(Object),
Bin;
zstd ->
zstd:compress(Object);
native ->
zlib:compress(Object);
none ->
Expand All @@ -590,35 +587,42 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
<<JBin0:Length0/binary,
KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
{IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type),
Length1 = Length0 - KeyChangeLength,
case ToIgnoreKeyChanges of
true ->
<<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
{deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
{[], infinity}};
false ->
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
{deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
binary_to_term(KCBin2)}
end.

deserialise_object(Binary, true, true, true) ->
deserialise_object(Binary, true, true, lz4) ->
{ok, Deflated} = lz4:unpack(Binary),
Deflated;
deserialise_object(Binary, true, true, false) ->
deserialise_object(Binary, true, true, zstd) ->
zstd:decompress(Binary);
deserialise_object(Binary, true, true, native) ->
zlib:uncompress(Binary);
deserialise_object(Binary, true, false, _IsLz4) ->
deserialise_object(Binary, true, false, _) ->
Binary;
deserialise_object(Binary, false, _, _IsLz4) ->
deserialise_object(Binary, false, _, _) ->
binary_to_term(Binary).

-spec encode_valuetype(boolean(), boolean(), native|lz4|zstd|none) -> 0..15.
%% @doc Note that IsCompressed will be based on the compression_point
%% configuration option when the object is first stored (i.e. only `true` if
%% this is set to `on_receipt`). On compaction this will be set to true.
encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
{Bit3, Bit4} =
case Method of
lz4 -> 4;
native -> 0;
none -> 0
lz4 -> {4, 0};
zstd -> {4, 8};
native -> {0, 0};
none -> {0, 0}
end,
Bit2 =
case IsBinary of
Expand All @@ -630,17 +634,26 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
true -> 1;
false -> 0
end,
Bit1 + Bit2 + Bit3.
Bit1 + Bit2 + Bit3 + Bit4.


-spec decode_valuetype(integer()) -> {boolean(), boolean(), boolean()}.
-spec decode_valuetype(integer())
-> {boolean(), boolean(), compression_method()}.
%% @doc
%% Check bit flags to confirm how the object has been serialised
decode_valuetype(TypeInt) ->
IsCompressed = TypeInt band 1 == 1,
IsBinary = TypeInt band 2 == 2,
IsLz4 = TypeInt band 4 == 4,
{IsBinary, IsCompressed, IsLz4}.
CompressionMethod =
case TypeInt band 12 of
0 ->
native;
4 ->
lz4;
12 ->
zstd
end,
{IsBinary, IsCompressed, CompressionMethod}.

-spec from_journalkey(journal_key()) -> {integer(), ledger_key()}.
%% @doc
Expand Down
Loading

0 comments on commit 999ce8b

Please sign in to comment.