Skip to content

Commit

Permalink
Merge pull request #96 from alexrudd/alexrudd/stop-position
Browse files Browse the repository at this point in the history
Add support for stop position
  • Loading branch information
tylertreat authored Nov 18, 2020
2 parents f7afc15 + b25a693 commit b849ccc
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
90 changes: 76 additions & 14 deletions v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func (s StartPosition) toProto() proto.StartPosition {
return proto.StartPosition(s)
}

// StopPosition controls where to stop consuming in a stream.
type StopPosition int32

func (s StopPosition) toProto() proto.StopPosition {
return proto.StopPosition(s)
}

const (
defaultMaxConnsPerBroker = 2
defaultKeepAliveTime = 30 * time.Second
Expand Down Expand Up @@ -72,10 +79,10 @@ var (
// ErrAckTimeout indicates a publish ack was not received in time.
ErrAckTimeout = errors.New("publish ack timeout")

// ErrReadonlyPartition is sent to subscribers when the stream partition
// they are subscribed to has either been set to readonly or is already
// readonly and all messages have been read. It is also returned when
// attempting to publish to a readonly partition.
// ErrReadonlyPartition is returned when all messages have been read from a
// read only stream, or when the subscribed to stop position has been
// reached. It is also returned when attempting to publish to a readonly
// partition.
ErrReadonlyPartition = errors.New("readonly partition")
)

Expand Down Expand Up @@ -434,9 +441,13 @@ type Client interface {
// Subscribe creates an ephemeral subscription for the given stream. It
// begins receiving messages starting at the configured position and waits
// for new messages when it reaches the end of the stream. The default
// start position is the end of the stream. It returns an
// ErrNoSuchPartition if the given stream or partition does not exist. Use
// a cancelable Context to close a subscription.
// start position is the end of the stream.
// ErrNoSuchPartition is returned if the given stream or partition does not
// exist.
// ErrReadonlyPartition is return to subscribers when all messages have been
// read from a read only stream, or when the configured stop position is
// reached.
// Use a cancelable Context to close a subscription.
Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error

// Publish publishes a new message to the Liftbridge stream. The partition
Expand Down Expand Up @@ -895,6 +906,15 @@ type SubscriptionOptions struct {
// StartTimestamp sets the stream start position to the given timestamp.
StartTimestamp time.Time

// StopPosition controls where to stop consuming in the stream.
StopPosition StopPosition

// StopOffset sets the stream offset to stop consuming at.
StopOffset int64

// StopTimestamp sets the stream stop position to the given timestamp.
StopTimestamp time.Time

// Partition sets the stream partition to consume.
Partition int32

Expand Down Expand Up @@ -958,6 +978,37 @@ func StartAtEarliestReceived() SubscriptionOption {
}
}

// StopAtOffset sets the desired stop offset to stop consuming at in the stream.
func StopAtOffset(offset int64) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StopPosition = StopPosition(proto.StopPosition_STOP_OFFSET)
o.StopOffset = offset
return nil
}
}

// StopAtTime sets the desired timestamp to stop consuming at in the stream.
func StopAtTime(stop time.Time) SubscriptionOption {
return func(o *SubscriptionOptions) error {
if stop.After(time.Now()) {
return errors.New("stop time cannot be in the future")
}

o.StopPosition = StopPosition(proto.StopPosition_STOP_TIMESTAMP)
o.StopTimestamp = stop
return nil
}
}

// StopAtLatestReceived sets the subscription stop position to the last
// message received in the stream.
func StopAtLatestReceived() SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StopPosition = StopPosition(proto.StopPosition_STOP_LATEST)
return nil
}
}

// ReadISRReplica sets read replica option. If true, the client will request
// subscription from an random ISR replica instead of subscribing explicitly
// to partition's leader. As a random ISR replica is given, it may well be the
Expand Down Expand Up @@ -991,12 +1042,16 @@ func Partition(partition int32) SubscriptionOption {
}
}

// Subscribe creates an ephemeral subscription for the given stream. It begins
// receiving messages starting at the configured position and waits for new
// messages when it reaches the end of the stream. The default start position
// is the end of the stream. It returns an ErrNoSuchPartition if the given
// stream or partition does not exist. Use a cancelable Context to close a
// subscription.
// Subscribe creates an ephemeral subscription for the given stream. It
// begins receiving messages starting at the configured position and waits
// for new messages when it reaches the end of the stream. The default
// start position is the end of the stream.
// ErrNoSuchPartition is returned if the given stream or partition does not
// exist.
// ErrReadonlyPartition is return to subscribers when all messages have been
// read from a read only stream, or when the configured stop position is
// reached.
// Use a cancelable Context to close a subscription.
func (c *client) Subscribe(ctx context.Context, streamName string, handler Handler,
options ...SubscriptionOption) (err error) {

Expand Down Expand Up @@ -1474,6 +1529,9 @@ func (c *client) subscribe(ctx context.Context, stream string,
StartPosition: opts.StartPosition.toProto(),
StartOffset: opts.StartOffset,
StartTimestamp: opts.StartTimestamp.UnixNano(),
StopPosition: opts.StopPosition.toProto(),
StopOffset: opts.StopOffset,
StopTimestamp: opts.StopTimestamp.UnixNano(),
Partition: opts.Partition,
ReadISRReplica: opts.ReadISRReplica,
Resume: opts.Resume,
Expand All @@ -1499,9 +1557,13 @@ func (c *client) subscribe(ctx context.Context, stream string,
continue
}
if err != nil {
if status.Code(err) == codes.NotFound {
switch status.Code(err) {
case codes.NotFound:
err = ErrNoSuchPartition
case codes.ResourceExhausted:
err = ErrReadonlyPartition
}

return nil, nil, err
}
return st, func() { pool.put(conn) }, nil
Expand Down
2 changes: 1 addition & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.4.2
github.com/kr/pretty v0.1.0 // indirect
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256
github.com/nats-io/nats-server/v2 v2.1.4 // indirect
github.com/nats-io/nats.go v1.9.2
github.com/nats-io/nuid v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7 h1:xrF+cDIKVXEzs3dizCDU8SCgC9prT3X562D72iU4W/I=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201022201130-7ae2595b40b7/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 h1:2pZtC3v6IBTwE70xfb/k0DPlOJ6BlXpthCUWrxCnhwo=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
Expand Down

0 comments on commit b849ccc

Please sign in to comment.