Async Client #108
Conversation
The async-std CI bug is fixed. It looks like a limitation on concurrent connections in the CI environment. The benchmark tool is still broken; sadly, the bencher lib does not support an async setup. Testing against the sync API would be useless, since the sync setup is just a block_on wrapper around the async one. That just adds a lot of overhead and slows it down. Please also check the code and tell me if you find anything that I might have forgotten. I think the optimization is good, but surely there is more potential in it. (Ignore batch reads for now, I'm still working on that. The current implementation is still a more or less hacky workaround to get it working.) The client API might still be inconsistent between a few functions. That's mainly related to moving values between threads. It removes some comfort features from the related functions (operate, query and batch). I would like to keep it that way so this comfort can stay for the other functions, but that's up to you. Right now I hardcoded an older Aerospike version (5.5.0.2) in the CI file to make it run without the required partitioning update. That needs to be removed later.
Ready to review from my side. I'm a little unsure about the other open PRs; this one moved the whole client to the aerospike-core folder. @khaf, you said you are working on a rework of the partitions. If you already have anything running there, would you please share it as a draft on its own branch, for example, so I can check that it fits in without a problem, or even integrate it already? I think unit tests and a new benchmark should be implemented, but I will do that later in another PR. I guess there is already enough to review here with 117 changed files.
```
# Conflicts:
#	aerospike-core/src/client.rs
#	aerospike-core/src/cluster/mod.rs
#	aerospike-core/src/commands/admin_command.rs
#	aerospike-core/src/commands/stream_command.rs
```
Hey @khaf. Also, a lot has changed in the ecosystem and the language itself since the client's initial design. I would also be up for helping you research/design/develop a solution that fits the addressed problems better and more cleanly than this PR/API does, if that helps you get this client more "running again" alongside your other commitments.
@jonas32 Thanks Jonas, both for your amazing contributions and understanding. I didn't mean that we'd want to move this PR to another repo to put it out to pasture; it is a great piece of code, and unless there is reason to the contrary we'll get it into the repo. No reason to let good work go to waste. What I meant was that as we learn more about how systems work both locally and at scale, new perspectives emerge regarding the use cases different approaches serve. In this specific case, I meant we may want to preserve the threaded version and move the async version to another repo. It is becoming clearer to me that optimizing for latency and optimizing for throughput are two different goals, and async is geared towards the latter. Some of our customers are all about latency and would gladly sacrifice a bit of throughput for very low 99th-percentile tail latencies. There are many more things that I'd like to revisit, including testing, life cycles, serialization, instrumentation, monitoring, and self-balancing and healing, among others. As I begin to think about them, I'll file new discussions in the relevant GitHub section so that we get the ball rolling. Of course, in the meanwhile I'll have to get this client updated to support the latest features of the server to keep the hype up :) Thank you for your help in keeping this branch up to date and going. I'll take you up on your offer of help, since I've grown quite rusty away from Rust. We invite everyone to take part as we continue to think about the most efficient ways to store and move data at scale.
Hi, just letting everyone know that my company has started to use @jonas32's branch in production. It introduces a bug where batch reads on clusters with size > 1 will read batch keys out of order (I've got a pull request fixing this bug pointed at Jonas' branch). However, it has brought great improvements to both latency and throughput. The greater improvement to latency is because, in real usage, it's typically rare to ever do anything once... we can now do lots of operations from a single thread using `join!`, and it's far faster and simpler than spinning up threads to do it. It also brings a greater sense of simplicity to the internal design of the crate, as it is no longer responsible for managing thread pools or queues in order to do operations in parallel. I trust this could be leveraged to make further improvements in future. Regarding 99th-percentile latency: my previous company (one of Aerospike's earliest customers) found that the best way to solve this was rapid retries using READ_SEQUENCE. I'm hoping to write a patch based on Jonas' branch to add this as well.
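For illustration, a minimal sketch of that pattern (the helper is hypothetical, and it assumes the async `get` keeps the sync client's `(policy, key, bins)` signature):

```rust
use aerospike::{Bins, Client, Key, ReadPolicy};
use futures::join;

// Hypothetical helper: three reads proceed concurrently on one task,
// instead of one thread per operation.
async fn fetch_three(client: &Client, policy: &ReadPolicy, k1: &Key, k2: &Key, k3: &Key) {
    let (r1, r2, r3) = join!(
        client.get(policy, k1, Bins::All),
        client.get(policy, k2, Bins::All),
        client.get(policy, k3, Bins::All),
    );
    // Each result is handled independently; one failure does not cancel the rest.
    println!("{} {} {}", r1.is_ok(), r2.is_ok(), r3.is_ok());
}
```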
@CMoore-Darwinium thanks for the info. I am also planning on using that branch for one of my personal projects. I wonder, have you tried running the sync client with `spawn_blocking`?
@vamshiaruru-virgodesigns we were using `spawn_blocking` on the sync client until we switched. I don't really think we have an apples-to-apples comparison, because the async version allows us to do some stuff that we weren't doing before. With `spawn_blocking` we needed to be very disciplined about how many tasks we created, since we had to contend with blocking-pool exhaustion under high traffic. For the "finalization" part of our transactions it was taking ~30ms to perform one write and maybe 8-9 operations in series. This was also highly variable based on load. Now we use `join_all` to perform all the async writes on the same thread. This part is consistently over and done with in 1.2ms-1.5ms. Granted, this is all very unscientific, because obviously there were some major structural code changes to go from sync to async. However, I'd credit the async client for allowing us to make those code changes.
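A rough sketch of the finalization pattern described (the function and its inputs are illustrative assumptions; it assumes a `put(policy, key, bins)`-style API):

```rust
use aerospike::{Bin, Client, Key, WritePolicy};
use futures::future::join_all;

// Hypothetical finalization step: all writes are issued at once and awaited
// together on the same task, instead of being performed in series.
async fn finalize(client: &Client, policy: &WritePolicy, writes: &[(Key, Vec<Bin<'_>>)]) {
    let pending = writes.iter().map(|(key, bins)| client.put(policy, key, bins));
    for result in join_all(pending).await {
        if let Err(e) = result {
            eprintln!("write failed: {}", e);
        }
    }
}
```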
@khaf don't get me wrong there. I didn't read your message as saying that the PR should be moved out of sight or skipped. I just understand why you are not comfortable with committing to the API that this PR introduces over the longer run. Not focusing on the async client exactly as this PR proposes is totally fine with me, since I agree with the problems that you pointed out before.
Oh, I noticed this too while using the branch, but I did not expect it to be ordered at all in the first place.
@vamshiaruru-virgodesigns The aerospike_core crate is simply the async client. I didn't check, but I guess it cannot win that test. It just adds extra overhead, acting as a compatibility layer for the current release by blocking on futures so the user does not have to. Testing the current master version against the async one would be more accurate. Either way, it's still great if you want to go for throughput, but not ideal for the scenario that @khaf is talking about, where customers want to go for latency instead. When I built this, I tested the performance under a few different scenarios.
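A sketch of the compatibility layer being described (names and module paths are assumptions, not the crate's actual API): the "sync" call just blocks on the async client's future, so a benchmark of it mostly measures the async client plus executor overhead.

```rust
// Hypothetical shape of the sync facade over the async client.
fn get_blocking(
    client: &aerospike_core::Client,
    policy: &aerospike_core::ReadPolicy,
    key: &aerospike_core::Key,
) -> aerospike_core::errors::Result<aerospike_core::Record> {
    // Drive the async future to completion on the caller's thread.
    futures::executor::block_on(client.get(policy, key, aerospike_core::Bins::All))
}
```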
@khaf just letting you know, I have a patch in https://github.com/darwinium-com/aerospike-client-rust that makes the client rack-aware, as well as introducing replica retry policies. As far as I know, this was the main missing feature in the Rust client, and it works now. We're currently using this async patch, so it is a precondition.
Hello @CMoore-Darwinium @jonas32, I created a patch with TLS support for the client from this PR. If anyone can test it with the EE version of Aerospike, that would be great. Also, I want to create a PR with …
```diff
@@ -19,6 +19,7 @@ use serde::Serialize;
 use std::convert::From;
 
 /// Container object for a record bin, comprising a name and a value.
+#[derive(Clone)]
 pub struct Bin<'a> {
     /// Bin name
     pub name: &'a str,
```
Now that so many lifetimes have been removed, removing this one does not seem to be much of a controversial issue. What do you think?
Are you talking about the `Clone` or the lifetime on the struct?
The struct's `name` field. It can be a `String`, and life will be easier ever after.
I agree. That will simplify it a lot.
To be frank, I'm worried about the CPU cost of all these little conversions, but my impression is that we are saturating the I/O long before the CPU (async itself is very CPU-heavy). I hope we don't come to regret this later. On another note, is there an idiomatic way of accepting `&str`, `String`, and `&'static str` in all user-facing APIs?
Yes, there is. This would work:

```rust
fn main() {
    let stat: &'static str = "static";
    accept_all("&str");
    accept_all("String".to_string());
    accept_all(stat);
}

// Any argument convertible into a String is accepted: &str, String, and &'static str.
fn accept_all<S: Into<String>>(value: S) {
    println!("{}", value.into());
}
```
```diff
     &self,
     policy: &ScanPolicy,
     namespace: &str,
     set_name: &str,
     bins: T,
 ) -> Result<Arc<Recordset>>
 where
-    T: Into<Bins>,
+    T: Into<Bins> + Send + Sync + 'static,
```
Preferably, this `Into<Bins>` bound should allow a lazy iterator, so that everything fits on the stack and no heavy allocation is needed. Thoughts?
```diff
@@ -247,6 +247,9 @@ pub fn append<'a>(policy: &ListPolicy, bin: &'a str, value: &'a Value) -> Operat
 
 /// Create list append items operation. Server appends each input list item to the end of list
 /// bin. Server returns list size.
+///
+/// # Panics
+/// Will panic if values is empty
 pub fn append_items<'a>(policy: &ListPolicy, bin: &'a str, values: &'a [Value]) -> Operation<'a> {
     assert!(!values.is_empty());
```
`&str` should probably become `String` here as well.
```diff
-    Some(timeout) => Some(Instant::now() + timeout),
-    None => None,
-}
+self.timeout.map(|timeout| Instant::now() + timeout)
```
sigh. I guess I had forgotten my Scala days.
Alright, I think the best way to move forward would be to create a branch named … I'll create that branch by Sunday evening CET at the latest if I don't hear back from anyone.
Hey @khaf …
@jonas32 Don't worry about it, I'll take care of them. If you have some free time and are looking for an interesting problem, I believe the lazy iterator for the serializers is the one.
Just for clarification, you are talking about something in the direction of …?
It's about PR #126.

```rust
pub trait ToBins {
    /// Convert the struct to a `Bin` array.
    fn to_bins(&self) -> Vec<Bin>;
}
```

That `Vec` is expensive and unnecessary, since you don't need all the bins at the same time. It would be better if we could get something like this:

```rust
fn to_bin_iter(&self, bins: Option<&[String]>) -> impl Iterator<Item = Bin>;
```

This way each bin is allocated on the stack and it does not matter how big the struct is.
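For instance, a rough sketch of what that could look like for a concrete type (the struct, its fields, and the array-based iterator are illustrative assumptions, not the crate's API):

```rust
use aerospike::{Bin, Value};

// Illustrative struct: its bins come from a fixed-size array on the stack,
// so no Vec is allocated no matter how many fields the struct has.
struct User {
    name: String,
    age: i64,
}

impl User {
    fn to_bin_iter(&self) -> impl Iterator<Item = Bin<'static>> {
        [
            Bin::new("name", Value::from(self.name.clone())),
            Bin::new("age", Value::from(self.age)),
        ]
        .into_iter()
    }
}
```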
@jonas32 I'm trying to use the … I see a lot of mentions of it on the internet, but scarcely anyone provides a workaround that works. How did you test it?
This happens because the Tokio runtime is not initialized. Put `#[tokio::main]` on your `main` and it should work.
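For reference, this is the standard Tokio entry point (plain Tokio, nothing client-specific):

```rust
// #[tokio::main] builds a runtime and blocks `main` on the async body,
// so the client's futures have an executor to run on.
#[tokio::main]
async fn main() {
    // await async client calls here
}
```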
That I figured, but I'm using it in a library (that is itself a C plugin). There is no `main` to annotate.
I think the easiest way would be using the async client and manually running the runtime.
Shouldn't this be how the …?
@khaf you can create a runtime without `main`: https://docs.rs/tokio/latest/tokio/runtime/index.html#usage
Without the `#[tokio::main]` annotation you can't really make a default one; you need to spawn it manually then. But yes, it might be worth integrating that into the client. I'll have a look at that later.
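A minimal sketch of that workaround, following the linked Tokio docs (the surrounding function is hypothetical): build the runtime by hand and block on the async call, which works from a library or C plugin with no async `main`.

```rust
use tokio::runtime::Runtime;

// Build a runtime explicitly and drive an async call to completion from
// synchronous code, e.g. inside a library or C plugin.
fn call_from_sync_context() {
    let rt = Runtime::new().expect("failed to build Tokio runtime");
    rt.block_on(async {
        // await async client calls here
    });
}
```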
```diff
 }
 
 impl Drop for PooledConnection {
     fn drop(&mut self) {
         if let Some(conn) = self.conn.take() {
-            self.queue.put_back(conn);
+            block_on(self.queue.put_back(conn));
```
Using `futures::executor::block_on` underneath an async function can cause a deadlock. We had a production outage this week because of this line. I'm about to make a special patch to this branch to address it.
Sorry for the inconvenience, I didn't know about that. You should now be able to push directly to my branch in case it's needed, since I currently don't have the time to work on this branch myself.
Actually, I'll run my fix past you and @khaf first to see if everyone's on board with it. I changed the internals to a synchronous mutex, to allow it to be used more easily in the drop function. Synchronous mutexes are safe in async code (and actually outperform async mutexes), with the proviso that they are never held across `.await` points... otherwise they deadlock. So the `put_back` function now looks like this:
```rust
pub fn put_back(&self, mut conn: Connection) {
    let mut internals = self.0.internals.lock().unwrap();
    if internals.num_conns < self.0.capacity {
        internals.connections.push_back(IdleConnection(conn));
    } else {
        aerospike_rt::spawn(async move { conn.close().await });
        internals.num_conns -= 1;
    }
}
```
Because of the limitation on holding mutexes across `.await` points, I also had to restructure `fn get(&self)` to do the synchronous code first and the asynchronous code later, like so:
```rust
pub async fn get(&self) -> Result<PooledConnection> {
    let mut connections_to_free = Vec::new();
    let mut conn: Option<Connection> = None;
    {
        let mut internals = self.0.internals.lock().unwrap();
        while let Some(IdleConnection(this_conn)) = internals.connections.pop_front() {
            if this_conn.is_idle() {
                connections_to_free.push(this_conn);
                internals.num_conns -= 1;
            } else {
                conn = Some(this_conn);
                break;
            }
        }
        if conn.is_none() {
            if internals.num_conns >= self.0.capacity {
                bail!(ErrorKind::NoMoreConnections);
            }
            internals.num_conns += 1;
        }
    }

    // Close stale connections outside the lock, so no await happens while holding it.
    for mut connection in connections_to_free {
        connection.close().await;
    }

    if conn.is_none() {
        let new_conn = aerospike_rt::timeout(
            Duration::from_secs(5),
            Connection::new(&self.0.host.address(), &self.0.policy),
        )
        .await;
        let Ok(Ok(new_conn)) = new_conn else {
            self.0.internals.lock().unwrap().num_conns -= 1;
            bail!(ErrorKind::Connection(
                "Could not open network connection".to_string()
            ));
        };
        conn = Some(new_conn);
    }

    Ok(PooledConnection {
        queue: self.clone(),
        conn,
    })
}
```
Not only does this allow the synchronous mutex to be used here, it also does not hold the lock while calling `close()`, so it should be faster under load.
Oh, by the way: I mentioned the outage, but I didn't mention how great it is to have this patch at all. I'm not sure we'd have a production environment without it.
This first commit includes most of the basic changes to the client to get it running on async-std and tokio.
There are surely a few more spots that need work or still block threads. Feel free to have a look.
The client is not really working yet; I never tested it, at least, since unit tests are still broken.
Tests are not working yet. There are still a few logical things that need to be changed for async, but that would again be a bigger change in itself (thread pools etc.).