%% -------- Overview ---------
%%
%% Leveled is based on the LSM-tree similar to leveldb, except that:
%% - Keys, Metadata and Values are not persisted together - the Keys and
%% Metadata are kept in a tree-based ledger, whereas the values are stored
%% only in a sequential Journal.
%% - Different file formats are used for Journal (based on DJ Bernstein
%% constant database), and the ledger (based on sst)
%% - It is not intended to be general purpose, but be primarily suited for
%% use as a Riak backend in specific circumstances (relatively large values,
%% and frequent use of iterators)
%% - The Journal is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The ledger is a merge tree, where the key is the actual object key, and
%% the value is the metadata of the object including the sequence number
%%
%%
%% -------- Actors ---------
%%
%% The store is fronted by a Bookie, who takes support from different actors:
%% - An Inker who persists new data into the journal, and returns items from
%% the journal based on sequence number
%% - A Penciller who periodically redraws the ledger, that associates keys with
%% sequence numbers and other metadata, as well as secondary keys (for index
%% queries)
%% - One or more Clerks, who may be used by either the inker or the penciller
%% to fulfill background tasks
%%
%% Both the Inker and the Penciller maintain a manifest of the files which
%% represent the current state of the Journal and the Ledger respectively.
%% For the Inker the manifest maps ranges of sequence numbers to cdb files.
%% For the Penciller the manifest maps key ranges to files at each level of
%% the Ledger.
%%
-module(leveled_bookie).
-behaviour(gen_server).
-include("leveled.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
book_start/1,
book_start/4,
book_plainstart/1,
book_put/5,
book_put/6,
book_put/8,
book_tempput/7,
book_mput/2,
book_mput/3,
book_delete/4,
book_get/3,
book_get/4,
book_head/3,
book_head/4,
book_sqn/3,
book_sqn/4,
book_headonly/4,
book_snapshot/4,
book_compactjournal/2,
book_islastcompactionpending/1,
book_trimjournal/1,
book_hotbackup/1,
book_close/1,
book_destroy/1,
book_isempty/2,
book_logsettings/1,
book_loglevel/2,
book_addlogs/2,
book_removelogs/2,
book_headstatus/1
]).
%% folding API
-export([
book_returnfolder/2,
book_indexfold/5,
book_bucketlist/4,
book_keylist/3,
book_keylist/4,
book_keylist/5,
book_keylist/6,
book_objectfold/4,
book_objectfold/5,
book_objectfold/6,
book_headfold/6,
book_headfold/7,
book_headfold/9
]).
-export([empty_ledgercache/0,
snapshot_store/7,
fetch_value/2,
journal_notfound/4]).
-ifdef(TEST).
-export([book_returnactors/1]).
-endif.
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{cache_multiple, ?MAX_CACHE_MULTTIPLE},
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{max_mergebelow, 24},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
{singlefile_compactionpercentage, 30.0},
{maxrunlength_compactionpercentage, 70.0},
{journalcompaction_scoreonein, 1},
{reload_strategy, []},
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{ledger_compression, as_store},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
{forced_logs, []},
{database_id, ?DEFAULT_DBID},
{override_functions, []},
{snapshot_timeout_short, ?SNAPTIMEOUT_SHORT},
{snapshot_timeout_long, ?SNAPTIMEOUT_LONG},
{stats_percentage, ?DEFAULT_STATS_PERC},
{stats_logfrequency,
element(1, leveled_monitor:get_defaults())},
{monitor_loglist,
element(2, leveled_monitor:get_defaults())}]).
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
:: tuple()|empty_cache,
load_queue = [] :: list(),
index = leveled_pmem:new_index(),
min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}).
-record(state, {inker :: pid() | null,
penciller :: pid() | undefined,
cache_size :: pos_integer() | undefined,
cache_multiple :: pos_integer() | undefined,
ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
head_only = false :: boolean(),
head_lookup = true :: boolean(),
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
bookie_monref :: reference() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
-type book_state() :: #state{}.
-type sync_mode() :: sync|none|riak_sync.
-type ledger_cache() :: #ledger_cache{}.
-type open_options() ::
%% For full description of options see ../docs/STARTUP_OPTIONS.md
[{root_path, string()|undefined} |
% Folder to be used as the root path for storing all the database
% information. Should be undefined if snapshot_bookie is a pid()
% TODO: Some sort of split root path to allow for mixed classes of
% storage (e.g. like eleveldb tiered storage - only with
% separation between ledger and non-current journal)
{snapshot_bookie, undefined|pid()} |
% Is the bookie being started required to be a snapshot of an
% existing bookie, rather than a new bookie. The bookie to be
% snapped should have its pid passed as the startup option in this
% case
{cache_size, pos_integer()} |
% The size of the Bookie's memory, the cache of the recent
% additions to the ledger. Defaults to ?CACHE_SIZE, plus some
% randomised jitter (randomised jitter will still be added to
% configured values)
% The minimum value is 100 - any lower value will be ignored
{cache_multiple, pos_integer()} |
% A multiple of the cache size beyond which the cache should not
% grow even if the penciller is busy. A pause will be returned for
% every PUT when this multiple of the cache_size is reached
{max_journalsize, pos_integer()} |
% The maximum size of a journal file in bytes. The absolute
% maximum must be 4GB due to 4 byte file pointers being used
{max_journalobjectcount, pos_integer()} |
% The maximum size of the journal by count of the objects. The
% journal must remain within the limit set by both these figures and
% the max_journalsize
{max_sstslots, pos_integer()} |
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests), altering this
% value is not recommended
{max_mergebelow, pos_integer()|infinity} |
% The maximum number of files which a single file may be merged into
% within the ledger. If fewer than this number of files overlap below,
% the merge will continue without a maximum. If this many or more
% overlap below, only up to max_mergebelow div 2 additions should be
% created (the merge should be partial)
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schedule). This has a
% significant impact on performance which can be mitigated
% partially in hardware (e.g through use of FBWC).
% riak_sync is used for backwards compatibility with OTP16 - and
% will manually call sync() after each write (rather than use the
% O_SYNC option on startup)
{head_only, false|with_lookup|no_lookup} |
% When enabled, there are three fundamental changes as to how
% leveled will work:
% - Compaction of the journal will be managed by simply removing any
% journal file that has its highest sequence number persisted to the
% ledger;
% - GETs are not supported, only head requests;
% - PUTs should arrive as batched object specs using the book_mput/2
% function.
% head_only mode is disabled with false (default). There are two
% different modes in which head_only can run, with_lookup or
% no_lookup, and head_only mode is enabled by passing one of these
% atoms:
% - with_lookup assumes that individual objects may need to be
% fetched;
% - no_lookup prevents individual objects from being fetched, so
% that the store can only be used for folds (without segment list
% acceleration)
{waste_retention_period, undefined|pos_integer()} |
% If a value is not required in the journal (i.e. it has been
% replaced and is now to be removed for compaction) for how long
% should it be retained. For example should it be kept for a
% period until the operator can be sure a backup has been
% completed?
% If undefined, will not retain waste, otherwise the period is the
% number of seconds to wait
{max_run_length, undefined|pos_integer()} |
% The maximum number of consecutive files that can be compacted in
% one compaction operation.
% Defaults to leveled_iclerk:?MAX_COMPACTION_RUN (if undefined)
{singlefile_compactionpercentage, float()} |
% What is the percentage of space to be recovered from compacting
% a single file, before that file can be a compaction candidate in
% a compaction run of length 1
{maxrunlength_compactionpercentage, float()} |
% What is the percentage of space to be recovered from compacting
% a run of max_run_length, before that run can be a compaction
% candidate. For runs between 1 and max_run_length, a
% proportionate score is calculated
{journalcompaction_scoreonein, pos_integer()} |
% When scoring for compaction run a probability (1 in x) of whether
% any file will be scored this run. If not scored a cached score
% will be used, and the cached score is the average of the latest
% score and the rolling average of previous scores
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should
% work. For instance if we were to trust everything as permanent
% in the Ledger once it is persisted, then there would be no need
% to retain a skinny history of key changes in the Journal after
% compaction. If, as an alternative we assume the Ledger is never
% permanent, and retain the skinny history - then backups need only
% be made against the Journal. The skinny history of key changes
% is primarily related to the issue of supporting secondary indexes
% in Riak.
%
% These two strategies are referred to as recovr (assume we can
% recover any deltas from a lost ledger and a lost history through
% resilience outside of the store), or retain (retain a history of
% key changes, even when the object value has been compacted).
%
% There is a third strategy, which is recalc, where on reloading
% the Ledger from the Journal, the key changes are recalculated by
% comparing the extracted metadata from the Journal object, with the
% extracted metadata from the current Ledger object it is set to
% replace (should one be present). Implementing the recalc
% strategy requires a override function for
% `leveled_head:diff_indexspecs/3`.
% A function for the ?RIAK_TAG is provided and tested.
%
% reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Default strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} |
% How many ledger keys should the penciller retain in memory
% between flushing new level zero files.
% Defaults to ?MAX_PCL_CACHE_SIZE when undefined
% The minimum size is 400 - any attempt to set this value lower will be
% ignored. As a rule the value should be at least 4 x the Bookie's
% cache size
{ledger_preloadpagecache_level, pos_integer()} |
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4 or zstd).
% Defaults to ?COMPRESSION_METHOD
{ledger_compression, as_store|native|lz4|zstd|none} |
% Define an alternative to the compression method to be used by the
% ledger only. Default is as_store - use the method defined as
% compression_method for the whole store
{compression_point, on_compact|on_receipt} |
% The compression point can be changed between on_receipt (all
% values are compressed as they are received), to on_compact where
% values are originally stored uncompressed (speeding PUT times),
% and are only compressed when they are first subject to compaction
% Defaults to ?COMPRESSION_POINT
{compression_level, 0..7} |
% At what level of the LSM tree in the ledger should compression be
% enabled.
% Defaults to ?COMPRESSION_LEVEL
{log_level, debug|info|warn|error|critical} |
% Set the log level. The default log_level of info is noisy - the
% current implementation was targeted at environments that have
% facilities to index large proportions of logs and allow for
% dynamic querying of those indexes to output relevant stats.
%
% As an alternative a higher log_level can be used to reduce this
% 'noise', however, there is currently no separate stats facility
% to gather relevant information outside of info level logs. So
% moving to higher log levels will at present make the operator
% blind to sample performance statistics of leveled sub-components
% etc
{forced_logs, list(atom())} |
% Forced logs allow for specific info level logs, such as those
% logging stats to be logged even when the default log level has
% been set to a higher log level. Using:
% {forced_logs,
% [b0015, b0016, b0017, b0018, p0032, sst12]}
% Will log all timing points even when log_level is not set to
% support info
{database_id, non_neg_integer()} |
% Integer database ID to be used in logs
{override_functions, list(leveled_head:appdefinable_function_tuple())} |
% Provide a list of override functions that will be used for
% user-defined tags
{snapshot_timeout_short, pos_integer()} |
% Time in seconds before a snapshot that has not been shutdown is
% assumed to have failed, and so requires to be torn down. The
% short timeout is applied to queries where long_running is set to
% false
{snapshot_timeout_long, pos_integer()} |
% Time in seconds before a snapshot that has not been shutdown is
% assumed to have failed, and so requires to be torn down. The
% long timeout is applied to queries where long_running is set to
% true
{stats_percentage, 0..100} |
% Probability that stats will be collected for an individual
% request.
{stats_logfrequency, pos_integer()} |
% Time in seconds before logging the next timing log. This covers
% the logs associated with the timing of GET/PUTs in various parts
% of the system. There are 7 such logs - so setting to 30s will
% mean that each individual log will occur every 210s
{monitor_loglist, list(leveled_monitor:log_type())}
].
-type load_item() ::
{
leveled_codec:journal_key_tag()|null,
leveled_codec:primary_key()|?DUMMY,
leveled_codec:sqn(),
dynamic(),
leveled_codec:journal_keychanges(),
integer()
}.
-type initial_loadfun() ::
fun((leveled_codec:journal_key(),
dynamic(),
non_neg_integer(),
{non_neg_integer(), non_neg_integer(), list(load_item())},
fun((any()) -> {binary(), non_neg_integer()})) ->
{loop|stop,
{
non_neg_integer(),
non_neg_integer(),
list(load_item())
}
}
).
-export_type([initial_loadfun/0, ledger_cache/0]).
%%%============================================================================
%%% API
%%%============================================================================
-spec book_start(string(), integer(), integer(), sync_mode()) -> {ok, pid()}.
%% @doc Start a Leveled Key/Value store - limited options support.
%%
%% The most common startup parameters are extracted out from the options to
%% provide this startup method. This will start a KV store from the previous
%% store at root path - or an empty one if there is no store at the path.
%%
%% Fiddling with the LedgerCacheSize and JournalSize may improve performance,
%% but these are primarily exposed to support special situations (e.g. very
%% low memory installations), there should not be huge variance in outcomes
%% from modifying these numbers.
%%
%% The sync_strategy determines if the store is going to flush writes to disk
%% before returning an ack. There are three settings currently supported:
%% - sync - sync to disk by passing the sync flag to the file writer (only
%% works in OTP 18)
%% - riak_sync - sync to disk by explicitly calling data_sync after the write
%% - none - leave it to the operating system to control flushing
%%
%% On startup the Bookie must restart both the Inker to load the Journal, and
%% the Penciller to load the Ledger. Once the Penciller has started, the
%% Bookie should request the highest sequence number in the Ledger, and then
%% try to rebuild any missing information from the Journal.
%%
%% To rebuild the Ledger it requests the Inker to scan over the files from
%% the sequence number and re-generate the Ledger changes - pushing the changes
%% directly back into the Ledger.
book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
book_start(set_defaults([{root_path, RootPath},
{cache_size, LedgerCacheSize},
{max_journalsize, JournalSize},
{sync_strategy, SyncStrategy}])).
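%% A minimal usage sketch of the limited-options startup. The path, cache
%% size and journal size below are illustrative values only, not
%% recommendations:
%%
%%   {ok, Bookie} =
%%       leveled_bookie:book_start("/tmp/leveled_data", 2000, 500000000, none),
%%   %% ... use the store via book_put/book_get etc ...
%%   ok = leveled_bookie:book_close(Bookie).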
-spec book_start(list(tuple())) -> {ok, pid()}.
%% @doc Start a Leveled Key/Value store - full options support.
%%
%% For full description of options see ../docs/STARTUP_OPTIONS.md and also
%% comments on the open_options() type
book_start(Opts) ->
{ok, Bookie} =
gen_server:start_link(?MODULE, [set_defaults(Opts)], []),
{ok, Bookie}.
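%% A sketch of a full-options startup. Only the options supplied are
%% overridden; everything else falls back to ?OPTION_DEFAULTS as described
%% in open_options() above. All values shown are illustrative only:
%%
%%   Opts =
%%       [{root_path, "/tmp/leveled_data"},
%%        {cache_size, 4000},
%%        {max_journalobjectcount, 100000},
%%        {sync_strategy, none},
%%        {log_level, warn}],
%%   {ok, Bookie} = leveled_bookie:book_start(Opts).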
-spec book_plainstart(list(tuple())) -> {ok, pid()}.
%% @doc
%% Start used in tests to start without linking
book_plainstart(Opts) ->
{ok, Bookie} =
gen_server:start(?MODULE, [set_defaults(Opts)], []),
{ok, Bookie}.
-spec book_tempput(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), integer()) -> ok|pause.
%% @doc Put an object with an expiry time
%%
%% Put an item in the store but with a Time To Live - the time when the object
%% should expire, in gregorian_seconds (add the required number of seconds to
%% leveled_util:integer_time/1).
%%
%% There exists the possibility of per object expiry times, not just whole
%% store expiry times as has traditionally been the feature in Riak. Care
%% will need to be taken if implementing per-object times about the choice of
%% reload_strategy. If expired objects are to be compacted entirely, then the
%% history of KeyChanges will be lost on reload.
book_tempput(
Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).
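%% A sketch of a PUT with a one-hour expiry. The TTL is an absolute time in
%% gregorian seconds; the bucket, key and value are illustrative only. The
%% return is ok, or pause when the store is applying back-pressure:
%%
%%   TTL =
%%       calendar:datetime_to_gregorian_seconds(
%%           calendar:universal_time()) + 3600,
%%   R = leveled_bookie:book_tempput(
%%           Bookie, <<"B">>, <<"K">>, <<"V">>, [], ?STD_TAG, TTL),
%%   true = lists:member(R, [ok, pause]).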
%% @doc - Standard PUT
%%
%% A PUT request consists of
%% - A Primary Key and a Value
%% - IndexSpecs - a set of secondary key changes associated with the
%% transaction
%% - A tag indicating the type of object. Behaviour for metadata extraction,
%% and ledger compaction will vary by type. There are three currently
%% implemented types i (Index), o (Standard), o_rkv (Riak). Keys added with
%% Index tags are not fetchable (as they will not be hashed), but are
%% extractable via range query.
%%
%% The extended-arity book_put functions support the addition of an object
%% TTL and a `sync` boolean to flush this PUT (and any other buffered PUTs) to
%% disk when the sync_strategy is `none`.
%%
%% The Bookie takes the request and passes it first to the Inker to add the
%% request to the journal.
%%
%% The inker will pass the PK/Value/IndexSpecs to the current (append only)
%% CDB journal file to persist the change. The call should return either 'ok'
%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for
%% this write, and a new journal file should be created (with appropriate
%% manifest changes to be made).
%%
%% The inker will return the SQN which the change has been made at, as well as
%% the object size on disk within the Journal.
%%
%% Once the object has been persisted to the Journal, the Ledger can be updated.
%% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
%% to the Value to return the Object Metadata, a function to generate a hash
%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
%% Number in the Journal and the Object Size (returned from the Inker).
%%
%% A set of Ledger Key changes are then generated and placed in the Bookie's
%% Ledger Key cache.
%%
%% The PUT can now be acknowledged. In the background the Bookie may then
%% choose to push the cache to the Penciller for eventual persistence within
%% the ledger. This push will either be accepted or returned (if the
%% Penciller has a backlog of key changes). The back-pressure should lead to
%% the Bookie entering into a slow-offer status whereby the next PUT will be
%% acknowledged by a PAUSE signal - with the expectation that this will
%% lead to a back-off behaviour.
book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
-spec book_put(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), infinity|integer()) -> ok|pause.
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, false).
-spec book_put(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), infinity|integer(),
boolean()) -> ok|pause.
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync) ->
gen_server:call(Pid,
{put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync},
infinity).
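%% A sketch of a standard PUT carrying secondary index changes. The index
%% spec form shown ({add, Field, Term}) is an assumption about
%% leveled_codec:index_specs(); the bucket, key, value and terms are
%% illustrative only:
%%
%%   IndexSpecs = [{add, <<"idx1_bin">>, <<"someterm">>}],
%%   case leveled_bookie:book_put(
%%           Bookie, <<"B">>, <<"K">>, <<"V">>, IndexSpecs, ?STD_TAG) of
%%       ok -> ok;
%%       pause -> timer:sleep(100) % back off on slow_offer back-pressure
%%   end.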
-spec book_mput(pid(), list(leveled_codec:object_spec())) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
%% be inserted in to the store using book_mput/2. ObjectSpecs should be
%% of the form {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be
%% stored within the HEAD of the object (in the Ledger), so the full object
%% is retrievable using a HEAD request. The ObjectOp is either add or remove.
%%
%% The list should be de-duplicated before it is passed to the bookie.
book_mput(Pid, ObjectSpecs) ->
book_mput(Pid, ObjectSpecs, infinity).
-spec book_mput(pid(), list(leveled_codec:object_spec()), infinity|integer())
-> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
%% be inserted in to the store using book_mput/2. ObjectSpecs should be
%% of the form {action, Bucket, Key, SubKey, Value}. The Value will be
%% stored within the HEAD of the object (in the Ledger), so the full object
%% is retrievable using a HEAD request.
%%
%% The list should be de-duplicated before it is passed to the bookie.
book_mput(Pid, ObjectSpecs, TTL) ->
gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
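%% A sketch of a head_only batch insert; the store must have been started
%% with {head_only, with_lookup} (or no_lookup) for mput to be supported.
%% Object specs take the {ObjectOp, Bucket, Key, SubKey, Value} form
%% described above - the values here are illustrative only:
%%
%%   ObjectSpecs =
%%       [{add, <<"B">>, <<"K1">>, <<"SK1">>, {some, metadata}},
%%        {add, <<"B">>, <<"K2">>, <<"SK1">>, {other, metadata}}],
%%   R = leveled_bookie:book_mput(Bookie, ObjectSpecs), % ok or pause
%%   true = lists:member(R, [ok, pause]).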
-spec book_delete(pid(),
leveled_codec:key(), leveled_codec:key(),
leveled_codec:index_specs()) -> ok|pause.
%% @doc
%%
%% A thin wrap around the put of a special tombstone object. There is no
%% immediate reclaim of space, simply the addition of a more recent tombstone.
book_delete(Pid, Bucket, Key, IndexSpecs) ->
book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
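%% A usage sketch - as the delete is itself a PUT of a tombstone it, like
%% book_put, returns ok or pause (bucket and key illustrative only):
%%
%%   R = leveled_bookie:book_delete(Bookie, <<"B">>, <<"K">>, []),
%%   true = lists:member(R, [ok, pause]).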
-spec book_get(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found.
-spec book_head(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found.
-spec book_sqn(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, non_neg_integer()}|not_found.
-spec book_headonly(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:key())
-> {ok, any()}|not_found.
%% @doc - GET and HEAD requests
%%
%% The Bookie supports both GET and HEAD requests, with the HEAD request
%% returning only the metadata and not the actual object value. The HEAD
%% requests can be serviced by reference to the Ledger Cache and the Penciller.
%%
%% GET requests first follow the path of a HEAD request, and if an object is
%% found, then fetch the value from the Journal via the Inker.
%%
%% To perform a head request in head_only mode with_lookup, book_headonly/4
%% should be used. Note that if head_only mode is false or no_lookup, then
%% this request is not supported
book_get(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
book_head(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag, false}, infinity).
book_get(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, ?STD_TAG).
book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, ?STD_TAG).
book_headonly(Pid, Bucket, Key, SubKey) ->
gen_server:call(Pid,
{head, Bucket, {Key, SubKey}, ?HEAD_TAG, false},
infinity).
book_sqn(Pid, Bucket, Key) ->
book_sqn(Pid, Bucket, Key, ?STD_TAG).
book_sqn(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag, true}, infinity).
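%% A usage sketch contrasting GET, HEAD and SQN requests against a store
%% holding <<"B">>/<<"K">> (keys illustrative only):
%%
%%   {ok, Value} = leveled_bookie:book_get(Bookie, <<"B">>, <<"K">>, ?STD_TAG),
%%   {ok, Head} = leveled_bookie:book_head(Bookie, <<"B">>, <<"K">>, ?STD_TAG),
%%   {ok, SQN} = leveled_bookie:book_sqn(Bookie, <<"B">>, <<"K">>, ?STD_TAG),
%%   not_found = leveled_bookie:book_get(Bookie, <<"B">>, <<"missing">>).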
-spec book_returnfolder(pid(), tuple()) -> {async, fun(() -> dynamic())}.
%% @doc Folds over store - deprecated
%% The tuple() is a query, and book_returnfolder will return an {async, Folder}
%% whereby calling Folder() will run a particular fold over a snapshot of the
%% store, and close the snapshot when complete
%%
%% For any new application requiring a fold - use the API below instead, and
%% one of:
%% - book_indexfold
%% - book_bucketlist
%% - book_keylist
%% - book_headfold
%% - book_objectfold
book_returnfolder(Pid, RunnerType) ->
gen_server:call(Pid, {return_runner, RunnerType}, infinity).
%% Different runner types for async queries:
%% - book_indexfold
%% - book_bucketlist
%% - book_keylist
%% - book_headfold
%% - book_objectfold
%%
%% See individual instructions for each one. All folds can be completed early
%% by using a fold_function that throws an exception when some threshold is
%% reached - and a worker that catches that exception.
%%
%% See test/end_to_end/iterator_SUITE:breaking_folds/1
%% @doc Builds and returns an `{async, Runner}' pair for secondary
%% index queries. Calling `Runner' will fold over keys (ledger) tagged
%% with the index `?IDX_TAG' and constrain the fold to a specific
%% `Bucket''s index fields, as specified by the `Constraint'
%% argument. If `Constraint' is a tuple of `{Bucket, Key}' the fold
%% starts at `Key', meaning any keys lower than `Key' and which match
%% the start of the range query, will not be folded over (this is
%% useful for implementing pagination, for example.)
%%
%% Provide a `FoldAccT' tuple of a fold fun (a 3-arity fun that
%% will be called once per matching index entry, with the Bucket and
%% Primary Key (or {IndexVal, Primary Key} if `ReturnTerms' is
%% true)) and an initial Accumulator, which will be passed as the 3rd
%% argument in the initial call to FoldFun. Subsequent calls to
%% FoldFun will use the previous return of FoldFun as the 3rd
%% argument, and the final return of `Runner' is the final return of
%% `FoldFun', the final Accumulator value. The query can filter inputs
%% based on `Range' and `TermHandling'. `Range' specifies the name of
%% `IndexField' to query, and `Start' and `End' optionally provide the
%% range to query over. `TermHandling' is a 2-tuple, the first
%% element is a `boolean()', `true' meaning return terms, (see fold
%% fun above), `false' meaning just return primary keys. `TermRegex'
%% is either a regular expression of type `re:mp()' (that will be run
%% against each index term value, and only those that match will be
%% accumulated) or `undefined', which means no regular expression
%% filtering of index values. NOTE: Regular Expressions can ONLY be
%% run on indexes that have binary or string values, NOT integer
%% values. In the Riak sense of secondary indexes, there are two types
%% of indexes `_bin' indexes and `_int' indexes. Term regex may only
%% be run against the `_bin' type.
%%
%% Any book_indexfold query will fold over the snapshot under the control
%% of the worker process controlling the function - and that process can
%% be interrupted by a throw, which will be forwarded to the worker (whilst
%% still closing down the snapshot). This may be used, for example, to
%% curtail a fold in the application at max_results
-spec book_indexfold(pid(),
Constraint:: {Bucket, StartKey},
FoldAccT :: {FoldFun, Acc},
Range :: {IndexField, Start, End},
TermHandling :: {ReturnTerms, TermRegex}) ->
{async, Runner::fun(() -> dynamic())}
when Bucket::term(),
Key :: term(),
StartKey::term(),
FoldFun::fun((Bucket, Key | {IndexVal, Key}, Acc) -> Acc),
Acc::dynamic(),
IndexField::term(),
IndexVal::term(),
Start::IndexVal,
End::IndexVal,
ReturnTerms::boolean(),
TermRegex :: leveled_codec:regular_expression().
book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling)
when is_tuple(Constraint) ->
RunnerType =
{index_query, Constraint, FoldAccT, Range, TermHandling},
book_returnfolder(Pid, RunnerType);
book_indexfold(Pid, Bucket, FoldAccT, Range, TermHandling) ->
% StartKey must be specified to avoid confusion when bucket is a tuple.
% Use an empty StartKey if no StartKey is required (e.g. <<>>). In a
% future release this code branch may be removed, and such queries may
% instead return `error`. For now null is assumed to be lower than any
% key
leveled_log:log(b0019, [Bucket]),
book_indexfold(Pid, {Bucket, null}, FoldAccT, Range, TermHandling).
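%% A sketch of an index query over a hypothetical `idx1_bin' index field,
%% accumulating {Term, Key} pairs (ReturnTerms is true, no regex filter);
%% the bucket and term range are illustrative only:
%%
%%   FoldFun = fun(_Bucket, {Term, Key}, Acc) -> [{Term, Key}|Acc] end,
%%   {async, Runner} =
%%       leveled_bookie:book_indexfold(
%%           Bookie,
%%           {<<"B">>, null},
%%           {FoldFun, []},
%%           {<<"idx1_bin">>, <<"a">>, <<"z">>},
%%           {true, undefined}),
%%   Results = Runner().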
%% @doc list buckets. Folds over the ledger only. Given a `Tag' folds
%% over the keyspace calling `FoldFun' from `FoldAccT' for each
%% `Bucket'. `FoldFun' is a 2-arity function that is passed `Bucket'
%% and `Acc'. On first call `Acc' is the initial `Acc' from
%% `FoldAccT', thereafter the result of the previous call to
%% `FoldFun'. `Constraint' can be either atom `all' or `first' meaning
%% return all buckets, or just the first one found. Returns `{async,
%% Runner}' where `Runner' is a fun that returns the final value of
%% `FoldFun', the final `Acc' accumulator.
-spec book_bucketlist(pid(), Tag, FoldAccT, Constraint) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Acc) -> Acc),
Acc :: dynamic(),
Constraint :: first | all,
Bucket :: term(),
Runner :: fun(() -> Acc).
book_bucketlist(Pid, Tag, FoldAccT, Constraint) ->
RunnerType=
case Constraint of
first-> {first_bucket, Tag, FoldAccT};
all -> {bucket_list, Tag, FoldAccT}
end,
book_returnfolder(Pid, RunnerType).
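%% A sketch listing all buckets for the standard tag into a list:
%%
%%   {async, BucketRunner} =
%%       leveled_bookie:book_bucketlist(
%%           Bookie, ?STD_TAG, {fun(B, Acc) -> [B|Acc] end, []}, all),
%%   Buckets = BucketRunner().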
%% @doc fold over the keys (ledger only) for a given `Tag'. Each key
%% will result in a call to `FoldFun' from `FoldAccT'. `FoldFun' is a
%% 3-arity function, called with `Bucket', `Key' and `Acc'. The
%% initial value of `Acc' is the second element of `FoldAccT'. Returns
%% `{async, Runner}' where `Runner' is a function that will run the
%% fold and return the final value of `Acc'
%%
%% Any book_keylist query will fold over the snapshot under the control
%% of the worker process controlling the function - and that process can
%% be interrupted by a throw, which will be forwarded to the worker (whilst
%% still closing down the snapshot). This may be used, for example, to
%% curtail a fold in the application at max_results
-spec book_keylist(pid(), Tag, FoldAccT) -> {async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Runner :: fun(() -> Acc).
book_keylist(Pid, Tag, FoldAccT) ->
RunnerType = {keylist, Tag, FoldAccT},
book_returnfolder(Pid, RunnerType).
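%% A sketch counting all keys for the standard tag; the fold fun simply
%% increments the accumulator once per key:
%%
%%   {async, CountRunner} =
%%       leveled_bookie:book_keylist(
%%           Bookie, ?STD_TAG, {fun(_B, _K, Acc) -> Acc + 1 end, 0}),
%%   KeyCount = CountRunner().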
%% @doc as for book_keylist/3 but constrained to only those keys in
%% `Bucket'
-spec book_keylist(pid(), Tag, Bucket, FoldAccT) -> {async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Runner :: fun(() -> Acc).
book_keylist(Pid, Tag, Bucket, FoldAccT) ->
RunnerType = {keylist, Tag, Bucket, FoldAccT},
book_returnfolder(Pid, RunnerType).
%% @doc as for book_keylist/4 with additional constraint that only
%% keys in the `KeyRange' tuple will be folded over, where `KeyRange'
%% is `StartKey', the first key in the range and `EndKey' the last,
%% (inclusive.) Or the atom `all', which will return all keys in the
%% `Bucket'.
-spec book_keylist(pid(), Tag, Bucket, KeyRange, FoldAccT) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
KeyRange :: {StartKey, EndKey} | all,
StartKey :: Key,
EndKey :: Key,
Key :: term(),
Runner :: fun(() -> Acc).
book_keylist(Pid, Tag, Bucket, KeyRange, FoldAccT) ->
RunnerType = {keylist, Tag, Bucket, KeyRange, FoldAccT, undefined},
book_returnfolder(Pid, RunnerType).
%% @doc as for book_keylist/5 with additional constraint that a compiled regular
%% expression is passed to be applied against any key that is in the range.
%% This is always applied to the Key and only the Key, not to any SubKey.
-spec book_keylist(pid(), Tag, Bucket, KeyRange, FoldAccT, TermRegex) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
KeyRange :: {StartKey, EndKey} | all,
StartKey :: Key,
EndKey :: Key,
Key :: term(),
TermRegex :: leveled_codec:regular_expression(),
Runner :: fun(() -> Acc).
book_keylist(Pid, Tag, Bucket, KeyRange, FoldAccT, TermRegex) ->
RunnerType = {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex},
book_returnfolder(Pid, RunnerType).
%% @doc fold over all the objects/values in the store in key
%% order. `Tag' is the tagged type of object. `FoldAccT' is a 2-tuple,
%% the first element being a 4-arity fun, that is called once for each
%% key with the arguments `Bucket', `Key', `Value', `Acc'. The 2nd
%% element is the initial accumulator `Acc' which is passed to
%% `FoldFun' on its first call. Thereafter the return value from
%% `FoldFun' is the 4th argument to the next call of
%% `FoldFun'. `SnapPreFold' is a boolean where `true' means take the
%% snapshot at once, and `false' means take the snapshot when the
%% returned `Runner' is executed. Return `{async, Runner}' where
%% `Runner' is a 0-arity function that returns the final accumulator
%% from `FoldFun'
-spec book_objectfold(pid(), Tag, FoldAccT, SnapPreFold) -> {async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Value :: term(),
SnapPreFold :: boolean(),
Runner :: fun(() -> Acc).
book_objectfold(Pid, Tag, FoldAccT, SnapPreFold) ->
RunnerType = {foldobjects_allkeys, Tag, FoldAccT, SnapPreFold},
book_returnfolder(Pid, RunnerType).
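%% A sketch of an object fold accumulating {{Bucket, Key}, Value} pairs for
%% all objects of the standard tag, with the snapshot taken immediately
%% (SnapPreFold = true):
%%
%%   FoldObjFun = fun(B, K, V, Acc) -> [{{B, K}, V}|Acc] end,
%%   {async, ObjRunner} =
%%       leveled_bookie:book_objectfold(
%%           Bookie, ?STD_TAG, {FoldObjFun, []}, true),
%%   Objects = ObjRunner().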
%% @doc exactly as book_objectfold/4 with the additional parameter
%% `Order'. `Order' can be `sqn_order' or `key_order'. In
%% book_objectfold/4 and book_objectfold/6 `key_order' is
%% implied. This function called with `Order == key_order' is
%% identical to book_objectfold/4. NOTE: if you must fold over ALL
%% objects, `sqn_order' is quicker than `key_order' due to accessing
%% the journal objects in their on-disk order, not via a fold over the
%% ledger.
-spec book_objectfold(pid(), Tag, FoldAccT, SnapPreFold, Order) -> {async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Value :: term(),
SnapPreFold :: boolean(),
Runner :: fun(() -> Acc),
Order :: key_order | sqn_order.
book_objectfold(Pid, Tag, FoldAccT, SnapPreFold, Order) ->
RunnerType = {foldobjects_allkeys, Tag, FoldAccT, SnapPreFold, Order},
book_returnfolder(Pid, RunnerType).
%% @doc as book_objectfold/4, with the addition of some constraints on
%% the range of objects folded over. The 3rd argument `Bucket' limits
%% the fold to that specific bucket only. The 4th argument `Limiter'
%% further constrains the fold. `Limiter' can be either a `Range' or
%% `Index' query. `Range' is either the atom `all', meaning {min,
%% max}, or, a two tuple of start key and end key, inclusive. Index
%% Query is a 3-tuple of `{IndexField, StartTerm, EndTerm}`, just as
%% in book_indexfold/5
-spec book_objectfold(pid(), Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Value :: term(),
Limiter :: Range | Index,
Range :: {StartKey, EndKey} | all,
Index :: {IndexField, Start, End},
IndexField::term(),
IndexVal::term(),
Start::IndexVal,
End::IndexVal,
StartKey :: Key,
EndKey :: Key,
SnapPreFold :: boolean(),
Runner :: fun(() -> Acc).
book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
RunnerType =
case Limiter of
all ->
{foldobjects_bybucket, Tag, Bucket, all, FoldAccT, SnapPreFold};
Range when is_tuple(Range) andalso size(Range) == 2 ->
{foldobjects_bybucket, Tag, Bucket, Range, FoldAccT, SnapPreFold};
IndexQuery when is_tuple(IndexQuery) andalso size(IndexQuery) == 3 ->
IndexQuery = Limiter,
{foldobjects_byindex, Tag, Bucket, IndexQuery, FoldAccT, SnapPreFold}
end,
book_returnfolder(Pid, RunnerType).
%% @doc Leveled stores not just Keys in the ledger, but may also store
%% object metadata, referred to as heads (after Riak head request for
%% object metadata) Often when folding over objects all that is really
%% required is the object metadata. These "headfolds" are an efficient
%% way to fold over the ledger (possibly wholly in memory) and get
%% object metadata.
%%
%% Fold over the object's head. `Tag' is the tagged type of the
%% objects to fold over. `FoldAccT' is a 2-tuple. The 1st element is a
%% 4-arity fold fun, that takes a Bucket, Key, ProxyObject, and the
%% `Acc'. The ProxyObject is an object that only contains the
%% head/metadata, and no object data from the journal. The `Acc' in
%% the first call is that provided as the second element of `FoldAccT'
%% and thereafter the return of the previous call to the fold fun. If
%% `JournalCheck' is `true' then the journal is checked to see if the
%% object in the ledger is present, which means a snapshot of the
%% whole store is required, if `false', then no such check is
%% performed, and only the ledger need be snapshotted. `SnapPreFold' is a
%% boolean that determines if the snapshot is taken when the folder is
%% requested `true', or when run `false'. `SegmentList' can be
%% `false' meaning, all heads, or a list of integers that designate
%% segments in a TicTac Tree.
-spec book_headfold(pid(), Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
book_headfold(Pid, Tag, all,
FoldAccT, JournalCheck, SnapPreFold,
SegmentList, false, false).
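%% A sketch of a head fold over all objects of the standard tag, without a
%% journal presence check (JournalCheck = false), with the snapshot taken
%% only when the runner is executed (SnapPreFold = false), and no segment
%% list acceleration (SegmentList = false):
%%
%%   HeadFoldFun = fun(B, K, _ProxyObj, Acc) -> [{B, K}|Acc] end,
%%   {async, HeadRunner} =
%%       leveled_bookie:book_headfold(
%%           Bookie, ?STD_TAG, {HeadFoldFun, []}, false, false, false),
%%   Heads = HeadRunner().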
%% @doc as book_headfold/6, but with the addition of a `Limiter' that
%% restricts the set of objects folded over. `Limiter' can either be a
%% bucket list, or a key range of a single bucket. For bucket list,
%% the `Limiter' should be a 2-tuple, the first element the tag
%% `bucket_list' and the second a `list()' of `Bucket'. Only heads
%% from the listed buckets will be folded over. A single bucket key
%% range may also be used as a `Limiter', in which case the argument
%% is a 3-tuple of `{range, Bucket, Range}' where `Bucket' is a
%% bucket, and `Range' is a 2-tuple of start key and end key,
%% inclusive, or the atom `all'. The rest of the arguments are as
%% `book_headfold/6'
-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
Limiter :: BucketList | BucketKeyRange,
BucketList :: {bucket_list, list(Bucket)},
BucketKeyRange :: {range, Bucket, KeyRange},
KeyRange :: {StartKey, EndKey} | all,
StartKey :: Key,
EndKey :: Key,
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: dynamic(),
Bucket :: term(),
Key :: term(),