Unclear how to keep the consumer open after an exception is thrown #177

Open
jndietz opened this issue Aug 3, 2022 · 1 comment

jndietz commented Aug 3, 2022

Documentation Issue

I think it would be good to document how to keep a consumer alive, or handle exceptions gracefully within a reactive stream. I came up with a solution but it doesn't seem correct.

Improvement Suggestion

All of the examples I've found online show the receiver being closed after it has consumed events. I think a lot of people have use cases where a listener runs continuously and handles messages as they come in, as opposed to shutting down after they've all been consumed.

Additional context

    Disposable processAuditEvents() {
        final String AUDIT_QUEUE_NAME = "audit"

        receiver
                .consumeAutoAck(AUDIT_QUEUE_NAME)
                .subscribe(m -> {
                    log.debug("Received message {}", new String(m.body))
                    def wrapper = objectMapper.readValue(m.body, WrapperObject)
                    strategies
                            .find { it.handles(wrapper.payload) }
                            .handleEvent(wrapper.payload)
                }, { ex ->
                    log.error("Something bad happened: ${ex.message}")
                    processAuditEvents() // 👈 just re-register the consumer on error 🤷‍♂️
                })
    }

Instead of using a Receiver, I ended up falling back to the annotation-driven style of listening, and this appears to work well enough for me.

    @RabbitListener(queues = "audit", ackMode = "MANUAL")
    Mono<Void> listen(CustomerActivityEventWrapper wrapper) {
        log.debug("Received message {}", wrapper.toString())
        strategies
                .find { it.handles(wrapper.payload) }
                .handleEvent(wrapper.payload)
    }

matsev commented Nov 11, 2024

Hi @jndietz! 👋

Did you find a solution to this problem, or did you manage to work around it? We have a problem that occasionally causes our receiver to terminate, and we would like to mitigate it; cf. #186
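
One possible mitigation, sketched here in plain Java under stated assumptions rather than as the library's recommended approach: in the first snippet above, an exception thrown inside the `subscribe` message callback (for example from `objectMapper.readValue`) terminates the subscription and lands in the error callback. If the failures come from message handling rather than from the connection itself, catching them per message keeps the stream running without re-registering the consumer. The class `GuardedHandler` and the method `guarded` are hypothetical names introduced for illustration; they are not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper, not part of any library: isolates per-message failures.
public final class GuardedHandler {

    private GuardedHandler() {}

    /**
     * Wraps a message handler so that an exception thrown for one message is
     * passed to onFailure and swallowed instead of propagating to the caller.
     */
    public static <T> Consumer<T> guarded(Consumer<T> handler,
                                          BiConsumer<T, Exception> onFailure) {
        return message -> {
            try {
                handler.accept(message);
            } catch (Exception ex) {
                // Report the failure and carry on with the next message.
                onFailure.accept(message, ex);
            }
        };
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        List<String> failed = new ArrayList<>();

        Consumer<String> handler = guarded(
                body -> {
                    // Stand-in for deserialization and strategy dispatch.
                    if (body.isEmpty()) {
                        throw new IllegalArgumentException("empty body");
                    }
                    handled.add(body);
                },
                (body, ex) -> failed.add(ex.getMessage()));

        // The second message fails; the third is still processed.
        for (String body : List.of("a", "", "b")) {
            handler.accept(body);
        }

        System.out.println(handled + " " + failed); // prints "[a, b] [empty body]"
    }
}
```

With the `receiver` from the first snippet, the wrapped handler would be the first argument to `subscribe(...)`. Because `consumeAutoAck` acknowledges messages automatically, a message whose handler fails is presumably dropped rather than redelivered, so the failure callback is the place to log it or forward it elsewhere. Errors raised by the stream itself, such as a lost connection, still reach the error callback and are not covered by this sketch.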
