diff --git a/v2/client.go b/v2/client.go index 0040c28..47248d0 100644 --- a/v2/client.go +++ b/v2/client.go @@ -39,6 +39,13 @@ func (s StartPosition) toProto() proto.StartPosition { return proto.StartPosition(s) } +// StopPosition controls where to stop consuming in a stream. +type StopPosition int32 + +func (s StopPosition) toProto() proto.StopPosition { + return proto.StopPosition(s) +} + const ( defaultMaxConnsPerBroker = 2 defaultKeepAliveTime = 30 * time.Second @@ -72,10 +79,10 @@ var ( // ErrAckTimeout indicates a publish ack was not received in time. ErrAckTimeout = errors.New("publish ack timeout") - // ErrReadonlyPartition is sent to subscribers when the stream partition - // they are subscribed to has either been set to readonly or is already - // readonly and all messages have been read. It is also returned when - // attempting to publish to a readonly partition. + // ErrReadonlyPartition is returned when all messages have been read from a + // read only stream, or when the subscribed to stop position has been + // reached. It is also returned when attempting to publish to a readonly + // partition. ErrReadonlyPartition = errors.New("readonly partition") ) @@ -434,9 +441,13 @@ type Client interface { // Subscribe creates an ephemeral subscription for the given stream. It // begins receiving messages starting at the configured position and waits // for new messages when it reaches the end of the stream. The default - // start position is the end of the stream. It returns an - // ErrNoSuchPartition if the given stream or partition does not exist. Use - // a cancelable Context to close a subscription. + // start position is the end of the stream. + // ErrNoSuchPartition is returned if the given stream or partition does not + // exist. + // ErrReadonlyPartition is return to subscribers when all messages have been + // read from a read only stream, or when the configured stop position is + // reached. + // Use a cancelable Context to close a subscription. Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error // Publish publishes a new message to the Liftbridge stream. The partition @@ -895,6 +906,15 @@ type SubscriptionOptions struct { // StartTimestamp sets the stream start position to the given timestamp. StartTimestamp time.Time + // StopPosition controls where to stop consuming in the stream. + StopPosition StopPosition + + // StopOffset sets the stream offset to stop consuming at. + StopOffset int64 + + // StopTimestamp sets the stream stop position to the given timestamp. + StopTimestamp time.Time + // Partition sets the stream partition to consume. Partition int32 @@ -958,6 +978,37 @@ func StartAtEarliestReceived() SubscriptionOption { } } +// StopAtOffset sets the desired stop offset to stop consuming at in the stream. +func StopAtOffset(offset int64) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StopPosition = StopPosition(proto.StopPosition_STOP_OFFSET) + o.StopOffset = offset + return nil + } +} + +// StopAtTime sets the desired timestamp to stop consuming at in the stream. +func StopAtTime(stop time.Time) SubscriptionOption { + return func(o *SubscriptionOptions) error { + if stop.After(time.Now()) { + return errors.New("stop time cannot be in the future") + } + + o.StopPosition = StopPosition(proto.StopPosition_STOP_TIMESTAMP) + o.StopTimestamp = stop + return nil + } +} + +// StopAtLatestReceived sets the subscription stop position to the last +// message received in the stream. +func StopAtLatestReceived() SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StopPosition = StopPosition(proto.StopPosition_STOP_LATEST) + return nil + } +} + // ReadISRReplica sets read replica option. If true, the client will request // subscription from an random ISR replica instead of subscribing explicitly // to partition's leader. As a random ISR replica is given, it may well be the @@ -991,12 +1042,16 @@ func Partition(partition int32) SubscriptionOption { } } -// Subscribe creates an ephemeral subscription for the given stream. It begins -// receiving messages starting at the configured position and waits for new -// messages when it reaches the end of the stream. The default start position -// is the end of the stream. It returns an ErrNoSuchPartition if the given -// stream or partition does not exist. Use a cancelable Context to close a -// subscription. +// Subscribe creates an ephemeral subscription for the given stream. It +// begins receiving messages starting at the configured position and waits +// for new messages when it reaches the end of the stream. The default +// start position is the end of the stream. +// ErrNoSuchPartition is returned if the given stream or partition does not +// exist. +// ErrReadonlyPartition is return to subscribers when all messages have been +// read from a read only stream, or when the configured stop position is +// reached. +// Use a cancelable Context to close a subscription. func (c *client) Subscribe(ctx context.Context, streamName string, handler Handler, options ...SubscriptionOption) (err error) { @@ -1474,6 +1529,9 @@ func (c *client) subscribe(ctx context.Context, stream string, StartPosition: opts.StartPosition.toProto(), StartOffset: opts.StartOffset, StartTimestamp: opts.StartTimestamp.UnixNano(), + StopPosition: opts.StopPosition.toProto(), + StopOffset: opts.StopOffset, + StopTimestamp: opts.StopTimestamp.UnixNano(), Partition: opts.Partition, ReadISRReplica: opts.ReadISRReplica, Resume: opts.Resume, @@ -1499,9 +1557,13 @@ func (c *client) subscribe(ctx context.Context, stream string, continue } if err != nil { - if status.Code(err) == codes.NotFound { + switch status.Code(err) { + case codes.NotFound: err = ErrNoSuchPartition + case codes.ResourceExhausted: + err = ErrReadonlyPartition } + return nil, nil, err } return st, func() { pool.put(conn) }, nil diff --git a/v2/go.mod b/v2/go.mod index 4353301..fd9b00f 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -6,7 +6,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.4.2 github.com/kr/pretty v0.1.0 // indirect - github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7 + github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 github.com/nats-io/nats-server/v2 v2.1.4 // indirect github.com/nats-io/nats.go v1.9.2 github.com/nats-io/nuid v1.0.1 diff --git a/v2/go.sum b/v2/go.sum index 6f4e4f0..ab2306a 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -36,6 +36,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7 h1:xrF+cDIKVXEzs3dizCDU8SCgC9prT3X562D72iU4W/I= github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8= +github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 h1:2pZtC3v6IBTwE70xfb/k0DPlOJ6BlXpthCUWrxCnhwo= +github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=