Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue65 #66

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/niova-CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: dpkg / rpm prep (debug)
run: echo "sudo -E apt-get install -y uuid-dev uuid libuuid1 libaio-dev libaio1 libgcrypt20 openssl libssl-dev `apt-cache search librocksdb | awk '{print $1}'` uncrustify libasan5 libtsan0"
- name: dpkg / rpm prep
run: sudo -E apt-get install -y uuid-dev uuid libuuid1 libaio-dev libaio1 libgcrypt20 openssl libssl-dev python3-pip python-jmespath python3-setuptools `apt-cache search librocksdb | awk '{print $1}'` uncrustify libasan5 libtsan0
run: sudo -E apt-get install -y uuid-dev uuid libuuid1 libaio-dev libaio1 libgcrypt20 openssl libssl-dev python3-pip python3-setuptools `apt-cache search librocksdb | awk '{print $1}'` uncrustify libasan5 libtsan0
- name: make
run: cd ./code &&
./prepare.sh &&
Expand Down
14 changes: 9 additions & 5 deletions src/include/pumice_db_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

#include "pumice_db_net.h"

int
void *
PmdbObjGetX(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
struct pmdb_obj_stat *user_pmdb_stat);

int
Expand All @@ -22,9 +23,10 @@ int
PmdbObjPut(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *kv,
size_t kv_size, struct pmdb_obj_stat *user_pmdb_stat);

int
void *
PmdbObjGet(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size);
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff);

int
PmdbObjLookupNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
Expand All @@ -35,14 +37,16 @@ PmdbObjPutNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *kv,
size_t kv_size, pmdb_user_cb_t user_cb, void *user_arg,
struct pmdb_obj_stat *user_pmdb_stat);

int
void *
PmdbObjGetNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
pmdb_user_cb_t user_cb, void *user_arg);

int
void *
PmdbObjGetXNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
pmdb_user_cb_t user_cb, void *user_arg,
struct pmdb_obj_stat *user_pmdb_stat);

Expand Down
1 change: 1 addition & 0 deletions src/include/pumice_db_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typedef struct pmdb_obj_stat
{
pmdb_obj_id_t obj_id;
int64_t sequence_num;
int64_t reply_size;
int status;
uint8_t write_op_pending : 1;
} pmdb_obj_stat_t;
Expand Down
1 change: 1 addition & 0 deletions src/include/raft_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ raft_client_request_submit(raft_client_instance_t rci,
const struct raft_net_client_user_id *rncui,
const struct iovec *src_iovs, size_t nsrc_iovs,
struct iovec *dest_iovs, size_t ndest_iovs,
bool expand_reply_buff,
const struct timespec timeout,
const enum raft_client_request_type rcrt,
raft_client_user_cb_t user_cb, void *user_arg,
Expand Down
63 changes: 44 additions & 19 deletions src/pumice_db_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pmdb_client_completion_fill_pmdb_stat(struct pmdb_client_request *pcreq,
pst->obj_id = pcreq->pcreq_obj_id;
pst->sequence_num = reply->pmdbrm_write_seqno;
pst->write_op_pending = !!reply->pmdbrm_write_pending;
pst->reply_size = reply->pmdbrm_data_size;
}
}

Expand Down Expand Up @@ -276,7 +277,7 @@ pmdb_obj_lookup_internal(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
NIOVA_ASSERT(pmdb_obj_id_2_rncui(obj_id, &rncui) == &rncui);

return raft_client_request_submit(pmdb_2_rci(pmdb), &rncui, &req_iov, 1,
&reply_iov, 1, timeout,
&reply_iov, 1, false, timeout,
blocking ? RCRT_READ : RCRT_READ_NB,
pmdb_client_request_cb, pcreq,
pcreq->pcreq_tag);
Expand Down Expand Up @@ -347,7 +348,7 @@ pmdb_obj_put_internal(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
};

return raft_client_request_submit(pmdb_2_rci(pmdb), &rncui, req_iovs, 2,
&reply_iov, 1, timeout,
&reply_iov, 1, false, timeout,
blocking ? RCRT_WRITE : RCRT_WRITE_NB,
pmdb_client_request_cb, pcreq,
pcreq->pcreq_tag);
Expand Down Expand Up @@ -383,17 +384,17 @@ PmdbObjPutNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *kv,
user_cb, user_arg, user_pmdb_stat);
}

static int
static void *
pmdb_obj_get_internal(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
const void *key, size_t key_size,
void *value, size_t value_size,
const void *key, size_t key_size, void *value,
size_t value_size, bool expand_reply_buff,
const bool blocking, const struct timespec timeout,
pmdb_user_cb_t user_cb, void *user_arg,
struct pmdb_obj_stat *user_pmdb_stat)
{
// NULL user_buf or buf_size of 0 is OK
if (!pmdb || !obj_id || (!blocking && !user_cb))
return -EINVAL;
return NULL;

int rc = 0;

Expand All @@ -402,7 +403,7 @@ pmdb_obj_get_internal(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
value, value_size, user_pmdb_stat, timeout,
user_cb, user_arg, &rc);
if (!pcreq)
return rc;
return NULL;

struct raft_net_client_user_id rncui;
NIOVA_ASSERT(pmdb_obj_id_2_rncui(obj_id, &rncui) == &rncui);
Expand All @@ -421,69 +422,93 @@ pmdb_obj_get_internal(pmdb_t pmdb, const pmdb_obj_id_t *obj_id,
[1].iov_len = value_size,
};

return raft_client_request_submit(pmdb_2_rci(pmdb), &rncui, req_iovs, 2,
reply_iovs, 2, timeout,
rc = raft_client_request_submit(pmdb_2_rci(pmdb), &rncui, req_iovs, 2,
reply_iovs, 2,
expand_reply_buff,
timeout,
blocking ? RCRT_READ : RCRT_READ_NB,
pmdb_client_request_cb, pcreq,
pcreq->pcreq_tag);

/* If server sent data larger than value_size,
raft_client_reply_try_complete() might have allocated bigger buffer and
stored the pointer in reply_iovs[1].iov_base.
*/
void *reply_buff = reply_iovs[1].iov_base;

if (rc)
{
SIMPLE_LOG_MSG(LL_ERROR, "raft_client_request_submit failed (error: %d)", rc);
if (reply_buff && reply_buff != value)
niova_free(reply_buff);

return NULL;
}
return reply_buff;
}

/**
* PmdbObjGet - blocking public get (read) routine.
*/
int
void *
PmdbObjGetX(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
struct pmdb_obj_stat *user_pmdb_stat)
{
const struct timespec timeout = {pmdb_get_default_request_timeout(), 0};

return pmdb_obj_get_internal(pmdb, obj_id, key, key_size, value,
value_size, true, timeout, NULL, NULL,
value_size, expand_reply_buff, true, timeout,
NULL, NULL,
user_pmdb_stat);
}

/**
* PmdbObjGet - blocking public get (read) routine.
*/
int
void *
PmdbObjGet(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size)
size_t key_size, char *value, size_t value_size, bool expand_reply_buff)
{
const struct timespec timeout = {pmdb_get_default_request_timeout(), 0};

return pmdb_obj_get_internal(pmdb, obj_id, key, key_size, value,
value_size, true, timeout, NULL, NULL, NULL);
value_size, false, true, timeout, NULL, NULL, NULL);
}

/**
* PmdbObjGetNB - non-blocking public put (write) routine.
*/
int
void *
PmdbObjGetNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
pmdb_user_cb_t user_cb, void *user_arg)
{
if (!user_cb)
return -EINVAL;
return NULL;

const struct timespec timeout = {pmdb_get_default_request_timeout(), 0};

return pmdb_obj_get_internal(pmdb, obj_id, key, key_size, value,
value_size, false, timeout, user_cb,
value_size, expand_reply_buff, false, timeout,
user_cb,
user_arg, NULL);
}

int
void *
PmdbObjGetXNB(pmdb_t pmdb, const pmdb_obj_id_t *obj_id, const char *key,
size_t key_size, char *value, size_t value_size,
bool expand_reply_buff,
pmdb_user_cb_t user_cb, void *user_arg,
struct pmdb_obj_stat *user_pmdb_stat)
{
const struct timespec timeout = {pmdb_get_default_request_timeout(), 0};

return pmdb_obj_get_internal(pmdb, obj_id, key, key_size, value,
value_size, true, timeout, user_cb, user_arg,
value_size, expand_reply_buff, true, timeout,
user_cb, user_arg,
user_pmdb_stat);
}

Expand Down
25 changes: 22 additions & 3 deletions src/raft_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct raft_client_request_handle
uint8_t rcrh_cb_exec : 1;
uint8_t rcrh_op_wr : 1;
uint8_t rcrh_history_cache : 1;
uint8_t rcrh_expand_reply_iovs : 1;
int16_t rcrh_error;
uint16_t rcrh_sin_reply_port;
struct in_addr rcrh_sin_reply_addr;
Expand Down Expand Up @@ -1174,7 +1175,7 @@ static int
raft_client_request_handle_init(
struct raft_client_instance *rci, struct raft_client_request_handle *rcrh,
const struct iovec *src_iovs, size_t nsrc_iovs, struct iovec *dest_iovs,
size_t ndest_iovs, const struct timespec now,
size_t ndest_iovs, bool expand_reply_buff, const struct timespec now,
const struct timespec timeout, const enum raft_client_request_type rcrt,
raft_client_user_cb_t user_cb, void *user_arg,
const raft_net_request_tag_t tag)
Expand Down Expand Up @@ -1205,6 +1206,7 @@ raft_client_request_handle_init(
rcrh->rcrh_initializing = 1;
rcrh->rcrh_send_niovs = nsrc_iovs;
rcrh->rcrh_recv_niovs = ndest_iovs;
rcrh->rcrh_expand_reply_iovs = expand_reply_buff ? 1 : 0;

rcrh->rcrh_blocking =
(rcrt == RCRT_READ_NB || rcrt == RCRT_WRITE_NB) ? 0 : 1;
Expand Down Expand Up @@ -1384,6 +1386,7 @@ raft_client_request_submit(raft_client_instance_t client_instance,
const struct raft_net_client_user_id *rncui,
const struct iovec *src_iovs, size_t nsrc_iovs,
struct iovec *dest_iovs, size_t ndest_iovs,
bool expand_reply_buff,
const struct timespec timeout,
const enum raft_client_request_type rcrt,
raft_client_user_cb_t user_cb, void *user_arg,
Expand Down Expand Up @@ -1435,7 +1438,8 @@ raft_client_request_submit(raft_client_instance_t client_instance,

int rc =
raft_client_request_handle_init(rci, rcrh, src_iovs, nsrc_iovs,
dest_iovs, ndest_iovs, now, timeout,
dest_iovs, ndest_iovs, expand_reply_buff,
now, timeout,
rcrt, user_cb, user_arg, tag);
if (rc)
{
Expand Down Expand Up @@ -1563,12 +1567,27 @@ raft_client_reply_try_complete(struct raft_client_instance *rci,
{
rcrh->rcrh_reply_size = rcrm->rcrm_data_size;

// XXX Need a fault injection here!
struct iovec *recv_iovs = &rcrh->rcrh_iovs[rcrh->rcrh_send_niovs];

int reply_size_error =
(rcrh->rcrh_reply_size >
niova_io_iovs_total_size_get(
&rcrh->rcrh_iovs[rcrh->rcrh_send_niovs],
rcrh->rcrh_recv_niovs)) ? -E2BIG : 0;

// If client has allocated smaller buffer and allowed to explan the buffer on bigger size result
// Or haven't allocated buffer at all.
if ((reply_size_error == -E2BIG &&
rcrh->rcrh_expand_reply_iovs) || recv_iovs[1].iov_base == NULL)
{
/* Allocate or Reallocate buffer */
SIMPLE_LOG_MSG(LL_DEBUG, "Allocate or Reallocate buffer: %ld",
(rcrh->rcrh_reply_size - recv_iovs[0].iov_len));

recv_iovs[1].iov_base = realloc(recv_iovs[1].iov_base, (rcrh->rcrh_reply_size - recv_iovs[0].iov_len));
recv_iovs[1].iov_len = (rcrh->rcrh_reply_size - recv_iovs[0].iov_len);
reply_size_error = 0;
}
if (from)
{
rcrh->rcrh_sin_reply_addr = from->sin_addr;
Expand Down
7 changes: 6 additions & 1 deletion src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1526,7 +1526,7 @@ raft_server_log_truncate(struct raft_instance *ri)

ri->ri_backend->rib_log_truncate(ri, trunc_entry_idx);

DBG_RAFT_INSTANCE_TAG(LL_NOTIFY, "log-rollback", ri, "new-max-raft-idx=%ld",
DBG_RAFT_INSTANCE_TAG(LL_WARN, "log-rollback", ri, "new-max-raft-idx=%ld",
trunc_entry_idx);
}

Expand Down Expand Up @@ -2645,6 +2645,11 @@ raft_server_append_entry_log_prune_if_needed(

raft_instance_update_newest_entry_hdr(ri, &reh, RI_NEHDR_ALL, true);
}
else // Restore the initial state of the entry header
{
// Issue #65
raft_instance_initialize_newest_entry_hdr(ri);
}

// truncate the log.
raft_server_log_truncate(ri);
Expand Down
57 changes: 28 additions & 29 deletions test/dict_app/dictionary_client/pumice-dict-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,37 +71,36 @@ func pmdbDictClient() {
*/
data_length := PumiceDBCommon.GetStructSize(req_dict)
fmt.Println("Length of the structure: ", data_length)
rc := -1
/* Retry the read on failure */
for ok := true; ok; ok = (rc < 0) {

// Allocate C memory to store the value of the result.
fmt.Println("Allocating buffer of size: ", data_length)
value_buf := C.malloc(C.size_t(data_length))

var reply_size int64
//read operation
rc = client_obj.PmdbClientRead(req_dict, rncui, value_buf,
int64(data_length), &reply_size)

if rc < 0 {
fmt.Println("Read request failed, error: ", rc)
//if rc == os.E2BIG {
if reply_size > data_length {
fmt.Println("Allocate bigger buffer and retry read operation: ", data_length)
data_length = reply_size
}
} else {
result_dict := &DictAppLib.Dict_app{}
PumiceDBCommon.Decode(value_buf, result_dict, reply_size)

fmt.Println("Result of the read request is:")
fmt.Println("Word: ", input_text)
fmt.Println("Frequecy of the word: ", result_dict.Dict_wcount)
}

C.free(value_buf)

// Allocate C memory to store the value of the result.
fmt.Println("Allocating buffer of size: ", data_length)
value_buf := C.malloc(C.size_t(data_length))

var reply_size int64
//read operation
reply_buff := client_obj.PmdbClientRead(req_dict, rncui, value_buf,
int64(data_length), true, &reply_size)

if reply_buff == nil {
fmt.Println("Read request failed !!")
} else {
result_dict := &DictAppLib.Dict_app{}
PumiceDBCommon.Decode(value_buf, result_dict, reply_size)

fmt.Println("Result of the read request is:")
fmt.Println("Word: ", input_text)
fmt.Println("Frequecy of the word: ", result_dict.Dict_wcount)
}
if reply_buff != value_buf {
/* If pmdb library has allocated bigger buffer to
* accomodate the result, make sure we free the buffer.
*/
fmt.Println("Free the buffer allocated by library")
C.free(reply_buff)
}

C.free(value_buf)
}
}
}
Expand Down
Loading