add Timeplus APIs #18
yuzifeng1984 commented Aug 12, 2024 (edited)
- Idempotent insert
- Thread-safe insert APIs
- Better-controlled retry on exceptions.
@@ -0,0 +1,243 @@
#pragma once
cloned from proton
388c528 to 7568fe7
timeplus/timeplus.h
Outdated
namespace timeplus {

struct TimeplusConfig {

Could TimeplusConfig contain a ClientConfig directly? Would that simplify the code?
We override some client config items and do not want users to change them, so only a subset of the client parameters is exposed in the config. The drawback is that users cannot change the unexposed parameters (such as SSL).
I am still thinking about the best way to do this...
P.S. Keeping users from configuring the client directly might make future extensions easier (e.g. automatic node host/port discovery).
timeplus/timeplus.h
Outdated
std::string password;

/// Max number of connections maintained in pool.
unsigned int max_connections = 1;

Let's stick to uint32_t etc. (new style).
Is this max_connections per host/port?
It is the total number of clients in the pool (so at most max_connections will be created). Each client picks a host/port in round-robin order when it connects or reconnects, starting from the first host/port.
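The round-robin selection described above could look roughly like the sketch below. `Endpoint` and `RoundRobinEndpoints` are hypothetical names used only for illustration; the SDK's actual types are not shown in this thread. Each client would hold its own cursor like this one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical endpoint type (assumption, not the SDK's real type).
struct Endpoint {
    std::string host;
    uint16_t port;
};

// Hands out endpoints in round-robin order, starting from the first one.
class RoundRobinEndpoints {
public:
    explicit RoundRobinEndpoints(std::vector<Endpoint> endpoints)
        : endpoints_(std::move(endpoints)) {
        assert(!endpoints_.empty());
    }

    // Returns the endpoint to try for the next connect or reconnect.
    const Endpoint& Next() {
        const Endpoint& picked = endpoints_[next_];
        next_ = (next_ + 1) % endpoints_.size();
        return picked;
    }

private:
    std::vector<Endpoint> endpoints_;
    std::size_t next_ = 0;
};
```

With two endpoints, successive calls to `Next()` return the first, the second, then the first again.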
tests/insert-async/main.cpp
Outdated
block.AppendColumn("i", col_i);
block.AppendColumn("s", col_s);

class Inserter {

Let's remove this class and use a plain loop in main, which would be a much more direct usage example.
tests/insert-async/main.cpp
Outdated
    inserter.InsertBlock();
}

while (inserter.BlockInserted() != INSERT_BLOCKS) {

We can show a slightly more advanced usage with atomic wait (https://en.cppreference.com/w/cpp/atomic/atomic/wait) instead of spinning.
Currently the SDK is based on C++17. I will add notification later.
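Since `std::atomic::wait` requires C++20, one C++17-compatible way to avoid the spin loop is a mutex plus `std::condition_variable`. The sketch below is only an illustration under that assumption; `CompletionCounter` and `RunDemo` are hypothetical names, and the worker threads stand in for the SDK's insert callbacks.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Counts completed inserts and lets a waiter block until a target is reached.
class CompletionCounter {
public:
    // Called from the insert callback, possibly on a worker thread.
    void Done() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++done_;
        }
        cv_.notify_all();
    }

    // Blocks (without spinning) until at least `target` completions happened.
    void WaitFor(std::size_t target) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return done_ >= target; });
    }

    std::size_t Count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return done_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t done_ = 0;
};

// Simulates `blocks` asynchronous completions and returns the final count.
std::size_t RunDemo(std::size_t blocks) {
    CompletionCounter counter;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < blocks; ++i) {
        workers.emplace_back([&counter] { counter.Done(); });
    }
    counter.WaitFor(blocks);
    for (auto& worker : workers) {
        worker.join();
    }
    assert(counter.Count() == blocks);
    return counter.Count();
}
```

In the test program, the callback passed to the async insert would call `Done()`, and `main` would call `WaitFor(INSERT_BLOCKS)` in place of the `while` loop.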
tests/insert-async/main.cpp
Outdated
    }

} else {
    std::cout << "[" << timestamp() << "]\t Failed to insert block: insert_id=" << id << std::endl;

Do we have a failure message / reason?
Added printing of result.err_msg.
tests/insert-async/main.cpp
Outdated
}

void InsertBlock() {
    tp_.InsertAsync(table_name_, block_, [this](uint64_t id, const auto& result) {

If the id is passed via the callback, what can users do with it?
Removed the internal id and changed the insert APIs in timeplus.h.
The InsertResult contains the original parameters for user convenience.
auto handle_insert_result = [&](size_t user_block_id, const InsertResult& result) {...};
tp.InsertAsync("table", block,
    [user_block_id, &handle_insert_result](const BaseResult& result) {
        const auto& insert_result = static_cast<const InsertResult&>(result);
        handle_insert_result(user_block_id, insert_result);
    });
timeplus/client.cpp
Outdated
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
std::string settings;
if (!idempotent_id.empty()) {
    settings = " SETTINGS idempotent_id='" + idempotent_id + "'";

A bit more perf:
settings.reserve(128);
settings.append(" SETTINGS idempotent_id='").append(idempotent_id).append("'");
timeplus/client.cpp
Outdated
    settings = " SETTINGS idempotent_id='" + idempotent_id + "'";
}

Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " )" + settings + " VALUES", query_id);

Similarly, let's try to minimize string memory allocations.
In general, this string concatenation seems very verbose and slow. Is it not even using the native wire protocol?
Changed to use the query settings serialization.
timeplus/timeplus.h
Outdated
using InsertResult = BaseResult;

using Callback = std::function<void(uint64_t id, const BaseResult&)>;

If we provide an id to the callback, what can users do with it? It seems too late.
Changed the insert arguments and return value. Please check the update.
timeplus/timeplus.h
Outdated
namespace timeplus {

struct TimeplusConfig {

nit: we can consider creating a separate timeplus_config.h file.
timeplus/client_pool.h
Outdated
std::optional<std::pair<ClientPtr, bool>> Acquire(int64_t timeout_ms);

void Release(ClientPtr& client, bool valid) {

Let's take ClientPtr (by value) here, since it is more convenient; perf-wise it is still fine, and the caller can std::move(...).
Changed.
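A minimal sketch of the by-value "sink" parameter suggested above. It assumes `ClientPtr` is a `std::unique_ptr`, which this thread does not confirm; `Client` and `MiniPool` are placeholders, not the SDK's classes.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Client {};                           // placeholder for the real client
using ClientPtr = std::unique_ptr<Client>;  // assumption: unique ownership

class MiniPool {
public:
    // Sink parameter: the pool takes ownership of whatever the caller passes.
    void Release(ClientPtr client) {
        assert(client != nullptr);
        clients_.push_back(std::move(client));
    }

    std::size_t Size() const { return clients_.size(); }

private:
    std::vector<ClientPtr> clients_;
};
```

The caller writes `pool.Release(std::move(client));`, which makes the ownership transfer visible at the call site and also accepts temporaries.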
timeplus/client_pool.h
Outdated
~ClientPool() { assert(clients_.size() == pool_size_); }

std::optional<std::pair<ClientPtr, bool>> Acquire(int64_t timeout_ms);

Do we need to return a bool? Can Acquire just return a valid ClientPtr?
I wanted to distinguish the case where the blocking queue is empty from the case where the returned client cannot connect within the timeout.
I changed the behavior to throw different exceptions when the client cannot be acquired.
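One way the "different exceptions" design could look is sketched below. The exception names `PoolTimeoutError` and `ConnectError`, the `Client` stub, and the helper `DescribeAcquire` are all hypothetical; the PR's real class and exception names are not shown in this thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Stub client: `can_connect` simulates whether the server is reachable.
struct Client {
    bool can_connect = true;
    bool connected = false;
    bool Connect() { connected = can_connect; return connected; }
};
using ClientPtr = std::unique_ptr<Client>;

// Hypothetical exception types, one per failure mode.
struct PoolTimeoutError : std::runtime_error { using std::runtime_error::runtime_error; };
struct ConnectError : std::runtime_error { using std::runtime_error::runtime_error; };

class ClientPool {
public:
    // Returns a connected client or throws; no bool flag is needed.
    ClientPtr Acquire(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !clients_.empty(); })) {
            throw PoolTimeoutError("no idle client within timeout");
        }
        ClientPtr client = std::move(clients_.front());
        clients_.pop_front();
        lock.unlock();
        if (!client->connected && !client->Connect()) {
            Release(std::move(client));  // keep the pool size stable
            throw ConnectError("client could not connect");
        }
        return client;
    }

    void Release(ClientPtr client) {
        assert(client != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            clients_.push_back(std::move(client));
        }
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<ClientPtr> clients_;
};

// Shows how a caller can tell the two failure modes apart.
std::string DescribeAcquire(ClientPool& pool, std::chrono::milliseconds timeout) {
    try {
        ClientPtr client = pool.Acquire(timeout);
        pool.Release(std::move(client));
        return "ok";
    } catch (const PoolTimeoutError&) {
        return "pool timeout";
    } catch (const ConnectError&) {
        return "connect failed";
    }
}
```

Compared with returning `std::optional<std::pair<ClientPtr, bool>>`, the success path yields a usable client directly, and the caller handles each failure in its own `catch` clause.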
auto& [client, valid] = maybe_client.value();
try {
    /// Lazy init client

We can init the client eagerly.
Client init creates a connection to the server, which is relatively heavy and may take even longer when it times out. So I put the init where the user actually uses the client.
The challenge with lazy init is that we don't have a good way to report back "your hostname/port configuration is wrong, we can't connect". For me, an initial connect that reports this configuration issue (which happens often) is good.
timeplus/timeplus.cpp
Outdated
auto task_id = next_task_id_.fetch_add(1);
auto task = std::make_shared<InsertTask>();
task->task_id = task_id;
task->table_name = std::move(table_name);

task->table_name.swap(table_name);
task->block.swap(block);
task->idempotent_id.swap(idempotent_id);
Or provide a constructor for InsertTask that takes these && parameters.
Added a constructor.
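A constructor taking rvalue-reference parameters, as suggested above, might look like the following sketch. The `Block` here is a stand-in struct and the member list is inferred from the snippet under review; the actual `InsertTask` in the PR may differ.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the SDK's Block (assumption for illustration only).
struct Block {
    std::vector<int> rows;
};

struct InsertTask {
    // Takes ownership of the arguments; each member is move-constructed once.
    InsertTask(uint64_t id, std::string&& table, Block&& blk, std::string&& idem_id)
        : task_id(id),
          table_name(std::move(table)),
          block(std::move(blk)),
          idempotent_id(std::move(idem_id)) {}

    uint64_t task_id;
    std::string table_name;
    Block block;
    std::string idempotent_id;
};
```

The call site then becomes a single expression, for example `std::make_shared<InsertTask>(task_id, std::move(table_name), std::move(block), std::move(idempotent_id))`, instead of a default construction followed by per-field assignment.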
timeplus/client.cpp
Outdated
settings = " SETTINGS idempotent_id='" + idempotent_id + "'";
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);

if (!idempotent_id.empty() && versionNumber(server_info_.version_major, server_info_.version_minor, server_info_.version_patch) >=

Can we cache the version number? In our case we probably don't need this version guard at all, since all of our production enterprise users run versions newer than this.
Removed the server version check for idempotent insert.
void Insert(const std::string& table_name, const Block& block);
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
/// Insertion will be idempotent when `idempotent_id` is not empty.
void Insert(const std::string& table_name, const Block& block, const std::string & idempotent_id = "");

It seems good to introduce BlockPtr, a std::shared_ptr<const Block>; that way we don't need to copy the block during ingest or on retry.
BTW, the current legacy interface / implementation of this SDK is not necessarily optimal; we can revise it when necessary.
I changed the new Timeplus APIs to take a BlockPtr, while keeping the legacy client interface as is: it does not need to own the block, which is better for both usability and performance. So users have more flexibility when using client::insert.
Does the legacy client copy the Block internally?
Just double-checked: no copy as far as I can see. All sub-functions access/keep the block as a const reference.
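To illustrate why a `BlockPtr` avoids copies on retry, here is a small sketch. `Block`, `FlakySender`, and `InsertWithRetry` are hypothetical stand-ins, not SDK types; only the alias `BlockPtr = std::shared_ptr<const Block>` comes from the review comment.

```cpp
#include <memory>
#include <vector>

// Stand-in for the SDK's Block (assumption for illustration only).
struct Block {
    std::vector<int> rows;
};

using BlockPtr = std::shared_ptr<const Block>;

// Hypothetical sender that fails the first `failures` attempts.
struct FlakySender {
    int failures;
    int attempts = 0;
    const Block* last_seen = nullptr;

    // Like the legacy client, it only needs a const reference to the block.
    bool Send(const Block& block) {
        ++attempts;
        last_seen = &block;
        return attempts > failures;
    }
};

// Retries with the same shared block; only the pointer is copied, never the data.
bool InsertWithRetry(FlakySender& sender, BlockPtr block, int max_attempts) {
    for (int i = 0; i < max_attempts; ++i) {
        if (sender.Send(*block)) {
            return true;
        }
    }
    return false;
}
```

Because the block is immutable and reference-counted, an async task can keep it alive across retries while the caller continues to hold its own reference.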
timeplus/client.cpp
Outdated
size_t query_text_size = INSERT_INTO.size() + table_name.size() + LEFT_PAREN.size() + fields_section_size + RIGHT_PAREN_VALUES.size();

std::string query_text;
query_text.reserve(query_text_size);

We can reserve query_text_size * 1.2, for example, for safety.
Changed.
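The reserve-then-append pattern discussed here can be sketched as follows. The function name `BuildInsertQuery`, the `","` separator, and the absence of identifier quoting are assumptions for illustration; the PR's actual query builder is only partially visible in this thread.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Builds "INSERT INTO <table> ( c1,c2 ) VALUES" with a single allocation.
std::string BuildInsertQuery(std::string_view table,
                             const std::vector<std::string>& columns) {
    constexpr std::string_view kInsertInto = "INSERT INTO ";
    constexpr std::string_view kLeftParen = " ( ";
    constexpr std::string_view kSeparator = ",";
    constexpr std::string_view kRightParenValues = " ) VALUES";

    // Upper bound on the final length: one separator per column over-counts
    // by at most one separator, so the appends below never reallocate.
    std::size_t size = kInsertInto.size() + table.size() + kLeftParen.size() +
                       kRightParenValues.size();
    for (const auto& column : columns) {
        size += column.size() + kSeparator.size();
    }

    std::string query;
    query.reserve(size);
    query.append(kInsertInto).append(table).append(kLeftParen);
    for (std::size_t i = 0; i < columns.size(); ++i) {
        if (i != 0) {
            query.append(kSeparator);
        }
        query.append(columns[i]);
    }
    query.append(kRightParenValues);
    return query;
}
```

When the size is computed as a true upper bound like this, extra slack is not strictly required; a margin such as the suggested 1.2 factor mainly protects against later edits that add text without updating the size computation.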
Also refactor TimeplusConfig to allow users to set client options directly.
041ac46 to a0446c5
1299dc7 to cfbd06b