Skip to content

Commit

Permalink
fix: state backend don't clean fs state
Browse files Browse the repository at this point in the history
  • Loading branch information
guorui.fan committed Dec 19, 2022
1 parent f022ade commit 16122fe
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 72 deletions.
4 changes: 2 additions & 2 deletions streaming-connector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/RuiFG/streaming/streaming-connector
go 1.19

require (
github.com/RuiFG/streaming/streaming-core v0.1.0
github.com/RuiFG/streaming/streaming-core v0.1.1
github.com/Shopify/sarama v1.35.0
)

Expand All @@ -28,7 +28,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/nutsdb v0.11.0 // indirect
github.com/xujiajun/nutsdb v0.11.1 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion streaming-core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/stretchr/testify v1.8.0
github.com/uber-go/tally/v4 v4.1.4
github.com/xujiajun/nutsdb v0.11.0
github.com/xujiajun/nutsdb v0.11.1
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.28.1
)
Expand Down
4 changes: 2 additions & 2 deletions streaming-core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/uber-go/tally/v4 v4.1.4/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/
github.com/xujiajun/gorouter v1.2.0/go.mod h1:yJrIta+bTNpBM/2UT8hLOaEAFckO+m/qmR3luMIQygM=
github.com/xujiajun/mmap-go v1.0.1 h1:7Se7ss1fLPPRW+ePgqGpCkfGIZzJV6JPq9Wq9iv/WHc=
github.com/xujiajun/mmap-go v1.0.1/go.mod h1:CNN6Sw4SL69Sui00p0zEzcZKbt+5HtEnYUsc6BKKRMg=
github.com/xujiajun/nutsdb v0.11.0 h1:uzQb3Tvib0R/5kwdt6AcEJJpHtmcunGGMsq63dhnyyM=
github.com/xujiajun/nutsdb v0.11.0/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g=
github.com/xujiajun/nutsdb v0.11.1 h1:zLyIvp3ABHMohtcqi0sbt7gGOFWfse+ZbLv2GVb6ZYw=
github.com/xujiajun/nutsdb v0.11.1/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g=
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 h1:w0si+uee0iAaCJO9q86T6yrhdadgcsoNuh47LrUykzg=
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235/go.mod h1:MR4+0R6A9NS5IABnIM3384FfOq8QFVnm7WDrBOhIaMU=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand Down
60 changes: 41 additions & 19 deletions streaming-core/store/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"fmt"
"github.com/xujiajun/nutsdb"
"go.uber.org/zap"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -34,11 +35,15 @@ func NewMemoryBackend() Backend {
}

type fs struct {
db *nutsdb.DB
logger *zap.Logger
db *nutsdb.DB
//storage stores all checkpoint state
storage *sync.Map
//checkpoints are currently completed checkpoint id sorted slice
checkpoints []int64
checkpoints []int64
//checkpointsTotalNum
checkpointsTotalNum int
checkpointsNumMerged int
checkpointsNumRetained int
}

Expand Down Expand Up @@ -92,35 +97,50 @@ func (r *fs) Persist(checkpointId int64) error {
} else {
r.checkpoints = append(r.checkpoints, checkpointId)

//1. persist checkpoint state into db
if err := r.db.Update(func(tx *nutsdb.Tx) error {
var err error
//1. persist checkpoint state into db
m.(*sync.Map).Range(func(name, state any) bool {
if err = tx.Put(
formatCheckpointId(checkpointId), []byte(name.(string)), state.([]byte), 0); err != nil {
return false
}
return true
})
//2.clean up checkpoint status in db
var deletedCheckpointIds []int64
if len(r.checkpoints) > r.checkpointsNumRetained {
deletedCheckpointIds = r.checkpoints[:len(r.checkpoints)-r.checkpointsNumRetained]
r.checkpoints = r.checkpoints[len(r.checkpoints)-r.checkpointsNumRetained:]
}
for _, deletedCheckpointId := range deletedCheckpointIds {
if err = tx.DeleteBucket(nutsdb.DataStructureBPTree, formatCheckpointId(deletedCheckpointId)); err != nil {
return err
}
}
//3.clean up checkpoint status in memory
for _, deletedCheckpointId := range deletedCheckpointIds {
r.storage.Delete(deletedCheckpointId)
}
return nil
}); err != nil {
return fmt.Errorf("failed to persist %d checkpoint state: %w", checkpointId, err)
}
r.checkpointsTotalNum += 1
//2.clean up expired checkpoint status in db
//3.clean up checkpoint status in memory
if r.checkpointsTotalNum%r.checkpointsNumRetained == 0 {
if err := r.db.Update(func(tx *nutsdb.Tx) error {
var deletedCheckpointIds []int64
if len(r.checkpoints) > r.checkpointsNumRetained {
deletedCheckpointIds = r.checkpoints[:len(r.checkpoints)-r.checkpointsNumRetained]
r.checkpoints = r.checkpoints[len(r.checkpoints)-r.checkpointsNumRetained:]
}
for _, deletedCheckpointId := range deletedCheckpointIds {
if err := tx.DeleteBucket(nutsdb.DataStructureBPTree, formatCheckpointId(deletedCheckpointId)); err != nil {
return err
}
}
for _, deletedCheckpointId := range deletedCheckpointIds {
r.storage.Delete(deletedCheckpointId)
}
return nil
}); err != nil {
r.logger.Warn("failed to clear up expired checkpoint data.", zap.Error(err))
}
}
if r.checkpointsTotalNum%r.checkpointsNumMerged == 0 {
//4.merge fs state
if err := r.db.Merge(); err != nil {
r.logger.Warn("failed to merge fs state.", zap.Error(err))
}
}

}
return nil
}
Expand Down Expand Up @@ -151,7 +171,7 @@ func (r *fs) Close() error {
return r.db.Close()
}

func NewFSBackend(checkpointsDir string, checkpointsNumRetained int) (Backend, error) {
func NewFSBackend(logger *zap.Logger, checkpointsDir string, checkpointsNumRetained int, checkpointsNumMerged int) (Backend, error) {
opts := nutsdb.DefaultOptions
opts.SegmentSize = 1 * nutsdb.GB
opts.Dir = checkpointsDir
Expand All @@ -160,10 +180,12 @@ func NewFSBackend(checkpointsDir string, checkpointsNumRetained int) (Backend, e
return nil, err
}
store := &fs{
logger: logger,
db: db,
storage: &sync.Map{},
checkpoints: []int64{},
checkpointsNumRetained: checkpointsNumRetained,
checkpointsNumMerged: checkpointsNumMerged,
}
return store, store.init()
}
8 changes: 7 additions & 1 deletion streaming-core/store/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"os"
"testing"
)
Expand All @@ -10,7 +11,7 @@ func tempFSBackend(checkpointsNumRetained int) (Backend, error) {
if mkdirTemp, err := os.MkdirTemp("", ""); err != nil {
return nil, err
} else {
return NewFSBackend(mkdirTemp, checkpointsNumRetained)
return NewFSBackend(zap.L(), mkdirTemp, checkpointsNumRetained, checkpointsNumRetained*3)
}
}

Expand All @@ -27,3 +28,8 @@ func TestFSBackendSaveAndGet(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, get, []byte{123, 123, 123})
}

func TestIteratorBucket(t *testing.T) {
backend, _ := NewFSBackend(zap.L(), "/Users/klein/GoLandProjects/athena/tmp", 1, 2)
_ = backend.Close()
}
2 changes: 1 addition & 1 deletion streaming-core/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (e *Environment) Start() (err error) {
//0. check stream graph and print

//1. init all task
if e.storeBackend, err = store.NewFSBackend(e.options.CheckpointsDir, e.options.CheckpointsNumRetained); err != nil {
if e.storeBackend, err = store.NewFSBackend(e.logger.Named("stateBackend"), e.options.CheckpointsDir, e.options.CheckpointsNumRetained, e.options.CheckpointsNumRetained*3); err != nil {
return fmt.Errorf("failed to new fs store backend: %w", err)
}

Expand Down
15 changes: 1 addition & 14 deletions streaming-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,5 @@ module github.com/RuiFG/streaming/streaming-operator

go 1.19

require github.com/RuiFG/streaming/streaming-core v0.1.0
require github.com/RuiFG/streaming/streaming-core v0.1.1

require (
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/nutsdb v0.11.0 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sys v0.0.0-20220405210540-1e041c57c461 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
Loading

0 comments on commit 16122fe

Please sign in to comment.