Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add raft example #153

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
247f961
Add a Linux implementation of file locking
mhagger Jul 19, 2023
e2141f7
Fix/suppress some linter warnings
mhagger Jul 18, 2023
7b97b3a
Define `SnapshotStorage` interface
mhagger Mar 8, 2023
84b03dc
Define `KVStore` interface
mhagger Mar 8, 2023
cdd60c2
raftNode: initialize `snapshotStorage` in `newRaftNode()`
mhagger Mar 8, 2023
5034ccf
newRaftNode(): inline part of the goroutine's work
mhagger Mar 8, 2023
c1dd4dc
startRaftNode(): replacement for `newRaftNode()`
mhagger Mar 8, 2023
2a547ec
raftNode.snapdir: remove member
mhagger Mar 8, 2023
0fdf48b
raftNode.id: convert type to `uint64`
mhagger Mar 8, 2023
17981fa
startRaftNode(): take the `SnapshotStorage` as an argument
mhagger Mar 8, 2023
34654f2
kvstore.loadAndApplySnapshot(), applyCommits(): extract methods
mhagger Mar 8, 2023
5be8540
kvstore: separate initialization from startup
mhagger Mar 8, 2023
302d8a5
kvstore.loadSnapshot(): inline method
mhagger Mar 8, 2023
78c9106
FSM: new interface, representing a finite state machine
mhagger Mar 8, 2023
9163a7d
Move more functionality from `kvstore` to `kvfsm`
mhagger Mar 8, 2023
2ea3a54
TestProposeOnCommit(): add some clarifying comments
mhagger Mar 8, 2023
a328306
raftexample_test: introduce `peer` type
mhagger Mar 8, 2023
7b9b2cc
raftexample_test: give each `peer` its own `FSM`
mhagger Mar 9, 2023
1de94b9
kvfsm.applyCommits(): return an error
mhagger Mar 9, 2023
b44de5b
FSM.ProcessCommits(): return an error rather than calling `log.Fatal()`
mhagger Mar 9, 2023
58d3994
FSM.ApplyCommits(): new method
mhagger Mar 9, 2023
4feda00
newKVStore(): don't call `LoadAndApplySnapshot()`
mhagger Mar 9, 2023
ba8d8ca
Make `ProcessCommits()` a method of `raftNode`
mhagger Mar 9, 2023
bb3c651
newRaftNode(): call `LoadAndApplySnapshot()` here
mhagger Mar 9, 2023
73ae94b
LoadAndApplySnapshot(): move method to `raftNode` and make it private
mhagger Mar 9, 2023
c0cac21
raftNode: add a new and better way to tell when the node is done
mhagger Mar 9, 2023
b19e64f
serveHTTPKVAPI(): monitor the raft node using its "done" channel
mhagger Mar 9, 2023
b4c1a55
cluster.Close(): read any error from the node directly
mhagger Mar 9, 2023
d420ab3
TestProposeOnCommit(): read any error from the node directly
mhagger Mar 9, 2023
d181e4a
cluster.Cleanup(): new method, extracted from `Close()`
mhagger Mar 11, 2023
0ae9c56
TestProposeOnCommit(): change test to use `ProcessCommits()`
mhagger Mar 11, 2023
848ee9d
newRaftNode(): don't return `commitC` and `errorC`
mhagger Mar 11, 2023
cd6fbb1
add raft example
Elbehery Feb 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move more functionality from kvstore to kvfsm
Signed-off-by: Michael Haggerty <[email protected]>
  • Loading branch information
mhagger authored and Elbehery committed Feb 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 9163a7d12dcbce54493ab7ea9c8ff75c056811a6
85 changes: 48 additions & 37 deletions raftexample/kvstore.go
Original file line number Diff line number Diff line change
@@ -51,8 +51,11 @@ func newKVStore(snapshotStorage SnapshotStorage, proposeC chan<- string) (*kvsto
kvStore: make(map[string]string),
snapshotStorage: snapshotStorage,
}
s.loadAndApplySnapshot()
return s, kvfsm{kvs: s}
fsm := kvfsm{
kvs: s,
}
fsm.LoadAndApplySnapshot()
return s, fsm
}

func (s *kvstore) Lookup(key string) (string, bool) {
@@ -70,10 +73,39 @@ func (s *kvstore) Propose(k string, v string) {
s.proposeC <- buf.String()
}

// loadAndApplySnapshot load the most recent snapshot from the
// Set sets a single value. It should only be called by `kvfsm`.
func (s *kvstore) set(k, v string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore[k] = v
}

func (s *kvstore) restoreFromSnapshot(store map[string]string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
}

// kvfsm implements the `FSM` interface for the underlying `*kvstore`.
type kvfsm struct {
kvs *kvstore
}

// RestoreSnapshot restores the current state of the KV store to the
// value encoded in `snapshot`.
func (fsm kvfsm) RestoreSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
fsm.kvs.restoreFromSnapshot(store)
return nil
}

// LoadAndApplySnapshot loads the most recent snapshot from the
// snapshot storage (if any) and applies it to the current state.
func (s *kvstore) loadAndApplySnapshot() {
snapshot, err := s.snapshotStorage.Load()
func (fsm kvfsm) LoadAndApplySnapshot() {
snapshot, err := fsm.kvs.snapshotStorage.Load()
if err != nil {
if err == snap.ErrNoSnapshot {
// No snapshots available; do nothing.
@@ -83,64 +115,43 @@ func (s *kvstore) loadAndApplySnapshot() {
}

log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
if err := fsm.RestoreSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}

func (fsm kvfsm) TakeSnapshot() ([]byte, error) {
fsm.kvs.mu.RLock()
defer fsm.kvs.mu.RUnlock()
return json.Marshal(fsm.kvs.kvStore)
}

// applyCommits decodes and applies each of the commits in `commit` to
// the current state, then signals that it is done by closing
// `commit.applyDoneC`.
func (s *kvstore) applyCommits(commit *commit) {
func (fsm kvfsm) applyCommits(commit *commit) {
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
fsm.kvs.set(dataKv.Key, dataKv.Val)
}
close(commit.applyDoneC)
}

func (s *kvstore) getSnapshot() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return json.Marshal(s.kvStore)
}

func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
return nil
}

type kvfsm struct {
kvs *kvstore
}

func (fsm kvfsm) TakeSnapshot() ([]byte, error) {
return fsm.kvs.getSnapshot()
}

// ProcessCommits() reads commits from `commitC` and applies them into
// the kvstore until that channel is closed.
func (fsm kvfsm) ProcessCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
if commit == nil {
// This is a request that we load a snapshot.
fsm.kvs.loadAndApplySnapshot()
fsm.LoadAndApplySnapshot()
continue
}

fsm.kvs.applyCommits(commit)
fsm.applyCommits(commit)
}
if err, ok := <-errorC; ok {
log.Fatal(err)
50 changes: 50 additions & 0 deletions raftexample/kvstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"reflect"
"testing"
)

func Test_kvstore_snapshot(t *testing.T) {
tm := map[string]string{"foo": "bar"}
s := &kvstore{kvStore: tm}
fsm := kvfsm{
kvs: s,
}

v, _ := s.Lookup("foo")
if v != "bar" {
t.Fatalf("foo has unexpected value, got %s", v)
}

data, err := fsm.TakeSnapshot()
if err != nil {
t.Fatal(err)
}
s.kvStore = nil

if err := fsm.RestoreSnapshot(data); err != nil {
t.Fatal(err)
}
v, _ = s.Lookup("foo")
if v != "bar" {
t.Fatalf("foo has unexpected value, got %s", v)
}
if !reflect.DeepEqual(s.kvStore, tm) {
t.Fatalf("store expected %+v, got %+v", tm, s.kvStore)
}
}
5 changes: 5 additions & 0 deletions raftexample/raft.go
Original file line number Diff line number Diff line change
@@ -100,6 +100,11 @@ type FSM interface {
// bytes that can be saved or loaded by a `SnapshotStorage`.
TakeSnapshot() ([]byte, error)

// RestoreSnapshot restores the finite state machine to the state
// represented by `snapshot` (which, in turn, was returned by
// `TakeSnapshot`).
RestoreSnapshot(snapshot []byte) error

// ProcessCommits reads committed updates (and requests to restore
// snapshots) from `commitC` and applies them to the finite state
// machine.
5 changes: 5 additions & 0 deletions raftexample/raftexample_test.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,11 @@ func (sw snapshotWatcher) TakeSnapshot() ([]byte, error) {
return nil, nil
}

func (sw snapshotWatcher) RestoreSnapshot(snapshot []byte) error {
_ = snapshot
panic("not implemented")
}

type cluster struct {
peers []string
commitC []<-chan *commit