[eventually-postgres] subscribers drop connection and then hang #144

Closed
to266 opened this issue Dec 7, 2020 · 3 comments · Fixed by #145
Labels
bug Something isn't working

Comments

@to266
Contributor

to266 commented Dec 7, 2020

Describe the bug
Occasionally, subscribers fail here:
https://github.com/eventually-rs/eventually-rs/blob/718d9acc473cf731276aec67e6148b163073a7dc/eventually-postgres/src/subscriber.rs#L82

with only this statement logged:

2020-12-04T10:44:00.727548992Z thread 'actix-rt:worker:0' panicked at 'subscriber connection failed: Error { kind: Closed, cause: None }', /usr/local/cargo/git/checkouts/eventually-rs-937be219cd9e869e/718d9ac/eventually-postgres/src/subscriber.rs:82:35

This would be OK if the process driving an in-memory projection also failed, since that process is part of the public API visible to users of the library; the failure could then be handled one way or another. Right now, however, if a projector is created as follows, no error or panic surfaces: the error is only logged by the subscriber, and the projection silently stops working.

    let dsn = std::env::var("EVENTUALLY_URL").expect("Eventually url not set");
    let subscriber =
        EventSubscriber::<my_aggregate::Id, my_aggregate::MyEvent>::new(&dsn, EVENTUALLY_NAME)
            .await?;
    let subscription = TransientSubscription::new(eventstore.clone(), subscriber);
    let projection = Arc::new(RwLock::new(MyProjection::default()));
    let mut projector = eventually::Projector::new(projection.clone(), subscription);
    eventually::spawn(async move {
        projector
            .run()
            .await
            .expect("This projection background process failed")
    });

To Reproduce
Steps to reproduce the behavior:

  1. Start projection
  2. Kill the database (e.g., delete/recreate the pod)
  3. Observe that the projector.run().await never fails
to266 added the bug label on Dec 7, 2020
@to266
Contributor Author

to266 commented Dec 7, 2020

So, the way I understand it right now, the following happens:

  • When EventSubscriber::new() is called, a new tokio task is spawned, but the corresponding JoinHandle<T> is not kept. The only way to learn the state of the spawned task is via the broadcast::channel.
  • If the connection drops, the event.expect() line panics and tokio catches the panic. The panic would then be visible through the JoinHandle, but we don't hold a reference to it.
  • Even if we did, we never expect the spawned task to finish, so we would never await it.
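The points above can be illustrated with a dependency-free analogue. This minimal sketch uses `std::thread` and `std::sync::mpsc` instead of tokio tasks and `broadcast::channel` (an assumption made to keep it self-contained). It also assumes, as the name `tx_captured` suggests, that the subscriber keeps its own sender clone around to hand out receivers; that detail has not been checked against the actual source. Under that assumption, the panic is only visible through the JoinHandle, and a receiver never sees the channel close, which would explain the "hang":

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// What a channel consumer can observe after the background worker dies.
#[derive(Debug, PartialEq)]
enum Observed {
    Disconnected,
    StillWaiting,
}

/// Spawns a worker that panics the way `event.expect(...)` does when the
/// Postgres connection closes. `keep_sender_clone` models the subscriber
/// holding its own sender (hypothetical, mirroring `tx` / `tx_captured`).
/// Returns (panic seen via JoinHandle, what the receiver observes).
fn simulate(keep_sender_clone: bool) -> (bool, Observed) {
    let (tx, rx) = mpsc::channel::<u32>();
    let tx_captured = tx.clone();
    // Either keep the "outer" sender alive or drop it right away.
    let outer = if keep_sender_clone {
        Some(tx)
    } else {
        drop(tx);
        None
    };

    let handle = thread::spawn(move || {
        let _tx = tx_captured; // dropped when the thread unwinds
        let event: Result<u32, &str> = Err("connection closed");
        let _value = event.expect("subscriber connection failed");
    });

    // Only the JoinHandle carries the panic; a detached task would lose it.
    let panicked = handle.join().is_err();

    // With another sender alive, the receiver just keeps waiting.
    let observed = match rx.recv_timeout(Duration::from_millis(50)) {
        Err(RecvTimeoutError::Disconnected) => Observed::Disconnected,
        Err(RecvTimeoutError::Timeout) => Observed::StillWaiting,
        Ok(_) => unreachable!("the worker never sends anything"),
    };

    drop(outer);
    (panicked, observed)
}

fn main() {
    println!("{:?}", simulate(true)); // (true, StillWaiting)
    println!("{:?}", simulate(false)); // (true, Disconnected)
}
```

In the real code the wait is an `.await` on the broadcast receiver rather than a timeout, so it would block indefinitely instead of returning `StillWaiting`.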

Possible ways forward:

  • Instead of expect, extract the event with if let Ok(event) (or Some(event), whichever is appropriate), and in the else branch send a special message over tx_captured signaling that the background process lost the database connection and should be restarted. Then, when subscribe_all receives such a message, it returns an Err instead, which can be propagated Subscription -> Projector -> the user who spawned the projector.
  • Alternatively, the whole (client, mut connection) pair could be moved inside the spawned task and restarted as needed. Subscription can already detect and skip duplicate events, so starting again is not an issue. However, some events might be missed during the restart, and the subscription would not know about it (unless we also chain a one-off stream from the beginning).
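The first option can be sketched without tokio. In this hypothetical version, `SubscriberMessage`, `SubscriberError`, `run_listener` and `to_results` are illustrative names, not part of eventually-postgres, and `std::sync::mpsc` stands in for the broadcast channel. The background worker sends a `ConnectionLost` message instead of panicking, and the subscriber side turns it into an `Err` that a caller such as a projector could propagate:

```rust
use std::sync::mpsc;
use std::thread;

/// Messages from the background task to subscribers. `Clone` is derived
/// because a tokio broadcast channel would need cloneable messages.
#[derive(Debug, Clone)]
enum SubscriberMessage<E> {
    Event(E),
    /// The "special type of message": the database connection is gone.
    ConnectionLost(String),
}

#[derive(Debug, PartialEq)]
struct SubscriberError(String);

/// Background task: forwards events and, on a connection error, sends
/// `ConnectionLost` and stops instead of calling `expect`.
fn run_listener(incoming: Vec<Result<u32, String>>, tx: mpsc::Sender<SubscriberMessage<u32>>) {
    for item in incoming {
        let msg = match item {
            Ok(event) => SubscriberMessage::Event(event),
            Err(cause) => SubscriberMessage::ConnectionLost(cause),
        };
        let lost = matches!(msg, SubscriberMessage::ConnectionLost(_));
        // Stop if nobody is listening any more, or if the connection died.
        if tx.send(msg).is_err() || lost {
            return;
        }
    }
}

/// Subscriber side (the role `subscribe_all` would play): maps messages to
/// `Result`s and ends after the first error.
fn to_results(rx: mpsc::Receiver<SubscriberMessage<u32>>) -> Vec<Result<u32, SubscriberError>> {
    let mut results = Vec::new();
    for msg in rx {
        match msg {
            SubscriberMessage::Event(e) => results.push(Ok(e)),
            SubscriberMessage::ConnectionLost(cause) => {
                results.push(Err(SubscriberError(cause)));
                break;
            }
        }
    }
    results
}

fn simulate(incoming: Vec<Result<u32, String>>) -> Vec<Result<u32, SubscriberError>> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_listener(incoming, tx));
    let results = to_results(rx);
    worker.join().expect("the listener no longer panics");
    results
}

fn main() {
    let incoming = vec![Ok(1), Ok(2), Err("Error { kind: Closed }".to_string()), Ok(3)];
    // Prints: [Ok(1), Ok(2), Err(SubscriberError("Error { kind: Closed }"))]
    println!("{:?}", simulate(incoming));
}
```

The event after the failure is never delivered, which is exactly the gap the second option (restarting and chaining a catch-up stream) would have to cover.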

to266 mentioned this issue on Dec 7, 2020
@ar3s3ru
Collaborator

ar3s3ru commented Dec 8, 2020

Hey @to266, thanks for reporting the bug.

The fix in #145 works, but I'm not sure the design is correct...
EventSubscriber spawns the task in new() because I had no idea how to spawn it in subscribe_all().

My initial idea was to make EventStore implement the EventSubscriber trait, so that we could use the bb8::Pool to get a dedicated connection. However, according to djc/bb8#60 this is not possible: bb8_postgres::PostgresConnectionManager uses type Connection = tokio_postgres::Client, but notifications need tokio_postgres::Connection:

https://github.com/eventually-rs/eventually-rs/blob/718d9acc473cf731276aec67e6148b163073a7dc/eventually-postgres/src/subscriber.rs#L78

However, we might still be able to use async-stream to move the logic from EventSubscriber::new() into EventSubscriber::subscribe_all, similar to this:

    pub struct EventSubscriber<Id, Event> {
        type_name: &'static str,
        config: tokio_postgres::config::Config,
        id: std::marker::PhantomData<Id>,
        event: std::marker::PhantomData<Event>,
    }

    impl<Id, Event> EventSubscriber<Id, Event> {
        #[inline]
        pub fn new(type_name: &'static str, config: tokio_postgres::config::Config) -> Self {
            // yadda yadda yadda
        }
    }

    impl<Id, Event> eventually_core::subscription::EventSubscriber for EventSubscriber<Id, Event>
    where
        // all the necessary type bounds
    {
        // NOTE: we don't really need a future here, only an EventStream
        fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>>> {
            let fut = async move {
                // Maybe TLS should be passed to the constructor?
                // `connection` must be mutable: `poll_message` takes `&mut self`.
                let (client, mut connection) = self.config.connect(tls).await?;

                let client = Arc::new(client);
                let client_captured = client.clone();

                // `.next()` below requires `futures::StreamExt` in scope.
                let mut stream = futures::stream::poll_fn(move |cx| connection.poll_message(cx));
                let stream = try_stream! {
                    while let Some(event) = stream.next().await {
                        // An error that stops the whole stream.
                        let event = event.map_err(EventSubscriber::Stream)?;

                        if let AsyncMessage::Notification(not) = event {
                            // Deserialize, convert into an event, then apply `?`
                            // once to the whole chain.
                            yield serde_json::from_str::<NotificationPayload<Event>>(not.payload())
                                .map_err(|e| DeserializeError {
                                    message: e.to_string(),
                                })
                                .and_then(TryInto::try_into)?;
                        }
                    }

                    drop(client_captured);
                };

                // Still need to figure out how to call LISTEN using the Client...
                // Maybe join the two futures? Use a barrier?
                Ok(stream)
            };

            Box::pin(fut)
        }
    }
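The key property of this sketch is that the stream owns the connection, so a connection error surfaces as an `Err` item and then the stream ends, instead of a panic in a detached task. The following std-only analogue models that with an `Iterator`. `FakeConnection`, `Message` and `NotificationStream` are hypothetical stand-ins for `tokio_postgres::Connection`, `AsyncMessage` and the `try_stream!` block; they are not real APIs:

```rust
/// Hypothetical stand-in for `tokio_postgres::AsyncMessage`.
enum Message {
    Notification(String),
    Notice,
}

/// Hypothetical stand-in for a connection that yields messages until it fails.
struct FakeConnection {
    script: std::vec::IntoIter<Result<Message, String>>,
}

impl FakeConnection {
    /// Analogue of `poll_message`: `None` means the connection ended cleanly.
    fn next_message(&mut self) -> Option<Result<Message, String>> {
        self.script.next()
    }
}

/// Owns the connection (as the `try_stream!` block does) and yields only
/// notification payloads. After the first error it yields `Err` once and stops.
struct NotificationStream {
    connection: FakeConnection,
    failed: bool,
}

impl Iterator for NotificationStream {
    type Item = Result<String, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.failed {
            return None;
        }
        loop {
            match self.connection.next_message()? {
                Ok(Message::Notification(payload)) => return Some(Ok(payload)),
                // Non-notification messages are skipped, like the `if let` above.
                Ok(Message::Notice) => continue,
                Err(e) => {
                    // Mirrors `?` in `try_stream!`: emit the error, then end.
                    self.failed = true;
                    return Some(Err(e));
                }
            }
        }
    }
}

fn notifications(script: Vec<Result<Message, String>>) -> Vec<Result<String, String>> {
    let stream = NotificationStream {
        connection: FakeConnection { script: script.into_iter() },
        failed: false,
    };
    stream.collect()
}

fn main() {
    let script = vec![
        Ok(Message::Notification("payload-1".to_string())),
        Ok(Message::Notice),
        Err("Error { kind: Closed }".to_string()),
        Ok(Message::Notification("never delivered".to_string())),
    ];
    // Prints: [Ok("payload-1"), Err("Error { kind: Closed }")]
    println!("{:?}", notifications(script));
}
```

With this shape, a consumer such as `Projector::run` would receive the error rather than wait forever; reconnecting after the error remains a separate design decision.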

@ar3s3ru
Collaborator

ar3s3ru commented Dec 9, 2020

Let's continue the discussion here: https://github.com/eventually-rs/eventually-rs/discussions/147

2 participants