Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IsEmpty() checks were missing before some SwitchMemtable() invocation… #808

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2140,7 +2140,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
IOStatus io_s;


if(cfd->mem()->IsEmpty()) {
return Status::OK();
}
// Recoverable state is persisted in WAL. After memtable switch, WAL might
// be deleted, so we write the state to memtable to be persisted as well.
Status s = WriteRecoverableState();
Expand Down
188 changes: 128 additions & 60 deletions examples/simple_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,89 +5,157 @@

#include <cstdio>
#include <string>
#include<iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/iterator.h"
namespace fs = std::filesystem;

using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::PinnableSlice;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::WriteBatch;
using ROCKSDB_NAMESPACE::WriteOptions;

using namespace ROCKSDB_NAMESPACE;
using namespace std;
#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example";
#else
std::string kDBPath = "/tmp/rocksdb_simple_example";
#endif

int main() {
DB* db;
DB* db;
std::string sst_root_dir = "/tmp/test/";
vector<string> paths;
// #define MAX_KEYS 100000

int max_keys = 0, per_iter_keys = 0, conc_writes = 0;

int create_sst_file(int start, int end);
void ingest_file();

bool create_sst_files() {
for(int i=1;i<max_keys;i+=per_iter_keys) {
bool res = create_sst_file(i, i+per_iter_keys);
if(!res) {
return false;
}
}
return true;
}

vector<string> get_all_files(string dir_path) {
return paths;
}

int create_sst_file(int start, int end) {
Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;

// open DB
Status s = DB::Open(options, kDBPath, &db);
assert(s.ok());
SstFileWriter sst_file_writer(EnvOptions(), options);
string file_path = sst_root_dir + std::to_string(start);
paths.push_back(file_path);
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
printf("Error while opening file %s, Error: %s\n", file_path.c_str(),
s.ToString().c_str());
return false;
}

// Put key-value
s = db->Put(WriteOptions(), "key1", "value");
assert(s.ok());
std::string value;
// get value
s = db->Get(ReadOptions(), "key1", &value);
assert(s.ok());
assert(value == "value");

// atomically apply a set of updates
{
WriteBatch batch;
batch.Delete("key1");
batch.Put("key2", value);
s = db->Write(WriteOptions(), &batch);
// Insert rows into the SST file, note that inserted keys must be
// strictly increasing (based on options.comparator)
std::vector<std::string> nums;
for(int i=start;i<=end;i++) {
if(i%10 == 0) continue;
nums.push_back(std::to_string(i));
}
std::sort(nums.begin(), nums.end());
for(auto num : nums) {
s = sst_file_writer.Put(num, num);
if (!s.ok()) {
printf("Error while adding Key: %s, Error: %s\n", num.c_str(),
s.ToString().c_str());
return false;
}
}

s = db->Get(ReadOptions(), "key1", &value);
assert(s.IsNotFound());
// Close the file
s = sst_file_writer.Finish();
if (!s.ok()) {
printf("Error while finishing file %s, Error: %s\n", file_path.c_str(),
s.ToString().c_str());
return false;
}
return true;
}

db->Get(ReadOptions(), "key2", &value);
assert(value == "value");
void ingest_file() {
cout<<"executing ingest_file\n";
IngestExternalFileOptions ifo;
vector<string> ssts_files = get_all_files(sst_root_dir);
Status s = db->IngestExternalFile(ssts_files, ifo);
if (!s.ok()) {
printf("Error while ingesting files. Error %s\n", s.ToString().c_str());
return;
}
return;
}

{
PinnableSlice pinnable_val;
db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
assert(pinnable_val == "value");
void read_kvs() {
auto it = db->NewIterator(ReadOptions());
cout<<"Executing read_kvs\n";
int iter = 100;
while(iter>0) {


it->SeekToFirst();
while(it->Valid()) {
string key = it->key().ToString();
string val = it->value().ToString();
cout<<"Key: "<<key<<" value: "<<val<<endl;
it->Next();
}
iter--;
}
}

{
std::string string_val;
// If it cannot pin the value, it copies the value to its internal buffer.
// The intenral buffer could be set during construction.
PinnableSlice pinnable_val(&string_val);
db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
assert(pinnable_val == "value");
// If the value is not pinned, the internal buffer must have the value.
assert(pinnable_val.IsPinned() || string_val == "value");
void write_kvs() {
cout<<"Executing write_kvs\n";
// Put key-value
for(int i=1; i<conc_writes;i++) {
string kv = to_string(i);
Status s = db->Put(WriteOptions(), kv, kv);
}
}

PinnableSlice pinnable_val;
s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable_val);
assert(s.IsNotFound());
// Reset PinnableSlice after each use and before each reuse
pinnable_val.Reset();
db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
assert(pinnable_val == "value");
pinnable_val.Reset();
// The Slice pointed by pinnable_val is not valid after this point

delete db;
int main(int argc, char *argv[]) {

//create one instance of db for both ingesting and directly writing new keys to the db. db is global variable
Options options;
conc_writes = (int) std::stoi(argv[1]);
max_keys = (int) std::stoi(argv[2]);
per_iter_keys = (int) std::stoi(argv[3]);
options.IncreaseParallelism();
// options.OptimizeLevelStyleCompaction();
options.create_if_missing = true;
cout<<"Default Memtable size: "<<options.write_buffer_size<<endl;
options.write_buffer_size = 1024;
Status s = DB::Open(options, kDBPath, &db);

if(!s.ok()) {
cout<<"Error while opening database: "<<s.ToString().c_str()<<endl;
}
assert(s.ok());

//create sst
create_sst_files();
std::cout<<"SST created"<<std::endl;

//ingest and write in parallel to reproduce.
std::thread t2(ingest_file);
std::thread t1(write_kvs);
std::thread t3(read_kvs);
t1.join();
t2.join();
t3.join();
std::cout<<"Ingestion completed"<<std::endl;
return 0;
}

Loading