Adding functionality to satisfy v0.itworks #5
Conversation
Signed-off-by: Pavel Abramov <[email protected]>
No more NATS Signed-off-by: Pavel Abramov <[email protected]>
Signed-off-by: Pavel Abramov <[email protected]>
Because that is the correct name to use Signed-off-by: Pavel Abramov <[email protected]>
That also introduces Notifier Error and NkvClient now has map instead of single channel, need to think about cleaning it up Signed-off-by: Pavel Abramov <[email protected]>
Signed-off-by: Pavel Abramov <[email protected]>
Notify Key Value will load previously stored values and delete will remove values from filesystem Signed-off-by: Pavel Abramov <[email protected]>
This way it is easier to trace and debug things Signed-off-by: Pavel Abramov <[email protected]>
@deitch PR is ready for review, I'll do the rest of the todos in separate PR(s). Edit: I tried to make it easier to review commit-by-commit, i.e. each commit serves one purpose
let req = ServerRequest::Delete(BaseMessage {
    id: Self::uuid(),
    key,
});
self.send_request(&req).await
}

pub async fn subscribe(&mut self, key: String) -> tokio::io::Result<ServerResponse> {
All of this sets up a subscription (nice catch on the "you already subscribed to this key, go away").
The subscriptions are stored in NkvClient.subscriptions, which is a HashMap<String, watch::Receiver<Message>>. How does that get used? The call is NkvClient.subscribe("somekey"), which just returns the server response. How do I set up a handler that gets triggered for each such change?
Yup, and those changes are sent to a tokio::sync::watch channel, which stores the latest update. I think it might make sense to make that HashMap public, so that clients could write their own handlers; I don't know how good a practice it is to pass handlers for subscriptions (the way we're doing it right now).
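To make the "client writes its own handler" option concrete, here is a minimal sketch of a subscription loop that invokes a callback for every change. Note the assumptions: the real client keeps `tokio::sync::watch` receivers, but this sketch uses `std::sync::mpsc` and a thread purely to stay dependency-free, and the `Message` enum here is a hypothetical stand-in for the crate's actual type.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the crate's Message type.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Update { value: Vec<u8> },
    Delete,
}

// Run `handler` for every change delivered on the channel. The real
// client would drain a tokio watch receiver instead; std::sync::mpsc
// is used here only to keep the sketch dependency-free.
fn spawn_subscription<F>(handler: F) -> (mpsc::Sender<Message>, thread::JoinHandle<()>)
where
    F: FnMut(Message) + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<Message>();
    let worker = thread::spawn(move || {
        let mut handler = handler;
        // Iteration ends when all senders are dropped.
        for msg in rx {
            handler(msg);
        }
    });
    (tx, worker)
}

fn main() {
    let (tx, worker) = spawn_subscription(|msg| println!("change: {:?}", msg));
    tx.send(Message::Update { value: vec![42] }).unwrap();
    tx.send(Message::Delete).unwrap();
    drop(tx); // close the channel so the worker thread exits
    worker.join().unwrap();
}
```

Exposing the receiver half publicly would leave wiring a loop like this to the library user; passing a handler into subscribe() would move it inside the client.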
What would either of those look like? Sample code?
See here
I think passing handlers for subscriptions would be the right way to do it. You would need an interface trait defining the handler, and then the subscribe call would need to invoke it. Something like we do now (yeah, I didn't do it 100% right, don't care):
trait Handler {
    fn handle(&self, message: &Message);
}

pub async fn subscribe(&mut self, key: String, handler: Box<dyn Handler>) -> tokio::io::Result<ServerResponse>
But you want 3 different handlers for subscribe, update and delete? I mean, are we again talking about a general-purpose library?
Why do we need handlers for client update and delete? An NkvClient user calls delete(); it either succeeds or fails.
I was thinking a single handler for subscribe(), which receives all changes to key x.
Do you think we need separate ones for "there were updates to the keys", "keys were deleted" and "keys were created"? I think most systems that work async (mainly message buses) just have "subscribe to this topic and I will tell you everything on it". Having separate handlers makes it more complicated; on the other hand, a single handler means you need boilerplate to work out what kind of update it was.
I would start with one handler for subscribe. Let the code calling NkvClient figure out what to do with the update based on its type. We can always add finer-grained handlers later.
Okay, makes sense. I don't want to introduce too many knobs, because that leads to boilerplate, which I want to avoid.
Created #7
I put some comments inline. One other important thing that I didn't address is the "keyspace". What is our keyspace? I see a few options:
Signed-off-by: Pavel Abramov <[email protected]>
Regarding "key space", I believe our best option is to make it implicit like NATS, via dot-separated keys. Technically it won't change the way we store things; the only addition is a special case to get all keys matching a pattern, and we can then update the documentation.
Are you sure that implicit doesn't change storage? There can be significant performance differences between
True, performance will differ, but it depends on how many keys we have. Say we're talking about 20 services, which are first-order namespaces; each has ~25 subscriptions/publications, which are second-order namespaces; and each of those contains, say, 25 values per Edge App. That's ~12,500 entries in total. How much would it gain us to iterate over 625 entries instead of 12,500 for each request? Well, I need to add benchmarks so we can see it in microseconds :D
You're thinking EVE. I'm thinking you're building a generic, high-performance, low-footprint KV store.
The question then is: how many layers of nested namespaces should we allow in such a generic store?
Truth is, it doesn't matter. Define the API on day one; you can always optimize the back end later. You just need to think about what the API would constrain down the road.
Then making it implicit makes sense; we can change the implementation later so the client can do wildcard masks on dot patterns. wdyt?
Sure. Start with that, use regex for linear search. Split it in the future.
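As a sketch of the agreed approach: storage stays a flat map, and pattern queries do a linear scan over the keys. To keep this example std-only it matches `*` per dot-separated segment instead of using a regex crate; the function names (`key_matches`, `get_matching`) are illustrative, not the crate's API.

```rust
use std::collections::HashMap;

// Implicit, NATS-style dot-separated keyspace: '*' matches exactly one
// segment, and patterns must have the same number of segments as the key.
fn key_matches(pattern: &str, key: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let seg: Vec<&str> = key.split('.').collect();
    pat.len() == seg.len() && pat.iter().zip(&seg).all(|(p, s)| *p == "*" || p == s)
}

// Linear scan over a flat store; no change to the storage layout needed.
fn get_matching<'a>(store: &'a HashMap<String, Vec<u8>>, pattern: &str) -> Vec<&'a str> {
    let mut keys: Vec<&str> = store
        .keys()
        .filter(|k| key_matches(pattern, k))
        .map(|k| k.as_str())
        .collect();
    keys.sort();
    keys
}

fn main() {
    let mut store = HashMap::new();
    store.insert("svc1.pub.app1".to_string(), vec![1]);
    store.insert("svc1.pub.app2".to_string(), vec![2]);
    store.insert("svc2.pub.app1".to_string(), vec![3]);
    assert_eq!(get_matching(&store, "svc1.pub.*"), vec!["svc1.pub.app1", "svc1.pub.app2"]);
    assert_eq!(get_matching(&store, "*.pub.app1"), vec!["svc1.pub.app1", "svc2.pub.app1"]);
}
```

Since only the query path knows about dots, a hierarchical back end (e.g. a prefix tree keyed on the first segment) can replace the linear scan later without changing this API.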
It is not needed anymore Signed-off-by: Pavel Abramov <[email protected]>
Put a description in #6; will address it in a separate PR, it'll need some love.
// storage with ability to notify clients about changes
// made in a value. When created via new() it will try to
// load values from folder. Underlying structure is a HashMap
// and designed to be accessed synchronously.
Here it gets confusing. Is a "client" what NkvClient is? Or is NkvClient a network client that talks to some server structure that wraps around NotifyKeyValue? I think you mean the second, and the "client" here is the server (which then handles network communication), but it is not 100% clear.
"synchronously".
Let me see if I get this, and then we can update the docs.
CLI client --> NkvClient --> network --> CLI server --> NotifyKeyValue --> filesystem
Is that correct? If so:
- is NotifyKeyValue an actual implementation? Are there traits it implements, so that it can be plugged in with another implementation in the future? I am thinking about testing but also pure in-memory, etc. We have learned from EVE pubsub that having pluggable implementations for the backend storage is very useful. Or is it that there is a trait that the server would expect, and NotifyKeyValue implements that trait? I think the other way around, as NotifyKeyValue describes the behaviour, and using the filesystem is a concrete implementation. Or is it already this way and I just missed it?
- can we update the comment to make it clear what the role of it is, what we mean by a "client"?
- synchronous, then, is a specific of the implementation?
Yes, this is correct. NotifyKeyValue is an actual implementation; we can extract traits from it, like put, get, delete, subscribe, unsubscribe, so that we can create variations. The idea behind NotifyKeyValue is that it is just a container of keys and Values, where a Value is actually a composition of PersistentValue and Notifier, so every time you update a value you notify subscribers and store the value on disk. Technically, we can implement a Builder pattern that builds the NotifyKeyValue you need, with a PersistentValue or an in-memory Value, and with the Notifier being a TCPNotifier, a UnixSocketNotifier, or whatever you want it to be.
It is sync under the hood, but from the user (or client) perspective, whenever you create a NotifyKeyValue object you interact with it via channels. You can therefore access NotifyKeyValue in an async manner, but underneath it is a queue of channel messages processed synchronously. So technically, from the NotifyKeyValue perspective, the Server is a client :D The Server creates a NotifyKeyValue instance and communicates with it via channels, and those channels are created for each connection the server handles.
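The Builder idea above could be sketched like this. All names here (ValueStorage, Notifier, InMemoryStorage, NkvBuilder) are hypothetical traits and types extracted for illustration, not the crate's actual API; a PersistentValue-style implementation would write to disk where InMemoryStorage uses a HashMap.

```rust
use std::collections::HashMap;

// Hypothetical trait extracted from PersistentValue / in-memory values.
trait ValueStorage {
    fn store(&mut self, key: &str, value: Vec<u8>);
    fn load(&self, key: &str) -> Option<Vec<u8>>;
}

// Hypothetical trait extracted from TCPNotifier / UnixSocketNotifier.
trait Notifier {
    fn notify(&self, key: &str, value: &[u8]);
}

// In-memory storage variant; a persistent variant would hit the filesystem.
#[derive(Default)]
struct InMemoryStorage {
    map: HashMap<String, Vec<u8>>,
}

impl ValueStorage for InMemoryStorage {
    fn store(&mut self, key: &str, value: Vec<u8>) {
        self.map.insert(key.to_string(), value);
    }
    fn load(&self, key: &str) -> Option<Vec<u8>> {
        self.map.get(key).cloned()
    }
}

struct StdoutNotifier;

impl Notifier for StdoutNotifier {
    fn notify(&self, key: &str, value: &[u8]) {
        println!("{} changed to {:?}", key, value);
    }
}

// NotifyKeyValue composes a storage with a notifier: every put notifies
// subscribers and then persists the value.
struct NotifyKeyValue {
    storage: Box<dyn ValueStorage>,
    notifier: Box<dyn Notifier>,
}

impl NotifyKeyValue {
    fn put(&mut self, key: &str, value: Vec<u8>) {
        self.notifier.notify(key, &value);
        self.storage.store(key, value);
    }
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.storage.load(key)
    }
}

// Minimal builder: swap in persistent storage or a TCP notifier later.
struct NkvBuilder {
    storage: Box<dyn ValueStorage>,
    notifier: Box<dyn Notifier>,
}

impl NkvBuilder {
    fn new() -> Self {
        NkvBuilder {
            storage: Box::new(InMemoryStorage::default()),
            notifier: Box::new(StdoutNotifier),
        }
    }
    fn storage(mut self, s: Box<dyn ValueStorage>) -> Self {
        self.storage = s;
        self
    }
    fn build(self) -> NotifyKeyValue {
        NotifyKeyValue { storage: self.storage, notifier: self.notifier }
    }
}

fn main() {
    let mut nkv = NkvBuilder::new()
        .storage(Box::new(InMemoryStorage::default()))
        .build();
    nkv.put("service.config", vec![42]);
    assert_eq!(nkv.get("service.config"), Some(vec![42]));
}
```

This is also the shape that answers the pluggability question above: the server depends only on the traits, so test, in-memory, and filesystem back ends can be swapped freely.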
src/server/srv.rs (outdated)
let sub_resp = client.subscribe(key.clone()).await.unwrap();
assert_eq!(
    sub_resp,
    request_msg::ServerResponse::Base(request_msg::BaseResp {
        id: "0".to_string(),
        status: http::StatusCode::OK,
        message: "Subscribed".to_string(),
    })
);
// Give server time to subscribe
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let new_value: Box<[u8]> = Box::new([42, 0, 1, 0, 1]);
let resp = client.put(key.clone(), new_value.clone()).await.unwrap();
assert_eq!(
    resp,
    request_msg::ServerResponse::Base(request_msg::BaseResp {
        id: "0".to_string(),
        status: http::StatusCode::OK,
        message: "No Error".to_string(),
    })
);
let result = client.latest_state(&key).await;
assert!(result.is_ok());
match result {
    Ok(Message::Update { value }) => {
        assert_eq!(value, new_value)
    }
    _ => panic!("Expected no errors"),
}
@deitch here is what getting the latest state looks like; otherwise we can make state pub and basically let users implement their own variation of latest_state.
Signed-off-by: Pavel Abramov <[email protected]>
It's enough time to get server up and running and that reduces test time Signed-off-by: Pavel Abramov <[email protected]>
Signed-off-by: Pavel Abramov <[email protected]>
It is not needed Signed-off-by: Pavel Abramov <[email protected]>
Actually I got too into it, so I finished the whole todo list. @deitch I see one unresolved discussion about documentation; once that's done I think we're safe to merge this and proceed with the other issues.
Signed-off-by: Pavel Abramov <[email protected]>
This commit removes NatsClient and also introduces proper subscribe mechanism, which you can actually use to receive new values.
There's a bunch of things to do until version v0.itworks; we can slowly continue with multiple PRs, or I'll just create a commit for each point in the TODO list.
TODO: