Skip to content

Commit

Permalink
feat(postgres): add support for eventstore.SequenceNumberGetter (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
ar3s3ru authored Sep 3, 2021
1 parent 6da801f commit 5c63eab
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 6 deletions.
2 changes: 1 addition & 1 deletion eventstore/postgres/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/docker/docker v20.10.2+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/get-eventually/go-eventually v0.0.0-20210830220720-a038785a5316
github.com/get-eventually/go-eventually v0.0.0-20210903204041-6da801fa7f4e
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-migrate/migrate v3.5.4+incompatible
github.com/gorilla/mux v1.8.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions eventstore/postgres/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/get-eventually/go-eventually v0.0.0-20210830220720-a038785a5316 h1:mdvJ8lYRc0BXPnkQczyF7EeV9xNnKTMh459lnetvDus=
github.com/get-eventually/go-eventually v0.0.0-20210830220720-a038785a5316/go.mod h1:yOBDlloZ9BP1Hbmdp0lvkCC49esxLizZuytfjakJ5J4=
github.com/get-eventually/go-eventually v0.0.0-20210903204041-6da801fa7f4e h1:9+YQjGoKqBYE0I2GPW3HVNb6k6T6dQHkvEcLORoIOgg=
github.com/get-eventually/go-eventually v0.0.0-20210903204041-6da801fa7f4e/go.mod h1:yOBDlloZ9BP1Hbmdp0lvkCC49esxLizZuytfjakJ5J4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-migrate/migrate v3.5.4+incompatible h1:R7OzwvCJTCgwapPCiX6DyBiu2czIUMDCB118gFTKTUA=
Expand Down
25 changes: 24 additions & 1 deletion eventstore/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
_ "github.com/golang-migrate/migrate/database/postgres" // postgres driver for migrate
)

var _ eventstore.Store = &EventStore{}
var (
_ eventstore.Store = &EventStore{}
_ eventstore.SequenceNumberGetter = &EventStore{}
)

// EventStore is an eventstore.Store implementation which uses
// PostgreSQL as backend datastore.
Expand Down Expand Up @@ -181,3 +184,23 @@ func (st *EventStore) appendEvent(

return newVersion, nil
}

// LatestSequenceNumber returns the latest Sequence Number used by a Domain Event
// committed to the Event Store.
func (st EventStore) LatestSequenceNumber(ctx context.Context) (int64, error) {
row := st.db.QueryRowContext(
ctx,
"SELECT max(global_sequence_number) FROM events",
)

if err := row.Err(); err != nil {
return 0, fmt.Errorf("postgres.EventStore: failed to get latest sequence number: %w", err)
}

var sequenceNumber int64
if err := row.Scan(&sequenceNumber); err != nil {
return 0, fmt.Errorf("postgres.EventStore: failed to scan latest sequence number from sql row: %w", err)
}

return sequenceNumber, nil
}
51 changes: 47 additions & 4 deletions eventstore/postgres/store_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package postgres_test

import (
"context"
"database/sql"
"math"
"os"
"testing"

"github.com/get-eventually/go-eventually/eventstore"
"github.com/get-eventually/go-eventually/eventstore/postgres"
"github.com/get-eventually/go-eventually/internal"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/get-eventually/go-eventually"
"github.com/get-eventually/go-eventually/eventstore"
"github.com/get-eventually/go-eventually/eventstore/postgres"
"github.com/get-eventually/go-eventually/eventstore/stream"
"github.com/get-eventually/go-eventually/internal"
)

var firstInstance = stream.ID{
Type: "first-type",
Name: "my-instance-for-latest-number",
}

const defaultPostgresURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"

func obtainEventStore(t *testing.T) (*sql.DB, postgres.EventStore) {
Expand Down Expand Up @@ -67,3 +76,37 @@ func TestStoreSuite(t *testing.T) {
return store
}))
}

func TestLatestSequenceNumber(t *testing.T) {
db, store := obtainEventStore(t)
defer func() { assert.NoError(t, db.Close()) }()

require.NoError(t, store.Register(internal.IntPayload(0)))

ctx := context.Background()

for i := 1; i < 10; i++ {
_, err := store.Append(
ctx,
firstInstance,
eventstore.VersionCheck(int64(i-1)),
eventually.Event{Payload: internal.IntPayload(i)},
)

require.NoError(t, err)
}

ch := make(chan eventstore.Event, 1)
go func() {
require.NoError(t, store.Stream(ctx, ch, stream.All{}, eventstore.SelectFromBeginning))
}()

var latestSequenceNumber int64
for event := range ch {
latestSequenceNumber = int64(math.Max(float64(latestSequenceNumber), float64(event.SequenceNumber)))
}

actual, err := store.LatestSequenceNumber(ctx)
assert.NoError(t, err)
assert.Equal(t, latestSequenceNumber, actual)
}

0 comments on commit 5c63eab

Please sign in to comment.