Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add raft example #153

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
247f961
Add a Linux implementation of file locking
mhagger Jul 19, 2023
e2141f7
Fix/suppress some linter warnings
mhagger Jul 18, 2023
7b97b3a
Define `SnapshotStorage` interface
mhagger Mar 8, 2023
84b03dc
Define `KVStore` interface
mhagger Mar 8, 2023
cdd60c2
raftNode: initialize `snapshotStorage` in `newRaftNode()`
mhagger Mar 8, 2023
5034ccf
newRaftNode(): inline part of the goroutine's work
mhagger Mar 8, 2023
c1dd4dc
startRaftNode(): replacement for `newRaftNode()`
mhagger Mar 8, 2023
2a547ec
raftNode.snapdir: remove member
mhagger Mar 8, 2023
0fdf48b
raftNode.id: convert type to `uint64`
mhagger Mar 8, 2023
17981fa
startRaftNode(): take the `SnapshotStorage` as an argument
mhagger Mar 8, 2023
34654f2
kvstore.loadAndApplySnapshot(), applyCommits(): extract methods
mhagger Mar 8, 2023
5be8540
kvstore: separate initialization from startup
mhagger Mar 8, 2023
302d8a5
kvstore.loadSnapshot(): inline method
mhagger Mar 8, 2023
78c9106
FSM: new interface, representing a finite state machine
mhagger Mar 8, 2023
9163a7d
Move more functionality from `kvstore` to `kvfsm`
mhagger Mar 8, 2023
2ea3a54
TestProposeOnCommit(): add some clarifying comments
mhagger Mar 8, 2023
a328306
raftexample_test: introduce `peer` type
mhagger Mar 8, 2023
7b9b2cc
raftexample_test: give each `peer` its own `FSM`
mhagger Mar 9, 2023
1de94b9
kvfsm.applyCommits(): return an error
mhagger Mar 9, 2023
b44de5b
FSM.ProcessCommits(): return an error rather than calling `log.Fatal()`
mhagger Mar 9, 2023
58d3994
FSM.ApplyCommits(): new method
mhagger Mar 9, 2023
4feda00
newKVStore(): don't call `LoadAndApplySnapshot()`
mhagger Mar 9, 2023
ba8d8ca
Make `ProcessCommits()` a method of `raftNode`
mhagger Mar 9, 2023
bb3c651
newRaftNode(): call `LoadAndApplySnapshot()` here
mhagger Mar 9, 2023
73ae94b
LoadAndApplySnapshot(): move method to `raftNode` and make it private
mhagger Mar 9, 2023
c0cac21
raftNode: add a new and better way to tell when the node is done
mhagger Mar 9, 2023
b19e64f
serveHTTPKVAPI(): monitor the raft node using its "done" channel
mhagger Mar 9, 2023
b4c1a55
cluster.Close(): read any error from the node directly
mhagger Mar 9, 2023
d420ab3
TestProposeOnCommit(): read any error from the node directly
mhagger Mar 9, 2023
d181e4a
cluster.Cleanup(): new method, extracted from `Close()`
mhagger Mar 11, 2023
0ae9c56
TestProposeOnCommit(): change test to use `ProcessCommits()`
mhagger Mar 11, 2023
848ee9d
newRaftNode(): don't return `commitC` and `errorC`
mhagger Mar 11, 2023
cd6fbb1
add raft example
Elbehery Feb 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
raftNode: add a new and better way to tell when the node is done
The old method, monitoring the `errorC` channel, is not great because
the error only pops out of that channel once. Which of the pieces of
code that are waiting on that channel reads the error? Nobody knows!

Instead, provide a `Done()` method and an `Err()` method that
interested parties can use to determine when the node finishes and
what error it returned.

The callers haven't been changed yet.

Signed-off-by: Michael Haggerty <[email protected]>
  • Loading branch information
mhagger authored and Elbehery committed Feb 21, 2024
commit c0cac211fcf6e6150cee8257116fb2af7abe3c82
2 changes: 1 addition & 1 deletion raftexample/main.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func main() {
)

go func() {
if err := rc.ProcessCommits(commitC, errorC); err != nil {
if err := rc.ProcessCommits(commitC); err != nil {
log.Fatalf("raftexample: %v", err)
}
}()
32 changes: 28 additions & 4 deletions raftexample/raft.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,11 @@ type raftNode struct {
commitC chan<- *commit // entries committed to log (k,v)
errorC chan<- error // errors from raft session

// When serveChannels is done, `err` is set to any error and then
// `done` is closed.
err error
done chan struct{}

id uint64 // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
@@ -130,6 +135,7 @@ func startRaftNode(
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
done: make(chan struct{}),
id: id,
peers: peers,
join: join,
@@ -176,7 +182,7 @@ func (rc *raftNode) loadAndApplySnapshot() {

// ProcessCommits reads commits from `commitC` and applies them into
// the kvstore until that channel is closed.
func (rc *raftNode) ProcessCommits(commitC <-chan *commit, errorC <-chan error) error {
func (rc *raftNode) ProcessCommits(commitC <-chan *commit) error {
for commit := range commitC {
if commit == nil {
// This is a request that we load a snapshot.
@@ -188,10 +194,25 @@ func (rc *raftNode) ProcessCommits(commitC <-chan *commit, errorC <-chan error)
return err
}
}
if err, ok := <-errorC; ok {
return err
<-rc.done
return rc.err
}

// Done returns a channel that is closed when `rc` is done processing
// requests.
func (rc *raftNode) Done() <-chan struct{} {
return rc.done
}

// Err returns any error encountered while processing requests, or nil
// if request processing is not yet done.
func (rc *raftNode) Err() error {
select {
case <-rc.done:
return rc.err
default:
return nil
}
return nil
}

func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
@@ -345,8 +366,10 @@ func (rc *raftNode) replayWAL() *wal.WAL {
func (rc *raftNode) writeError(err error) {
rc.stopHTTP()
close(rc.commitC)
rc.err = err
rc.errorC <- err
close(rc.errorC)
close(rc.done)
rc.node.Stop()
}

@@ -399,6 +422,7 @@ func (rc *raftNode) stop() {
rc.stopHTTP()
close(rc.commitC)
close(rc.errorC)
close(rc.done)
rc.node.Stop()
}

4 changes: 2 additions & 2 deletions raftexample/raftexample_test.go
Original file line number Diff line number Diff line change
@@ -222,14 +222,14 @@ func TestPutAndGetKeyValue(t *testing.T) {

kvs, fsm := newKVStore(proposeC)

node, commitC, errorC := startRaftNode(
node, commitC, _ := startRaftNode(
id, clusters, false,
fsm, snapshotStorage,
proposeC, confChangeC,
)

go func() {
if err := node.ProcessCommits(commitC, errorC); err != nil {
if err := node.ProcessCommits(commitC); err != nil {
log.Fatalf("raftexample: %v", err)
}
}()