Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed Nov 18, 2024
1 parent bc7428b commit df4ac32
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
@@ -73,14 +73,21 @@ func NewRegionStorageWithLevelDBBackend(

// TODO: support other KV storage backends like BadgerDB in the future.

type regionSource int

const (
unloaded regionSource = iota
fromEtcd
fromLeveldb
)

type coreStorage struct {
Storage
regionStorage endpoint.RegionStorage

useRegionStorage int32
regionLoadedFromDefault bool
regionLoadedFromStorage bool
mu syncutil.RWMutex
useRegionStorageFlag int32
regionLoaded regionSource
mu syncutil.RWMutex
}

// NewCoreStorage creates a new core storage with the given default and region storage.
@@ -91,6 +98,7 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage
return &coreStorage{
Storage: defaultStorage,
regionStorage: regionStorage,
regionLoaded: unloaded,
}
}

@@ -117,12 +125,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi
if useLocalRegionStorage {
// Switch the region storage to regionStorage, all region info will be read/saved by the internal
// regionStorage, and in most cases it's LevelDB-backend.
atomic.StoreInt32(&ps.useRegionStorage, 1)
atomic.StoreInt32(&ps.useRegionStorageFlag, 1)
return ps.regionStorage
}
// Switch the region storage to defaultStorage, all region info will be read/saved by the internal
// defaultStorage, and in most cases it's etcd-backend.
atomic.StoreInt32(&ps.useRegionStorage, 0)
atomic.StoreInt32(&ps.useRegionStorageFlag, 0)
return ps.Storage
}

@@ -137,50 +145,50 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
ps.mu.Lock()
defer ps.mu.Unlock()

if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
if !ps.useRegionStorage() {
err := ps.Storage.LoadRegions(ctx, f)
if err == nil {
ps.regionLoadedFromDefault = true
ps.regionLoaded = fromEtcd
}

Check warning on line 152 in pkg/storage/storage.go

Codecov / codecov/patch

pkg/storage/storage.go#L151-L152

Added lines #L151 - L152 were not covered by tests
return err
}

if !ps.regionLoadedFromStorage {
if ps.regionLoaded == unloaded {
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
return err
}
ps.regionLoadedFromStorage = true
ps.regionLoaded = fromLeveldb
}
return nil
}

// LoadRegion loads one region from storage.
func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage() {

Check warning on line 167 in pkg/storage/storage.go

Codecov / codecov/patch

pkg/storage/storage.go#L167

Added line #L167 was not covered by tests
return ps.regionStorage.LoadRegion(regionID, region)
}
return ps.Storage.LoadRegion(regionID, region)
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage() {

Check warning on line 175 in pkg/storage/storage.go

Codecov / codecov/patch

pkg/storage/storage.go#L175

Added line #L175 was not covered by tests
return ps.regionStorage.LoadRegions(ctx, f)
}
return ps.Storage.LoadRegions(ctx, f)
}

// SaveRegion saves one region to storage.
func (ps *coreStorage) SaveRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage() {
return ps.regionStorage.SaveRegion(region)
}
return ps.Storage.SaveRegion(region)
}

// DeleteRegion deletes one region from storage.
func (ps *coreStorage) DeleteRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage() {
return ps.regionStorage.DeleteRegion(region)
}
return ps.Storage.DeleteRegion(region)
@@ -204,13 +212,17 @@ func (ps *coreStorage) Close() error {
return nil
}

func (ps *coreStorage) useRegionStorage() bool {
return atomic.LoadInt32(&ps.useRegionStorageFlag) > 0
}

// AreRegionsLoaded returns whether the regions are loaded.
func AreRegionsLoaded(s Storage) bool {
ps := s.(*coreStorage)
ps.mu.RLock()
defer ps.mu.RUnlock()
if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
return ps.regionLoadedFromDefault
if ps.useRegionStorage() {
return ps.regionLoaded == fromLeveldb
}
return ps.regionLoadedFromStorage
return ps.regionLoaded == fromEtcd

Check warning on line 227 in pkg/storage/storage.go

Codecov / codecov/patch

pkg/storage/storage.go#L227

Added line #L227 was not covered by tests
}

0 comments on commit df4ac32

Please sign in to comment.