Skip to content

Commit

Permalink
Use LRU package instead of rolling my own
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 7, 2023
1 parent 2a7a254 commit 8254df5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 103 deletions.
10 changes: 0 additions & 10 deletions cmd/literiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,6 @@ func Run(cctx *cli.Context) (err error) {
}
}()

// Trim expired DBs in a separate goroutine.
go func() {
for {
if err := r.expireDBs(); err != nil {
slog.Error("failed to expire DBs", "error", err)
}
time.Sleep(time.Second * 5)
}
}()

// Serve metrics over HTTP if enabled.
if r.Config.Addr != "" {
hostport := r.Config.Addr
Expand Down
116 changes: 23 additions & 93 deletions cmd/literiver/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/benbjohnson/litestream"
mapset "github.com/deckarep/golang-set/v2"
"github.com/fsnotify/fsnotify"
lru "github.com/hashicorp/golang-lru/v2/expirable"
)

type DBEntry struct {
Expand All @@ -39,20 +40,24 @@ type Config struct {
type Replicator struct {
Config Config

DBs *lru.LRU[string, *litestream.DB]

lk sync.RWMutex
DBEntries map[string]*DBEntry
DebounceSet map[string]time.Time
SeenDBs mapset.Set[string]
}

func NewReplicator(config Config) *Replicator {
return &Replicator{
r := Replicator{
Config: config,

DBEntries: make(map[string]*DBEntry),
DebounceSet: make(map[string]time.Time),
SeenDBs: mapset.NewSet[string](),
}

r.DBs = lru.NewLRU[string, *litestream.DB](config.MaxActiveDBs, r.onEvict, config.DBTTL)

return &r
}

// Close closes all open databases.
Expand Down Expand Up @@ -140,52 +145,15 @@ func (r *Replicator) shouldDebounce(path string) bool {
return false
}

// syncNewDB starts syncing a new DB and closes the least recently used DB if necessary
// to make room in the active DB set
func (r *Replicator) syncNewDB(db *litestream.DB, path string) error {
r.lk.Lock()
defer r.lk.Unlock()

_, err := r.evictLRU()
if err != nil {
return fmt.Errorf("failed to evict LRU DB: %w", err)
}

if err := db.Open(); err != nil {
return fmt.Errorf("failed to open DB for sync (%s): %w", path, err)
}

activeDBGauge.Inc()

r.DBEntries[path] = &DBEntry{
DB: db,
ExpiresAt: time.Now().Add(r.Config.DBTTL),
}

return nil
}

// bumpTTL bumps the TTL of the given path if it exists
// and returns true if the TTL was bumped
func (r *Replicator) bumpTTL(path string) bool {
r.lk.Lock()
defer r.lk.Unlock()
dbEntry, ok := r.DBEntries[path]
if ok {
dbEntry.ExpiresAt = time.Now().Add(r.Config.DBTTL)
return true
}
return false
}

func (r *Replicator) processDBUpdate(path string) error {
if r.shouldDebounce(path) {
// We're debouncing and don't need to do anything else
return nil
}

if r.bumpTTL(path) {
// We've bumped the TTL for this active DB and don't need to do anything else
// If the DB is already active, bump the TTL and return
if db, ok := r.DBs.Get(path); ok {
r.DBs.Add(path, db)
return nil
}

Expand All @@ -210,64 +178,26 @@ func (r *Replicator) processDBUpdate(path string) error {
return fmt.Errorf("failed to init DB from config for (%s): %w", path, err)
}

err = r.syncNewDB(db, path)
if err != nil {
return fmt.Errorf("failed to sync new DB (%s): %w", path, err)
if err := db.Open(); err != nil {
return fmt.Errorf("failed to open DB for sync (%s): %w", path, err)
}

activeDBGauge.Inc()
// Add to the active set, evicting the oldest DB if necessary.
r.DBs.Add(path, db)

return nil
}

func (r *Replicator) expireDBs() error {
// onEvict closes a DB and sets a debounce timer
func (r *Replicator) onEvict(path string, db *litestream.DB) {
r.lk.Lock()
defer r.lk.Unlock()

for path, dbEntry := range r.DBEntries {
if time.Now().After(dbEntry.ExpiresAt) {
slog.Warn("closing expired DB", "path", path)
if err := r.removeDB(dbEntry, path); err != nil {
return fmt.Errorf("failed to remove expired DB (%s): %w", path, err)
}
}
}

return nil
}

// removeDB removes the given DB from the active DB set and sets a debounce timer
// ONLY CALL THIS WITH THE LOCK HELD
func (r *Replicator) removeDB(entry *DBEntry, path string) error {
delete(r.DBEntries, path)
r.DebounceSet[path] = time.Now().Add(time.Second * 8)
if err := entry.DB.Close(); err != nil {
return fmt.Errorf("failed to close DB (%s): %w", path, err)
if err := db.Close(); err != nil {
slog.Error("failed to close DB on eviction", "error", err, "path", path)
}
activeDBGauge.Dec()
return nil
}

// evictLRU evicts the least recently used DB if the active DB set is full
// and returns true if an eviction occurred
// ONLY CALL THIS WITH THE LOCK HELD
func (r *Replicator) evictLRU() (bool, error) {
// Check if we need to close an existing DB.
if len(r.DBEntries) >= r.Config.MaxActiveDBs {
// Close the least recently used DB.
var lruPath string
var lruExpiresAt time.Time
for path, dbEntry := range r.DBEntries {
if lruExpiresAt.IsZero() || dbEntry.ExpiresAt.Before(lruExpiresAt) {
lruPath = path
lruExpiresAt = dbEntry.ExpiresAt
}
}
slog.Warn("closing least recently used DB", "path", lruPath, "expires_at", lruExpiresAt)
if err := r.removeDB(r.DBEntries[lruPath], lruPath); err != nil {
return false, fmt.Errorf("failed to remove least recently used DB (%s): %w", lruPath, err)
}
return true, nil
}
return false, nil
}

func (r *Replicator) shutdown() error {
Expand All @@ -276,8 +206,8 @@ func (r *Replicator) shutdown() error {

slog.Warn("shutting down dir replication")

for _, dbEntry := range r.DBEntries {
if err := dbEntry.DB.Close(); err != nil {
for _, db := range r.DBs.Values() {
if err := db.Close(); err != nil {
slog.Error("failed to close DB", "error", err)
}
}
Expand Down

0 comments on commit 8254df5

Please sign in to comment.