From 999ce8ba5b18f84620f1dcd48f9033f9292f2551 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 23 Jan 2024 16:25:03 +0000 Subject: [PATCH] Add ZSTD compression (#430) * Add support for zstd and split compression Add support for using zstd as an alternative to native, lz4. Upgrade lz4 to v1.9.4 (with ARM enhancements). Allow for split compression algorithms - i.e. use native on journal, but lz4 on ledger. * Switch to AdRoll zstd Development appears to be active and ongoing. No issues running on different linux flavours. * Use realistic bucket name * Update README.md * Switch branch * Add comment following review --- README.md | 4 +-- priv/leveled.schema | 16 +++++++-- priv/leveled_multi.schema | 12 ++++++- rebar.config | 3 +- src/leveled.app.src | 1 + src/leveled_bookie.erl | 23 +++++++++---- src/leveled_codec.erl | 59 ++++++++++++++++++++------------- src/leveled_sst.erl | 54 +++++++++++++++++++++--------- test/end_to_end/basic_SUITE.erl | 20 ++++++++--- test/end_to_end/perf_SUITE.erl | 37 ++++++++++++--------- 10 files changed, 156 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index f71b605e..c85fba13 100644 --- a/README.md +++ b/README.md @@ -78,8 +78,8 @@ In order to contribute to leveled, fork the repository, make a branch for your c To have rebar3 execute the full set of tests, run: -```rebar3 as test do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose``` +```./rebar3 do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose``` For those with a Quickcheck license, property-based tests can also be run using: -```rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc``` +```./rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc``` diff --git a/priv/leveled.schema b/priv/leveled.schema index 3c404bbe..f27d88e4 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -43,13 +43,23 @@ ]}. 
%% @doc Compression method -%% Can be lz4 or native (which will use the Erlang native zlib compression) -%% within term_to_binary +%% Can be lz4, zstd or native (which will use the Erlang native zlib +%% compression) within term_to_binary {mapping, "leveled.compression_method", "leveled.compression_method", [ - {datatype, {enum, [native, lz4, none]}}, + {datatype, {enum, [native, lz4, zstd, none]}}, {default, native} ]}. +%% @doc Ledger compression +%% If an alternative compression method is preferred specifically for the +%% ledger, it can be specified here. Default is as_store - use whatever method +%% has been defined in leveled.compression.method. Alternatives are native, +%% lz4, zstd and none +{mapping, "leveled.ledger_compression", "leveled.ledger_compression", [ + {datatype, {enum, [as_store, native, lz4, zstd, none]}}, + {default, as_store} +]}. + %% @doc Compression point %% The point at which compression is applied to the Journal (the Ledger is %% always compressed). Use on_receipt or on_compact. on_compact is suitable diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema index fb589eff..06d4d564 100644 --- a/priv/leveled_multi.schema +++ b/priv/leveled_multi.schema @@ -39,11 +39,21 @@ %% Can be lz4 or native (which will use the Erlang native zlib compression) %% within term_to_binary {mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [ - {datatype, {enum, [native, lz4, none]}}, + {datatype, {enum, [native, lz4, zstd, none]}}, {default, native}, hidden ]}. +%% @doc Ledger compression +%% If an alternative compression method is preferred specifically for the +%% ledger, it can be specified here. Default is as_store - use whatever method +%% has been defined in leveled.compression.method. Alternatives are native, +%% lz4, zstd and none +{mapping, "multi_backend.$name.ledger_compression", "riak_kv.multi_backend", [ + {datatype, {enum, [as_store, native, lz4, zstd, none]}}, + {default, as_store} +]}. 
+ %% @doc Compression point %% The point at which compression is applied to the Journal (the Ledger is %% always compressed). Use on_receipt or on_compact. on_compact is suitable diff --git a/rebar.config b/rebar.config index 2a55d968..8977b9c8 100644 --- a/rebar.config +++ b/rebar.config @@ -25,7 +25,8 @@ ]}. {deps, [ - {lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {tag, "0.2.5"}}} + {lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {branch, "develop-3.1"}}}, + {zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}} ]}. {ct_opts, [{dir, ["test/end_to_end"]}]}. diff --git a/src/leveled.app.src b/src/leveled.app.src index 1e42316e..3b0c0916 100644 --- a/src/leveled.app.src +++ b/src/leveled.app.src @@ -5,6 +5,7 @@ {registered, []}, {applications, [ lz4, + zstd, kernel, stdlib ]}, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 5af6fd87..429480a0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -125,6 +125,7 @@ {max_pencillercachesize, ?MAX_PCL_CACHE_SIZE}, {ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP}, {compression_method, ?COMPRESSION_METHOD}, + {ledger_compression, as_store}, {compression_point, ?COMPRESSION_POINT}, {compression_level, ?COMPRESSION_LEVEL}, {log_level, ?LOG_LEVEL}, @@ -292,13 +293,15 @@ % To which level of the ledger should the ledger contents be % pre-loaded into the pagecache (using fadvise on creation and % startup) - {compression_method, native|lz4|none} | + {compression_method, native|lz4|zstd|none} | % Compression method and point allow Leveled to be switched from % using bif based compression (zlib) to using nif based compression - % (lz4). To disable compression use none. This will disable in - % the ledger as well as the journla (both on_receipt and - % on_compact). + % (lz4 or zstd). 
% Defaults to ?COMPRESSION_METHOD + {ledger_compression, as_store|native|lz4|zstd|none} | + % Define an alternative to the compression method to be used by the + % ledger only. Default is as_store - use the method defined as + % compression_method for the whole store {compression_point, on_compact|on_receipt} | % The =compression point can be changed between on_receipt (all % values are compressed as they are received), to on_compact where @@ -1812,6 +1815,14 @@ set_options(Opts, Monitor) -> true = SFL_CompPerc >= 0.0, CompressionMethod = proplists:get_value(compression_method, Opts), + JournalCompression = CompressionMethod, + LedgerCompression = + case proplists:get_value(ledger_compression, Opts) of + as_store -> + CompressionMethod; + AltMethod -> + AltMethod + end, CompressOnReceipt = case proplists:get_value(compression_point, Opts) of on_receipt -> @@ -1835,7 +1846,7 @@ set_options(Opts, Monitor) -> maxrunlength_compactionperc = MRL_CompPerc, waste_retention_period = WRP, snaptimeout_long = SnapTimeoutLong, - compression_method = CompressionMethod, + compression_method = JournalCompression, compress_on_receipt = CompressOnReceipt, score_onein = ScoreOneIn, cdb_options = @@ -1854,7 +1865,7 @@ set_options(Opts, Monitor) -> snaptimeout_long = SnapTimeoutLong, sst_options = #sst_options{ - press_method = CompressionMethod, + press_method = LedgerCompression, press_level = CompressionLevel, log_options = leveled_log:get_opts(), max_sstslots = MaxSSTSlots, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 1c92344b..633b1e74 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -52,9 +52,6 @@ accumulate_index/2, count_tombs/2]). --define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). --define(NRT_IDX, "$aae."). - -type tag() :: leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom(). -type key() :: @@ -108,7 +105,7 @@ -type object_spec() :: object_spec_v0()|object_spec_v1(). -type compression_method() :: - lz4|native|none. 
+ lz4|native|zstd|none. -type index_specs() :: list({add|remove, any(), any()}). -type journal_keychanges() :: @@ -489,7 +486,6 @@ get_tagstrategy(Tag, Strategy) -> to_inkerkey(LedgerKey, SQN) -> {SQN, ?INKT_STND, LedgerKey}. - -spec to_inkerkv(ledger_key(), non_neg_integer(), any(), journal_keychanges(), compression_method(), boolean()) -> {journal_key(), any()}. %% @doc @@ -524,7 +520,6 @@ from_inkerkv(Object, ToIgnoreKeyChanges) -> Object end. - -spec create_value_for_journal({any(), journal_keychanges()|binary()}, boolean(), compression_method()) -> binary(). %% @doc @@ -549,14 +544,14 @@ maybe_compress(JournalBin, PressMethod) -> <> = JournalBin, - {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type), + {IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type), case IsCompressed of true -> JournalBin; false -> Length1 = Length0 - KeyChangeLength, <> = JBin0, - V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), + V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod), binary_to_term(KCBin2)}, create_value_for_journal(V0, true, PressMethod) end. @@ -568,6 +563,8 @@ serialise_object(Object, true, Method) when is_binary(Object) -> lz4 -> {ok, Bin} = lz4:pack(Object), Bin; + zstd -> + zstd:compress(Object); native -> zlib:compress(Object); none -> @@ -590,35 +587,42 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) -> <> = JournalBin, - {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type), + {IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type), Length1 = Length0 - KeyChangeLength, case ToIgnoreKeyChanges of true -> <> = JBin0, - {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), + {deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod), {[], infinity}}; false -> <> = JBin0, - {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), + {deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod), binary_to_term(KCBin2)} end. 
-deserialise_object(Binary, true, true, true) -> +deserialise_object(Binary, true, true, lz4) -> {ok, Deflated} = lz4:unpack(Binary), Deflated; -deserialise_object(Binary, true, true, false) -> +deserialise_object(Binary, true, true, zstd) -> + zstd:decompress(Binary); +deserialise_object(Binary, true, true, native) -> zlib:uncompress(Binary); -deserialise_object(Binary, true, false, _IsLz4) -> +deserialise_object(Binary, true, false, _) -> Binary; -deserialise_object(Binary, false, _, _IsLz4) -> +deserialise_object(Binary, false, _, _) -> binary_to_term(Binary). +-spec encode_valuetype(boolean(), boolean(), native|lz4|zstd|none) -> 0..15. +%% @doc Note that IsCompressed will be based on the compression_point +%% configuration option when the object is first stored (i.e. only `true` if +%% this is set to `on_receipt`). On compaction this will be set to true. encode_valuetype(IsBinary, IsCompressed, Method) -> - Bit3 = + {Bit3, Bit4} = case Method of - lz4 -> 4; - native -> 0; - none -> 0 + lz4 -> {4, 0}; + zstd -> {4, 8}; + native -> {0, 0}; + none -> {0, 0} end, Bit2 = case IsBinary of @@ -630,17 +634,26 @@ encode_valuetype(IsBinary, IsCompressed, Method) -> true -> 1; false -> 0 end, - Bit1 + Bit2 + Bit3. + Bit1 + Bit2 + Bit3 + Bit4. --spec decode_valuetype(integer()) -> {boolean(), boolean(), boolean()}. +-spec decode_valuetype(integer()) + -> {boolean(), boolean(), compression_method()}. %% @doc %% Check bit flags to confirm how the object has been serialised decode_valuetype(TypeInt) -> IsCompressed = TypeInt band 1 == 1, IsBinary = TypeInt band 2 == 2, - IsLz4 = TypeInt band 4 == 4, - {IsBinary, IsCompressed, IsLz4}. + CompressionMethod = + case TypeInt band 12 of + 0 -> + native; + 4 -> + lz4; + 12 -> + zstd + end, + {IsBinary, IsCompressed, CompressionMethod}. -spec from_journalkey(journal_key()) -> {integer(), ledger_key()}. 
%% @doc diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index b9a705bc..9f55b332 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -151,7 +151,7 @@ -type slot_index_value() :: #slot_index_value{}. -type press_method() - :: lz4|native|none. + :: lz4|native|zstd|none. -type range_endpoint() :: all|leveled_codec:ledger_key(). -type slot_pointer() @@ -1489,14 +1489,15 @@ read_file(Filename, State, LoadPageCache) -> Bloom}. gen_fileversion(PressMethod, IdxModDate, CountOfTombs) -> - % Native or none can be treated the same once written, as reader - % does not need to know as compression info will be in header of the + % Native or none can be treated the same once written, as reader + % does not need to know as compression info will be in header of the % block - Bit1 = - case PressMethod of + Bit1 = + case PressMethod of lz4 -> 1; native -> 0; - none -> 0 + none -> 0; + zstd -> 0 end, Bit2 = case IdxModDate of @@ -1505,18 +1506,25 @@ gen_fileversion(PressMethod, IdxModDate, CountOfTombs) -> false -> 0 end, - Bit3 = + Bit3 = case CountOfTombs of not_counted -> 0; _ -> 4 end, - Bit1 + Bit2 + Bit3. + Bit4 = + case PressMethod of + zstd -> + 8; + _ -> + 0 + end, + Bit1 + Bit2 + Bit3 + Bit4. imp_fileversion(VersionInt, State) -> - UpdState0 = - case VersionInt band 1 of + UpdState0 = + case VersionInt band 1 of 0 -> State#state{compression_method = native}; 1 -> @@ -1529,11 +1537,18 @@ imp_fileversion(VersionInt, State) -> 2 -> UpdState0#state{index_moddate = true} end, - case VersionInt band 4 of - 0 -> - UpdState1; - 4 -> - UpdState1#state{tomb_count = 0} + UpdState2 = + case VersionInt band 4 of + 0 -> + UpdState1; + 4 -> + UpdState1#state{tomb_count = 0} + end, + case VersionInt band 8 of + 0 -> + UpdState2; + 8 -> + UpdState2#state{compression_method = zstd} end. 
open_reader(Filename, LoadPageCache) -> @@ -1658,12 +1673,15 @@ serialise_block(Term, native) -> Bin = term_to_binary(Term, ?BINARY_SETTINGS), CRC32 = hmac(Bin), <>; +serialise_block(Term, zstd) -> + Bin = zstd:compress(term_to_binary(Term)), + CRC32 = hmac(Bin), + <>; serialise_block(Term, none) -> Bin = term_to_binary(Term), CRC32 = hmac(Bin), <>. - -spec deserialise_block(binary(), press_method()) -> any(). %% @doc %% Convert binary to term @@ -1686,6 +1704,8 @@ deserialise_block(_Bin, _PM) -> deserialise_checkedblock(Bin, lz4) -> {ok, Bin0} = lz4:unpack(Bin), binary_to_term(Bin0); +deserialise_checkedblock(Bin, zstd) -> + binary_to_term(zstd:decompress(Bin)); deserialise_checkedblock(Bin, _Other) -> % native or none can be treated the same binary_to_term(Bin). @@ -4207,6 +4227,7 @@ stop_whenstarter_stopped_testto() -> corrupted_block_range_test() -> corrupted_block_rangetester(native, 100), corrupted_block_rangetester(lz4, 100), + corrupted_block_rangetester(zstd, 100), corrupted_block_rangetester(none, 100). corrupted_block_rangetester(PressMethod, TestCount) -> @@ -4251,6 +4272,7 @@ corrupted_block_rangetester(PressMethod, TestCount) -> corrupted_block_fetch_test() -> corrupted_block_fetch_tester(native), corrupted_block_fetch_tester(lz4), + corrupted_block_fetch_tester(zstd), corrupted_block_fetch_tester(none). corrupted_block_fetch_tester(PressMethod) -> diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 56d5657e..41b106c9 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -1036,25 +1036,36 @@ remove_journal_test(_Config) -> ok = leveled_bookie:book_destroy(Bookie3). 
- many_put_fetch_switchcompression(_Config) -> + {T0, ok} = + timer:tc(fun many_put_fetch_switchcompression_tester/1, [native]), + {T1, ok} = + timer:tc(fun many_put_fetch_switchcompression_tester/1, [lz4]), + {T2, ok} = + timer:tc(fun many_put_fetch_switchcompression_tester/1, [zstd]), + io:format("Test timings native=~w lz4=~w, zstd=~w", [T0, T1, T2]). + +many_put_fetch_switchcompression_tester(CompressionMethod) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, {max_pencillercachesize, 16000}, {max_journalobjectcount, 30000}, {compression_level, 3}, {sync_strategy, testutil:sync_strategy()}, - {compression_method, native}], + {compression_method, native}, + {ledger_compression, none}], StartOpts2 = [{root_path, RootPath}, {max_pencillercachesize, 24000}, {max_journalobjectcount, 30000}, {sync_strategy, testutil:sync_strategy()}, - {compression_method, lz4}], + {compression_method, CompressionMethod}, + {ledger_compression, as_store}], StartOpts3 = [{root_path, RootPath}, {max_pencillercachesize, 16000}, {max_journalobjectcount, 30000}, {sync_strategy, testutil:sync_strategy()}, - {compression_method, none}], + {compression_method, none}, + {ledger_compression, as_store}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), @@ -1173,7 +1184,6 @@ many_put_fetch_switchcompression(_Config) -> ok = leveled_bookie:book_destroy(Bookie6). - safereaderror_startup(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, diff --git a/test/end_to_end/perf_SUITE.erl b/test/end_to_end/perf_SUITE.erl index 1394f0a1..9a3fabd1 100644 --- a/test/end_to_end/perf_SUITE.erl +++ b/test/end_to_end/perf_SUITE.erl @@ -8,41 +8,45 @@ all() -> [riak_ctperf]. suite() -> [{timetrap, {hours, 16}}]. - -% For full performance test riak_fullperf(_Config) -> - R2A = riak_load_tester(<<"B0">>, 2000000, 2048, [], native), + riak_fullperf(2048, zstd, as_store). 
+ +riak_fullperf(ObjSize, PM, LC) -> + Bucket = {<<"SensibleBucketTypeName">>, <<"SensibleBucketName0">>}, + R2A = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC), output_result(R2A), - R2B = riak_load_tester(<<"B0">>, 2000000, 2048, [], native), + R2B = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC), output_result(R2B), - R2C = riak_load_tester(<<"B0">>, 2000000, 2048, [], native), + R2C = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC), output_result(R2C), - R5A = riak_load_tester(<<"B0">>, 5000000, 2048, [], native), + R5A = riak_load_tester(Bucket, 5000000, ObjSize, [], PM, LC), output_result(R5A), - R5B = riak_load_tester(<<"B0">>, 5000000, 2048, [], native), + R5B = riak_load_tester(Bucket, 5000000, ObjSize, [], PM, LC), output_result(R5B), - R10 = riak_load_tester(<<"B0">>, 10000000, 2048, [], native), + R10 = riak_load_tester(Bucket, 10000000, ObjSize, [], PM, LC), output_result(R10) . riak_profileperf(_Config) -> riak_load_tester( - <<"B0">>, + {<<"SensibleBucketTypeName">>, <<"SensibleBucketName0">>}, 2000000, 2048, [load, head, get, query, mini_query, full, guess, estimate, update], - native). + zstd, + as_store + ). % For standard ct test runs riak_ctperf(_Config) -> - riak_load_tester(<<"B0">>, 400000, 1024, [], native). + riak_load_tester(<<"B0">>, 400000, 1024, [], native, as_store). 
-riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) -> +riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PM, LC) -> ct:log( ?INFO, - "Basic riak test with KeyCount ~w ObjSize ~w", - [KeyCount, ObjSize] + "Basic riak test with KeyCount ~w ObjSize ~w PressMethod ~w Ledger ~w", + [KeyCount, ObjSize, PM, LC] ), IndexCount = 100000, @@ -55,7 +59,8 @@ riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) -> [{root_path, RootPath}, {sync_strategy, testutil:sync_strategy()}, {log_level, warn}, - {compression_method, PressMethod}, + {compression_method, PM}, + {ledger_compression, LC}, {forced_logs, [b0015, b0016, b0017, b0018, p0032, sst12]} ], @@ -175,7 +180,7 @@ riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) -> {_Inker, _Pcl, SSTPids, _PClerk, CDBPids, _IClerk} = get_pids(Bookie1), leveled_bookie:book_destroy(Bookie1), - {KeyCount, ObjSize, PressMethod, + {KeyCount, ObjSize, {PM, LC}, TotalLoadTime, TotalHeadTime, TotalGetTime, TotalQueryTime, TotalMiniQueryTime, FullFoldTime, SegFoldTime,