
Persistent storage for activity queue #31

Open
Nutomic opened this issue Mar 27, 2023 · 12 comments
Labels: enhancement (New feature or request), good first issue (Good for newcomers)

Comments

@Nutomic
Member

Nutomic commented Mar 27, 2023

The queue for sending outgoing activities has in-memory storage for activities that failed to be delivered and need to be retried later, when the target server is hopefully reachable again. As it's only in memory, this storage is lost after a restart or crash. It would be good to provide a config option for storing it on disk, e.g. in a sled database.
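
For illustration only, a minimal sketch of what the on-disk side could look like with sled, assuming failed deliveries are serialized as JSON. The FailedSend struct, the key layout and the function names are hypothetical and not part of this crate:

use serde::{Deserialize, Serialize};

/// Hypothetical on-disk record for a delivery that has to be retried.
#[derive(Serialize, Deserialize)]
struct FailedSend {
    inbox_url: String,
    activity_json: String,
    retries: u8,
}

fn persist_failed(db: &sled::Db, id: u64, send: &FailedSend) -> Result<(), Box<dyn std::error::Error>> {
    // Big-endian keys keep iteration in insertion order.
    db.insert(id.to_be_bytes(), serde_json::to_vec(send)?)?;
    db.flush()?;
    Ok(())
}

fn reload_pending(db: &sled::Db) -> Result<Vec<FailedSend>, Box<dyn std::error::Error>> {
    // On startup, read everything back so queued activities survive a restart or crash.
    db.iter()
        .map(|kv| {
            let (_key, value) = kv?;
            Ok(serde_json::from_slice(&value)?)
        })
        .collect()
}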

@GeorgeLS

GeorgeLS commented Jul 2, 2023

Hello there @Nutomic. I would like to work on this issue. Should we go with using sled or look for alternative solutions?
We could implement something on our own as well but sled looks pretty optimized.

Some questions first:

  • What is the expected workload?
  • What should be the expected behaviour after a restart of the sender? When the instance comes back again, should the failed messages be prioritized?

Also, I have another comment which we could track on another issue:
I see that sled has support for batch operations. It would be good to use those, and to exhibit the same batching behaviour for the send action at the activity level. Let me explain what I mean using the example below:

We have two servers: A and B. We have a user x on server A who is followed by n users from server B.
When user x posts something, server A has to make n requests to server B in order to share the information. Instead of that, we could do a batch operation and send one request indicating that the request data should be shared among the n users of server B. This is more performant and creates less traffic.

Disclaimer: I don't know if this is implemented right now in Lemmy. From my understanding, it is not. Also, I don't know if this kind of behaviour is compatible with the ActivityPub protocol.

@sunaurus

sunaurus commented Jul 2, 2023

My 2 cents regarding the downsides of Sled:

  • Admins will be forced to back up and restore the sled files when redeploying servers (either that, or risk losing a bunch of outgoing messages)
  • lemmy_server will gain a hard requirement for storage (currently, it will happily run on a tiny disk).
  • Tooling around Sled is quite limited. Trying to debug issues in production is difficult. I have spent several hours trying to extract some partial data from the Sled database for pict-rs - something which would have taken minutes on Postgres.

As lemmy_server already has a dependency on Postgres anyway, why not just leverage that? We are already writing all activities into the database, so we already know that the database can handle it.

If more speed is required and/or if reliability of data is not a huge concern, then I would still rather use Redis than Sled, just because Redis is easy to host on its own server, and has great tooling.

@phiresky
Contributor

phiresky commented Jul 3, 2023

We were discussing this on Matrix, and @cetra3, @sunaurus and I were all of the opinion that a good first attempt at this could use PostgreSQL. It could look something like the following (pseudocode):

-- uses the existing activity table for most of the data
CREATE TYPE federation_type as enum ('comment', 'post', 'comment_vote', 'post_vote');
CREATE TYPE federation_state as enum ('todo', 'in_progress', 'gave_up');
CREATE TABLE federation_queue (
    id bigserial primary key,
    activity_id bigint not null references activity (id),
    created timestamptz not null,
    inbox_url text not null,
    federation_type federation_type not null,
    state federation_state not null,
    retries bigint not null default 0
);

The main process would insert rows into federation_queue. A (potentially separate) process would take out values using something like the following:

WITH batch AS (
    SELECT id FROM federation_queue
    WHERE state = 'todo'
    ORDER BY XXXX
    LIMIT 1000
    FOR UPDATE SKIP LOCKED
)
UPDATE federation_queue SET state = 'in_progress'
FROM batch
WHERE batch.id = federation_queue.id
RETURNING federation_queue.id, federation_queue.activity_id;

That way, we get the following:

  • persistence on server restart
  • possibility for multi-consumer (horizontally scalable) dequeueing, helped by the FOR UPDATE SKIP LOCKED.
  • possibility of prioritization based on inbox_url (since right now some slow host can hold up the whole queue)
  • possibility of prioritization based on federation_type (the ratio of votes to content is about 50:1, but votes take almost the same resources as post/comment federation). Federating comments/posts is more important than federating votes, so they should take priority (one possible ordering is sketched below this list).
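
As an illustration, the ORDER BY XXXX placeholder above could encode that prioritization. This is only a sketch against the schema proposed above, and the exact expression is an assumption:

-- hypothetical ordering: content before votes, then oldest first
SELECT id FROM federation_queue
WHERE state = 'todo'
ORDER BY federation_type IN ('comment_vote', 'post_vote'), created
LIMIT 1000
FOR UPDATE SKIP LOCKED;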

Implementation:

The PostgreSQL part would probably be in Lemmy. In activitypub-queue there would be a trait, something like:

pub trait PersistentFederationQueue {
    /// Add an activity to the queue. TODO: figure out the interaction with the activity table,
    /// since it's already stored in there on the Lemmy side.
    fn queue(&self, activity: ActivityJson, inboxes: Vec<Url>);
    /// Retrieves a batch from the database and sets its state to "in_progress".
    /// Returns (id, activity as json, inbox_urls).
    fn get_batch(&self, limit: i64) -> Vec<(i64, ActivityJson, Vec<Url>)>;
    /// Deletes / marks as finished.
    fn mark_finished(&self, ids: Vec<i64>);
    /// Requeues requests that have failed (state in_progress back to todo).
    /// The db should track the retry count and figure out the delays, I guess.
    fn requeue(&self, ids: Vec<i64>);
}
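
To make the shape concrete, here is a toy in-memory implementation of the trait above. It assumes ActivityJson is just serde_json::Value and uses url::Url; everything besides the trait itself is made up for the sketch, and a real implementation would talk to Postgres instead:

use std::collections::HashMap;
use std::sync::Mutex;
use url::Url;

// Assumption for this sketch: the activity payload is plain JSON.
type ActivityJson = serde_json::Value;

#[derive(Clone, Copy, PartialEq)]
enum State { Todo, InProgress }

struct QueuedActivity {
    activity: ActivityJson,
    inboxes: Vec<Url>,
    state: State,
}

/// Toy implementation backed by a Mutex-guarded map.
#[derive(Default)]
struct InMemoryQueue {
    next_id: Mutex<i64>,
    items: Mutex<HashMap<i64, QueuedActivity>>,
}

impl PersistentFederationQueue for InMemoryQueue {
    fn queue(&self, activity: ActivityJson, inboxes: Vec<Url>) {
        let mut id = self.next_id.lock().unwrap();
        *id += 1;
        self.items
            .lock()
            .unwrap()
            .insert(*id, QueuedActivity { activity, inboxes, state: State::Todo });
    }

    fn get_batch(&self, limit: i64) -> Vec<(i64, ActivityJson, Vec<Url>)> {
        // Grab up to `limit` todo items and mark them in_progress.
        let mut items = self.items.lock().unwrap();
        items
            .iter_mut()
            .filter(|(_, a)| a.state == State::Todo)
            .take(limit as usize)
            .map(|(id, a)| {
                a.state = State::InProgress;
                (*id, a.activity.clone(), a.inboxes.clone())
            })
            .collect()
    }

    fn mark_finished(&self, ids: Vec<i64>) {
        let mut items = self.items.lock().unwrap();
        for id in ids {
            items.remove(&id);
        }
    }

    fn requeue(&self, ids: Vec<i64>) {
        let mut items = self.items.lock().unwrap();
        for id in ids {
            if let Some(a) = items.get_mut(&id) {
                a.state = State::Todo;
            }
        }
    }
}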

@Nutomic
Member Author

Nutomic commented Jul 3, 2023

Agree with phiresky, although the queue function needs to look more like this:

pub fn queue(task: SendActivityTask, num_retries: u8);

The num_retries param can be used so that Lemmy can avoid storing the first and second retry (after 60s, 60m), because these would cause excessive db writes.
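
Purely for illustration, that skip-early-retries logic could look roughly like this; SendActivityTask is the crate's task type, while everything else here is a made-up placeholder:

use std::sync::Mutex;

// Placeholder types for this sketch only.
struct SendActivityTask;                     // stands in for the crate's task type
struct DbHandle;
impl DbHandle {
    fn persist(&self, _task: SendActivityTask) { /* write to persistent storage */ }
}

struct RetryQueue {
    in_memory: Mutex<Vec<SendActivityTask>>, // cheap, early retries live here
    db: DbHandle,                            // later retries get persisted
}

impl RetryQueue {
    fn queue(&self, task: SendActivityTask, num_retries: u8) {
        if num_retries < 2 {
            // The first and second retries (after 60s and 60m) stay in memory
            // to avoid excessive db writes.
            self.in_memory.lock().unwrap().push(task);
        } else {
            self.db.persist(task);
        }
    }
}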

In any case we currently need to make some major changes to the activity queue code in order to fix performance problems. This persistent storage would cause git conflicts and could worsen performance even more, so now is not a good time to implement it.

@phiresky
Contributor

phiresky commented Jul 3, 2023

The num_retries param can be used so that Lemmy can avoid storing the first and second retry (after 60s, 60m), because these would cause excessive db writes.

I thought so as well, but one issue is that it prevents the whole dequeuing from running on separate processes/servers.

In any case we currently need to make some major changes to the activity queue code in order to fix performance problems. This persistent storage would cause git conflicts and could worsen performance even more, so now is not a good time to implement it.

I guess, but this would also play a huge part in fixing those problems, by allowing implementation of proper prioritization based on hosts and on activity type, as well as allowing the queue to live outside the Lemmy process memory and main Lemmy CPU time. It's not just about the robustness. But yeah, it would mean much larger changes than simpler "fixes".

@phiresky
Contributor

phiresky commented Jul 8, 2023

I just realized something: There doesn't actually need to be a table that stores the send state for every inbox-activity combination like the CREATE TABLE federation_queue I described above. The existing activity table is enough as a persistent queue.

It's enough if there's a very tiny table like this:

CREATE TABLE outgoing_state (
    inbox_url text primary key,
    last_successful_activity_id bigint not null references activity (id)
);

Then, for every receiver inbox, the dequeuer can just take a batch of the next 1000 activities from the activity table, filter them to those that need to be received by this inbox, send them out, and update last_successful_activity_id. If one of those activities fails to send, then the others won't work either (we just need to make sure it doesn't get stuck because of bugs).
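
A sketch of the per-inbox fetch under that scheme; the data column on the activity table and the exact filtering are assumptions:

-- hypothetical query: next batch of activities after the last one this inbox received
SELECT a.id, a.data
FROM activity a
WHERE a.id > (SELECT last_successful_activity_id
              FROM outgoing_state
              WHERE inbox_url = $1)
ORDER BY a.id
LIMIT 1000;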

That should reduce the storage and perf needed out of the queueing system (e.g. PostgreSQL) by a factor of instance_count, so by roughly 300x. @cetra3

It does increase the processing load if you have many instances that only subscribe to a tiny subset of content, but probably worth it.

@colatkinson
Contributor

Hey, just wanted to check whether the implementation described above (this crate provides a PersistentFederationQueue trait, applications implement their own storage solution) is generally accepted as the way forward. I noticed the comment on #64, and so was unsure of the status.

For my personal use-case, BYO queue would be very helpful -- though if that ends up not being the actual implementation for whatever reason, I'll try and find a workaround of some description.

@Nutomic
Member Author

Nutomic commented Jul 10, 2023

@colatkinson Can you explain why you want to bring your own queue? For me the use case isn't clear.

@colatkinson
Contributor

So in my case I'm already pulling in a different persistent distributed queuing system. While I haven't yet written a prototype of this part (so everything below should be considered speculative at best), my general inclination was to use it for sending activities as well. In addition, I currently support multiple RDBMSes -- so a dependency on any particular DB (be it Postgres or another) would be undesirable.

Beyond that, I'd expect users of the crate to have fairly diverse needs -- a single-node bot may just need some embedded DB, whereas an application concerned with high scalability may want to use e.g. Kafka. Similarly, I can imagine some applications requiring custom behaviors around prioritization or retry intervals.

That said, effectively my current plan is to apply a (vaguely hacky) patch downstream to expose a retry-less send_activity() to allow lifting this logic out of the crate -- while hopefully reducing the scope of potential merge conflicts you mentioned upthread.

Hopefully this explains it somewhat -- let me know if there's anything I can clarify further. Also happy to help with implementation efforts, though I obviously don't want to interfere with any performance work happening in this area.

@cetra3
Contributor

cetra3 commented Jul 11, 2023

Would #64 be in the right direction for what you're after?

@Nutomic
Member Author

Nutomic commented Jul 11, 2023

@colatkinson This crate is not going to have any direct database dependency, it will only expose a trait like PersistentFederationQueue mentioned above, so that applications can handle storage however they want.

For me it's still not clear what the advantages of bring-your-own-queue are. If you want to get rid of retries, it would be much easier to add a setting for that.

@colatkinson
Contributor

Ah, there may have been a misunderstanding on my part -- I was assuming that bring-your-own-queue referred to being able to implement a PersistentFederationQueue, and that the persistent queue would essentially replace the existing ActivityQueue.

My interest is primarily in having application-defined persistent queuing. If the plan is to keep ActivityQueue to shuffle messages from the PersistentFederationQueue to worker tasks or something like that, then yeah I agree BYO in-memory queue seems unnecessary for my purposes.
