Skip to content

Commit

Permalink
chore: add a raft library
Browse files Browse the repository at this point in the history
This is a typed version of dragonboat APIs. It also does some simplifications, namely
- No immutable files in snapshots
- No support for cancelling snapshot save / restore
- No active sessions, which can result into duplicate events on timeouts / errors.

Also, it does not support adding / removing members from the cluster yet.

However, it supports
- typed events
- typed queries
- snapshotting
  • Loading branch information
jvmakine committed Dec 17, 2024
1 parent 8803a38 commit 6ddf2e8
Show file tree
Hide file tree
Showing 6 changed files with 985 additions and 0 deletions.
130 changes: 130 additions & 0 deletions cmd/raft-tester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"os"
"os/signal"
"time"

"github.com/alecthomas/kong"
"github.com/block/ftl/internal/raft"
"github.com/lni/dragonboat/v4"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"
)

var cli struct {
RaftConfig raft.RaftConfig `embed:"" prefix:"raft-"`
}

type IntStateMachine struct {
sum int64
}

type IntEvent int64

func (i *IntEvent) UnmarshalBinary(data []byte) error { //nolint:unparam
*i = IntEvent(binary.BigEndian.Uint64(data))
return nil
}

func (i IntEvent) MarshalBinary() ([]byte, error) { //nolint:unparam
return binary.BigEndian.AppendUint64([]byte{}, uint64(i)), nil
}

var _ raft.StateMachine[int64, int64, IntEvent, *IntEvent] = &IntStateMachine{}

func (s IntStateMachine) Lookup(key int64) (int64, error) {
return s.sum, nil
}

func (s *IntStateMachine) Update(msg IntEvent) error {
s.sum += int64(msg)
return nil
}

func (s IntStateMachine) Close() error {
return nil
}

func (s IntStateMachine) Recover(reader io.Reader) error {
err := binary.Read(reader, binary.BigEndian, &s.sum)
if err != nil {
return fmt.Errorf("failed to recover from snapshot: %w", err)
}
return nil
}

func (s IntStateMachine) Save(writer io.Writer) error {
err := binary.Write(writer, binary.BigEndian, s.sum)
if err != nil {
return fmt.Errorf("failed to save snapshot: %w", err)
}
return nil
}

func main() {
kctx := kong.Parse(&cli)
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)

cluster := raft.New(&cli.RaftConfig)
shard := raft.AddShard(ctx, cluster, 1, &IntStateMachine{})

wg, ctx := errgroup.WithContext(ctx)
messages := make(chan int)

wg.Go(func() error {
defer close(messages)
// send a random number every 10 seconds
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
messages <- rand.Intn(1000)
case <-ctx.Done():
return nil
}
}
})
wg.Go(func() error {
return cluster.Start(ctx, nil)
})
wg.Go(func() error {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case msg := <-messages:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err := shard.Propose(ctx, IntEvent(msg))
if errors.Is(err, dragonboat.ErrShardNotReady) {
log.Println("shard not ready")
} else if err != nil {
return fmt.Errorf("failed to propose event: %w", err)
}
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

state, err := shard.Query(ctx, 1)
if err != nil {
return fmt.Errorf("failed to query shard: %w", err)
}
log.Println("state: ", state)
case <-ctx.Done():
return nil
}
}
})

if err := wg.Wait(); err != nil {
kctx.FatalIfErrorf(err)
}
}
28 changes: 28 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/jotaen/kong-completion v0.0.6
github.com/jpillora/backoff v1.0.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/lni/dragonboat/v4 v4.0.0-20240618143154-6a1623140f27
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-base36 v0.2.0
github.com/opencontainers/go-digest v1.0.0
Expand Down Expand Up @@ -92,7 +93,11 @@ require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/DataDog/zstd v1.5.2 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/metrics v1.18.1 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
Expand All @@ -103,6 +108,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/pebble v0.0.0-20221207173255-0f086d933dac // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
Expand All @@ -116,19 +126,26 @@ require (
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/memberlist v0.5.1 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand All @@ -139,9 +156,16 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lni/dragonboat v2.1.7+incompatible // indirect
github.com/lni/dragonboat/v3 v3.3.8 // indirect
github.com/lni/goutils v1.4.0 // indirect
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
Expand All @@ -158,11 +182,15 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/jsonrpc2 v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/vbatts/tar-split v0.11.3 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
Expand Down
Loading

0 comments on commit 6ddf2e8

Please sign in to comment.