Skip to content

Commit

Permalink
add event-listener interface
Browse files Browse the repository at this point in the history
  • Loading branch information
yangyazhou committed Dec 27, 2023
1 parent c18aeae commit 160373d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
55 changes: 55 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/metadata.h"
Expand Down Expand Up @@ -77,10 +78,12 @@ using ROCKSDB_NAMESPACE::DBOptions;
using ROCKSDB_NAMESPACE::DbPath;
using ROCKSDB_NAMESPACE::Env;
using ROCKSDB_NAMESPACE::EnvOptions;
using ROCKSDB_NAMESPACE::EventListener;
using ROCKSDB_NAMESPACE::ExportImportFilesMetaData;
using ROCKSDB_NAMESPACE::FileLock;
using ROCKSDB_NAMESPACE::FileType;
using ROCKSDB_NAMESPACE::FilterPolicy;
using ROCKSDB_NAMESPACE::FlushJobInfo;
using ROCKSDB_NAMESPACE::FlushOptions;
using ROCKSDB_NAMESPACE::ImportColumnFamilyOptions;
using ROCKSDB_NAMESPACE::InfoLogLevel;
Expand Down Expand Up @@ -282,6 +285,28 @@ struct rocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep;
};

struct rocksdb_flush_job_info_t {
FlushJobInfo rep;
};

struct rocksdb_event_listener_t : public EventListener {
void* state_;
void (*on_flush_begin)(void*, rocksdb_flush_job_info_t*);
void (*on_flush_completed)(void*, rocksdb_flush_job_info_t*);

void OnFlushBegin(DB*, const FlushJobInfo& info) override {
rocksdb_flush_job_info_t info_t;
info_t.rep = info;
on_flush_begin(state_, &info_t);
}

void OnFlushCompleted(DB*, const FlushJobInfo& info) override {
rocksdb_flush_job_info_t info_t;
info_t.rep = info;
on_flush_completed(state_, &info_t);
}
};

struct rocksdb_compactionfilter_t : public CompactionFilter {
void* state_;
void (*destructor_)(void*);
Expand Down Expand Up @@ -6627,6 +6652,36 @@ void rocksdb_enable_manual_compaction(rocksdb_t* db) {
db->rep->EnableManualCompaction();
}

char* rocksdb_flush_job_info_cf_name(rocksdb_flush_job_info_t* info, size_t* name_len) {
auto name = info->rep.cf_name;
*name_len = name.size();
return CopyString(name);
}

uint64_t rocksdb_flush_job_info_largest_seqno(rocksdb_flush_job_info_t* info) {
return info->rep.largest_seqno;
}

uint64_t rocksdb_flush_job_info_smallest_seqno(rocksdb_flush_job_info_t* info) {
return info->rep.smallest_seqno;
}

rocksdb_event_listener_t* rocksdb_event_listener_create(
void* state, void (*on_flush_begin)(void*, rocksdb_flush_job_info_t*),
void (*on_flush_completed)(void*, rocksdb_flush_job_info_t*)) {
rocksdb_event_listener_t* result = new rocksdb_event_listener_t;
result->state_ = state;
result->on_flush_begin = on_flush_begin;
result->on_flush_completed = on_flush_completed;
return result;
}

void rocksdb_options_add_event_listener(rocksdb_options_t* opt,
rocksdb_event_listener_t* listener) {
opt->rep.listeners.push_back(
std::shared_ptr<rocksdb_event_listener_t>(listener));
}

} // end extern "C"

#endif // !ROCKSDB_LITE
15 changes: 15 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ typedef struct rocksdb_memory_consumers_t rocksdb_memory_consumers_t;
typedef struct rocksdb_memory_usage_t rocksdb_memory_usage_t;
typedef struct rocksdb_export_import_files_metadata_t rocksdb_export_import_files_metadata_t;
typedef struct rocksdb_live_file_metadata rocksdb_live_file_metadata;
typedef struct rocksdb_flush_job_info_t rocksdb_flush_job_info_t;
typedef struct rocksdb_event_listener_t rocksdb_event_listener_t;

/* DB operations */

Expand Down Expand Up @@ -1289,6 +1291,19 @@ extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_options_get_skip_checking_sst_file_sizes_on_db_open(
rocksdb_options_t* opt);

extern ROCKSDB_LIBRARY_API char* rocksdb_flush_job_info_cf_name(
rocksdb_flush_job_info_t* info, size_t* name_len);
extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_flush_job_info_largest_seqno(rocksdb_flush_job_info_t* info);
extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_flush_job_info_smallest_seqno(rocksdb_flush_job_info_t* info);
extern ROCKSDB_LIBRARY_API rocksdb_event_listener_t*
rocksdb_event_listener_create(
void* state, void (*on_flush_begin)(void*, rocksdb_flush_job_info_t*),
void (*on_flush_completed)(void*, rocksdb_flush_job_info_t*));
extern ROCKSDB_LIBRARY_API void rocksdb_options_add_event_listener(
rocksdb_options_t* opt, rocksdb_event_listener_t* listener);

/* Blob Options Settings */
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_blob_files(
rocksdb_options_t* opt, unsigned char val);
Expand Down

0 comments on commit 160373d

Please sign in to comment.