From 0806ada8c7dc1f36c120695220e3cbe39bfaaa73 Mon Sep 17 00:00:00 2001 From: alexrudd Date: Sat, 14 Nov 2020 00:12:10 +0000 Subject: [PATCH 1/2] Add support for stop position --- v2/client.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++---- v2/go.mod | 2 +- v2/go.sum | 2 ++ 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/v2/client.go b/v2/client.go index 0040c28..244ed8a 100644 --- a/v2/client.go +++ b/v2/client.go @@ -39,6 +39,13 @@ func (s StartPosition) toProto() proto.StartPosition { return proto.StartPosition(s) } +// StopPosition controls where to stop consuming in a stream. +type StopPosition int32 + +func (s StopPosition) toProto() proto.StopPosition { + return proto.StopPosition(s) +} + const ( defaultMaxConnsPerBroker = 2 defaultKeepAliveTime = 30 * time.Second @@ -65,6 +72,10 @@ var ( // subscribed to has been deleted. ErrStreamDeleted = errors.New("stream has been deleted") + // ErrStreamExhausted is sent to subscribers when the stop position has been + // reached. + ErrStreamExhausted = errors.New("stream has been exhausted") + // ErrPartitionPaused is sent to subscribers when the stream partition they // are subscribed to has been paused. ErrPartitionPaused = errors.New("stream partition has been paused") @@ -434,9 +445,12 @@ type Client interface { // Subscribe creates an ephemeral subscription for the given stream. It // begins receiving messages starting at the configured position and waits // for new messages when it reaches the end of the stream. The default - // start position is the end of the stream. It returns an - // ErrNoSuchPartition if the given stream or partition does not exist. Use - // a cancelable Context to close a subscription. + // start position is the end of the stream. + // ErrNoSuchPartition is returned if the given stream or partition does not + // exist. + // ErrStreamExhausted is sent to subscribers when the stop position has been + // reached. + // Use a cancelable Context to close a subscription. Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error // Publish publishes a new message to the Liftbridge stream. The partition @@ -895,6 +909,15 @@ type SubscriptionOptions struct { // StartTimestamp sets the stream start position to the given timestamp. StartTimestamp time.Time + // StopPosition controls where to stop consuming in the stream. + StopPosition StopPosition + + // StopOffset sets the stream offset to stop consuming at. + StopOffset int64 + + // StopTimestamp sets the stream stop position to the given timestamp. + StopTimestamp time.Time + // Partition sets the stream partition to consume. Partition int32 @@ -958,6 +981,33 @@ func StartAtEarliestReceived() SubscriptionOption { } } +// StopAtOffset sets the desired stop offset to stop consuming at in the stream. +func StopAtOffset(offset int64) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StopPosition = StopPosition(proto.StopPosition_STOP_OFFSET) + o.StopOffset = offset + return nil + } +} + +// StopAtTime sets the desired timestamp to stop consuming at in the stream. +func StopAtTime(stop time.Time) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StopPosition = StopPosition(proto.StopPosition_STOP_TIMESTAMP) + o.StopTimestamp = stop + return nil + } +} + +// StopAtLatestReceived sets the subscription stop position to the last +// message received in the stream. +func StopAtLatestReceived() SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StopPosition = StopPosition(proto.StopPosition_STOP_LATEST) + return nil + } +} + // ReadISRReplica sets read replica option. If true, the client will request // subscription from an random ISR replica instead of subscribing explicitly // to partition's leader. As a random ISR replica is given, it may well be the @@ -1474,6 +1524,9 @@ func (c *client) subscribe(ctx context.Context, stream string, StartPosition: opts.StartPosition.toProto(), StartOffset: opts.StartOffset, StartTimestamp: opts.StartTimestamp.UnixNano(), + StopPosition: opts.StopPosition.toProto(), + StopOffset: opts.StopOffset, + StopTimestamp: opts.StopTimestamp.UnixNano(), Partition: opts.Partition, ReadISRReplica: opts.ReadISRReplica, Resume: opts.Resume, @@ -1499,9 +1552,13 @@ func (c *client) subscribe(ctx context.Context, stream string, continue } if err != nil { - if status.Code(err) == codes.NotFound { + switch status.Code(err) { + case codes.NotFound: err = ErrNoSuchPartition + case codes.ResourceExhausted: + err = ErrStreamExhausted } + return nil, nil, err } return st, func() { pool.put(conn) }, nil @@ -1553,7 +1610,7 @@ LOOP: err = ErrPartitionPaused case codes.ResourceExhausted: // Indicates the end of a readonly partition has been reached. - err = ErrReadonlyPartition + err = ErrStreamExhausted } handler(messageFromProto(msg), err) } diff --git a/v2/go.mod b/v2/go.mod index 4353301..fd9b00f 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -6,7 +6,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.4.2 github.com/kr/pretty v0.1.0 // indirect - github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7 + github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 github.com/nats-io/nats-server/v2 v2.1.4 // indirect github.com/nats-io/nats.go v1.9.2 github.com/nats-io/nuid v1.0.1 diff --git a/v2/go.sum b/v2/go.sum index 6f4e4f0..ab2306a 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -36,6 +36,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7 h1:xrF+cDIKVXEzs3dizCDU8SCgC9prT3X562D72iU4W/I= github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8= +github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 h1:2pZtC3v6IBTwE70xfb/k0DPlOJ6BlXpthCUWrxCnhwo= +github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= From b25a693ed61e9899962639f2955f1931e4af290e Mon Sep 17 00:00:00 2001 From: alexrudd Date: Wed, 18 Nov 2020 22:17:55 +0000 Subject: [PATCH 2/2] Use existing ErrReadonlyPartition for reaching stop position --- v2/client.go | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/v2/client.go b/v2/client.go index 244ed8a..47248d0 100644 --- a/v2/client.go +++ b/v2/client.go @@ -72,10 +72,6 @@ var ( // subscribed to has been deleted. ErrStreamDeleted = errors.New("stream has been deleted") - // ErrStreamExhausted is sent to subscribers when the stop position has been - // reached. - ErrStreamExhausted = errors.New("stream has been exhausted") - // ErrPartitionPaused is sent to subscribers when the stream partition they // are subscribed to has been paused. ErrPartitionPaused = errors.New("stream partition has been paused") @@ -83,10 +79,10 @@ var ( // ErrAckTimeout indicates a publish ack was not received in time. ErrAckTimeout = errors.New("publish ack timeout") - // ErrReadonlyPartition is sent to subscribers when the stream partition - // they are subscribed to has either been set to readonly or is already - // readonly and all messages have been read. It is also returned when - // attempting to publish to a readonly partition. + // ErrReadonlyPartition is returned when all messages have been read from a + // read only stream, or when the subscribed to stop position has been + // reached. It is also returned when attempting to publish to a readonly + // partition. ErrReadonlyPartition = errors.New("readonly partition") ) @@ -448,7 +444,8 @@ type Client interface { // start position is the end of the stream. // ErrNoSuchPartition is returned if the given stream or partition does not // exist. - // ErrStreamExhausted is sent to subscribers when the stop position has been + // ErrReadonlyPartition is return to subscribers when all messages have been + // read from a read only stream, or when the configured stop position is // reached. // Use a cancelable Context to close a subscription. Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error @@ -993,6 +990,10 @@ func StopAtOffset(offset int64) SubscriptionOption { // StopAtTime sets the desired timestamp to stop consuming at in the stream. func StopAtTime(stop time.Time) SubscriptionOption { return func(o *SubscriptionOptions) error { + if stop.After(time.Now()) { + return errors.New("stop time cannot be in the future") + } + o.StopPosition = StopPosition(proto.StopPosition_STOP_TIMESTAMP) o.StopTimestamp = stop return nil @@ -1041,12 +1042,16 @@ func Partition(partition int32) SubscriptionOption { } } -// Subscribe creates an ephemeral subscription for the given stream. It begins -// receiving messages starting at the configured position and waits for new -// messages when it reaches the end of the stream. The default start position -// is the end of the stream. It returns an ErrNoSuchPartition if the given -// stream or partition does not exist. Use a cancelable Context to close a -// subscription. +// Subscribe creates an ephemeral subscription for the given stream. It +// begins receiving messages starting at the configured position and waits +// for new messages when it reaches the end of the stream. The default +// start position is the end of the stream. +// ErrNoSuchPartition is returned if the given stream or partition does not +// exist. +// ErrReadonlyPartition is return to subscribers when all messages have been +// read from a read only stream, or when the configured stop position is +// reached. +// Use a cancelable Context to close a subscription. func (c *client) Subscribe(ctx context.Context, streamName string, handler Handler, options ...SubscriptionOption) (err error) { @@ -1556,7 +1561,7 @@ func (c *client) subscribe(ctx context.Context, stream string, case codes.NotFound: err = ErrNoSuchPartition case codes.ResourceExhausted: - err = ErrStreamExhausted + err = ErrReadonlyPartition } return nil, nil, err @@ -1610,7 +1615,7 @@ LOOP: err = ErrPartitionPaused case codes.ResourceExhausted: // Indicates the end of a readonly partition has been reached. - err = ErrStreamExhausted + err = ErrReadonlyPartition } handler(messageFromProto(msg), err) }