JetStream ConsumerContext Drain function not waiting for in flight requests to be done processing #1672

Open
thomas-maurice opened this issue Jul 15, 2024 · 6 comments
Labels
defect Suspected defect such as a bug or regression

thomas-maurice commented Jul 15, 2024

Observed behavior

I am observing that the ConsumerContext.Drain function does not wait for the processing of in flight requests to finish before returning.

Expected behavior

I would expect the Drain function to wait for the current in-flight message to finish processing before returning, so that the consumer can exit cleanly.

This is especially odd because the documentation states the following:

Drain unsubscribes from the stream and cancels subscription. All messages that are already in the buffer will be processed in callback function.

So I would expect the Drain function to block until the buffer is empty, then return. I am observing the same behaviour with the Stop function (though I would expect Stop to wait only for the current Consume callback to finish).

It would be really neat if the Drain and Stop functions could block until the buffered/in-flight messages are processed (or until a timeout has passed if it is taking too long). Alternatively, a method on Consumer or ConsumerContext could report the number of messages being processed or still waiting for processing, so we could determine when it is safe to shut down the program. If I am not mistaken, no such method exists right now.

Server and client version

nats-server: v2.10.17
nats: 0.1.4
nats go lib: github.com/nats-io/nats.go v1.36.0

Host environment

OSX 14.4.1 on M1 Max
Reproduced on a framework 13 AMD, running Arch with the latest linux kernel

Steps to reproduce

You can use the following code to validate the behaviour. It creates a stream and a consumer listening on the subjects hello.>; the Consume callback takes an artificially long time (5s) to process each message.

Run this program, send a message on one of the subscribed subjects with nats pub hello.foo hello, and kill the program right after the wait starts. You will notice that it exits before the Consume callback has finished.

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/sirupsen/logrus"
)

const (
	NATS_URL = "nats://localhost:4222"
)

func mustGetNats(url string) *nats.Conn {
	conn, err := nats.Connect(url)
	if err != nil {
		logrus.WithError(err).Panic("could not connect to NATS")
	}

	return conn
}

func main() {
	consumerConn := mustGetNats(NATS_URL)

	js, err := jetstream.New(consumerConn)
	if err != nil {
		logrus.WithError(err).Fatal("could not open jetstream")
	}

	_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
		Name:        "programatic-stream",
		Description: "Some description",
		Subjects:    []string{"hello.>"},
		Retention:   jetstream.LimitsPolicy,
		MaxBytes:    2 * 1024 * 1024 * 1024,
		MaxMsgs:     10000,
		Discard:     jetstream.DiscardOld,
		MaxAge:      time.Hour * 24 * 14,
		Storage:     jetstream.FileStorage,
		Replicas:    1,
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not create stream")
	}

	_, err = js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
		Durable:       "programatic-consumer",
		Name:          "programatic-consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    10,
		DeliverPolicy: jetstream.DeliverNewPolicy,
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not create consumer")
	}

	consumer, err := js.Consumer(context.Background(), "programatic-stream", "programatic-consumer")
	if err != nil {
		logrus.WithError(err).Fatal("could not get the consumer")
	}

	consumerContext, err := consumer.Consume(func(msg jetstream.Msg) {
		logrus.Info("artificially wait 5s")
		time.Sleep(time.Second * 5)
		logrus.Info("waited 5s")
		logrus.Infof("got a message %s\n", string(msg.Data()))
		msg.Ack()
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not start consuming")
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	<-sigs
	logrus.Info("got interrupted")
	consumerContext.Drain()
	logrus.Info("drained consumer")
}

The console output I get is the following:

INFO[0019] artificially wait 5s                         
^CINFO[0022] got interrupted                              
INFO[0022] drained consumer     

Whereas I would have expected something like this:

INFO[0019] artificially wait 5s                         
^CINFO[0020] got interrupted                              
INFO[0024] waited 5s
INFO[0024] got a message: hello
INFO[0024] drained consumer     

Am I doing something wrong, or is it actually a bug?

@thomas-maurice thomas-maurice added the defect Suspected defect such as a bug or regression label Jul 15, 2024
@thomas-maurice thomas-maurice changed the title from "JetStream consumer Drain function not waiting for in flight requests to be done processing" to "JetStream ConsumerContext Drain function not waiting for in flight requests to be done processing" Jul 15, 2024
@JakubSchneller

I'm dealing with the same exact issue.

@jamm3e3333

me too

@piotrpio piotrpio self-assigned this Jul 15, 2024
@krizacekcz

We are using the new API too and we are having the same issue. An off-topic question, though: why do you call CreateOrUpdateConsumer, discard the result, and then call Consumer again, when you can get the consumer from the first call?

_, err = js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
		Durable:       "programatic-consumer",
		Name:          "programatic-consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    10,
		DeliverPolicy: jetstream.DeliverNewPolicy,
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not create consumer")
	}

	consumer, err := js.Consumer(context.Background(), "programatic-stream", "programatic-consumer")
	if err != nil {
		logrus.WithError(err).Fatal("could not get the consumer")
	}

@piotrpio
Collaborator

Please see my comment on the other issue: #1673 (comment)

@thomas-maurice
Author

We are using the new API too and we are having the same issue. An off-topic question, though: why do you call CreateOrUpdateConsumer, discard the result, and then call Consumer again, when you can get the consumer from the first call?

That's an oversight on my part; I could just get the consumer from that call. I put the example together in a hurry.

@piotrpio nice, I'll check it out, thanks!

@thomas-maurice
Author

I would like to add a bit more input. As suggested by @typecampo on Slack, I tried calling Drain on the NATS connection instead of on the consumer directly, and that does what I intend. It is odd that it works on the connection and not on the consumer, but it is a workaround!

As an example, here is the code I used to test it:

package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/sirupsen/logrus"
)

const (
	NATS_URL = "nats://localhost:4222"
)

func main() {
	closedWg := &sync.WaitGroup{}
	closedWg.Add(1)
	consumerConn, err := nats.Connect(NATS_URL, nats.ClosedHandler(func(c *nats.Conn) {
		closedWg.Done()
	}))
	if err != nil {
		logrus.WithError(err).Panic("could not connect to NATS")
	}

	js, err := jetstream.New(consumerConn)
	if err != nil {
		logrus.WithError(err).Fatal("could not open jetstream")
	}

	_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
		Name:        "programatic-stream",
		Description: "Some description",
		Subjects:    []string{"hello.>"},
		Retention:   jetstream.LimitsPolicy,
		MaxBytes:    2 * 1024 * 1024 * 1024,
		MaxMsgs:     10000,
		Discard:     jetstream.DiscardOld,
		MaxAge:      time.Hour * 24 * 14,
		Storage:     jetstream.FileStorage,
		Replicas:    1,
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not create stream")
	}

	consumer, err := js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
		Durable:       "programatic-consumer",
		Name:          "programatic-consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    10,
		DeliverPolicy: jetstream.DeliverNewPolicy,
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not create consumer")
	}

	_, err = consumer.Consume(func(msg jetstream.Msg) {
		logrus.Info("artificially wait 5s")
		time.Sleep(time.Second * 5)
		logrus.Info("waited 5s")
		logrus.Infof("got a message %s\n", string(msg.Data()))
		msg.Ack()
	})

	if err != nil {
		logrus.WithError(err).Fatal("could not start consuming")
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	<-sigs
	logrus.Info("got interrupted")
	consumerConn.Drain()
	closedWg.Wait()
	logrus.Info("drained consumer")
}

After sending two consecutive messages to the subject, here is the output I get:

INFO[0003] artificially wait 5s                         
^CINFO[0004] got interrupted                              
INFO[0008] waited 5s                                    
INFO[0008] got a message test                           
INFO[0008] artificially wait 5s                         
INFO[0013] waited 5s                                    
INFO[0013] got a message test                           
INFO[0013] drained consumer 

Hope it helps!
