diff --git a/cmd/literiver/main.go b/cmd/literiver/main.go index c497b1c92..47961e2e8 100644 --- a/cmd/literiver/main.go +++ b/cmd/literiver/main.go @@ -132,16 +132,6 @@ func Run(cctx *cli.Context) (err error) { } }() - // Trim expired DBs in a separate goroutine. - go func() { - for { - if err := r.expireDBs(); err != nil { - slog.Error("failed to expire DBs", "error", err) - } - time.Sleep(time.Second * 5) - } - }() - // Serve metrics over HTTP if enabled. if r.Config.Addr != "" { hostport := r.Config.Addr diff --git a/cmd/literiver/replicator.go b/cmd/literiver/replicator.go index ce4b6f60e..902a0dc3a 100644 --- a/cmd/literiver/replicator.go +++ b/cmd/literiver/replicator.go @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/litestream" mapset "github.com/deckarep/golang-set/v2" "github.com/fsnotify/fsnotify" + lru "github.com/hashicorp/golang-lru/v2/expirable" ) type DBEntry struct { @@ -39,20 +40,24 @@ type Config struct { type Replicator struct { Config Config + DBs *lru.LRU[string, *litestream.DB] + lk sync.RWMutex - DBEntries map[string]*DBEntry DebounceSet map[string]time.Time SeenDBs mapset.Set[string] } func NewReplicator(config Config) *Replicator { - return &Replicator{ + r := Replicator{ Config: config, - DBEntries: make(map[string]*DBEntry), DebounceSet: make(map[string]time.Time), SeenDBs: mapset.NewSet[string](), } + + r.DBs = lru.NewLRU[string, *litestream.DB](config.MaxActiveDBs, r.onEvict, config.DBTTL) + + return &r } // Close closes all open databases. @@ -140,52 +145,15 @@ func (r *Replicator) shouldDebounce(path string) bool { return false } -// syncNewDB starts syncing a new DB and closes the least recently used DB if necessary -// to make room in the active DB set -func (r *Replicator) syncNewDB(db *litestream.DB, path string) error { - r.lk.Lock() - defer r.lk.Unlock() - - _, err := r.evictLRU() - if err != nil { - return fmt.Errorf("failed to evict LRU DB: %w", err) - } - - if err := db.Open(); err != nil { - return fmt.Errorf("failed to open DB for sync (%s): %w", path, err) - } - - activeDBGauge.Inc() - - r.DBEntries[path] = &DBEntry{ - DB: db, - ExpiresAt: time.Now().Add(r.Config.DBTTL), - } - - return nil -} - -// bumpTTL bumps the TTL of the given path if it exists -// and returns true if the TTL was bumped -func (r *Replicator) bumpTTL(path string) bool { - r.lk.Lock() - defer r.lk.Unlock() - dbEntry, ok := r.DBEntries[path] - if ok { - dbEntry.ExpiresAt = time.Now().Add(r.Config.DBTTL) - return true - } - return false -} - func (r *Replicator) processDBUpdate(path string) error { if r.shouldDebounce(path) { // We're debouncing and don't need to do anything else return nil } - if r.bumpTTL(path) { - // We've bumped the TTL for this active DB and don't need to do anything else + // If the DB is already active, bump the TTL and return + if db, ok := r.DBs.Get(path); ok { + r.DBs.Add(path, db) return nil } @@ -210,64 +178,26 @@ func (r *Replicator) processDBUpdate(path string) error { return fmt.Errorf("failed to init DB from config for (%s): %w", path, err) } - err = r.syncNewDB(db, path) - if err != nil { - return fmt.Errorf("failed to sync new DB (%s): %w", path, err) + if err := db.Open(); err != nil { + return fmt.Errorf("failed to open DB for sync (%s): %w", path, err) } + activeDBGauge.Inc() + // Add to the active set, evicting the oldest DB if necessary. + r.DBs.Add(path, db) + return nil } -func (r *Replicator) expireDBs() error { +// onEvict closes a DB and sets a debounce timer +func (r *Replicator) onEvict(path string, db *litestream.DB) { r.lk.Lock() defer r.lk.Unlock() - - for path, dbEntry := range r.DBEntries { - if time.Now().After(dbEntry.ExpiresAt) { - slog.Warn("closing expired DB", "path", path) - if err := r.removeDB(dbEntry, path); err != nil { - return fmt.Errorf("failed to remove expired DB (%s): %w", path, err) - } - } - } - - return nil -} - -// removeDB removes the given DB from the active DB set and sets a debounce timer -// ONLY CALL THIS WITH THE LOCK HELD -func (r *Replicator) removeDB(entry *DBEntry, path string) error { - delete(r.DBEntries, path) r.DebounceSet[path] = time.Now().Add(time.Second * 8) - if err := entry.DB.Close(); err != nil { - return fmt.Errorf("failed to close DB (%s): %w", path, err) + if err := db.Close(); err != nil { + slog.Error("failed to close DB on eviction", "error", err, "path", path) } activeDBGauge.Dec() - return nil -} - -// evictLRU evicts the least recently used DB if the active DB set is full -// and returns true if an eviction occurred -// ONLY CALL THIS WITH THE LOCK HELD -func (r *Replicator) evictLRU() (bool, error) { - // Check if we need to close an existing DB. - if len(r.DBEntries) >= r.Config.MaxActiveDBs { - // Close the least recently used DB. - var lruPath string - var lruExpiresAt time.Time - for path, dbEntry := range r.DBEntries { - if lruExpiresAt.IsZero() || dbEntry.ExpiresAt.Before(lruExpiresAt) { - lruPath = path - lruExpiresAt = dbEntry.ExpiresAt - } - } - slog.Warn("closing least recently used DB", "path", lruPath, "expires_at", lruExpiresAt) - if err := r.removeDB(r.DBEntries[lruPath], lruPath); err != nil { - return false, fmt.Errorf("failed to remove least recently used DB (%s): %w", lruPath, err) - } - return true, nil - } - return false, nil } func (r *Replicator) shutdown() error { @@ -276,8 +206,8 @@ func (r *Replicator) shutdown() error { slog.Warn("shutting down dir replication") - for _, dbEntry := range r.DBEntries { - if err := dbEntry.DB.Close(); err != nil { + for _, db := range r.DBs.Values() { + if err := db.Close(); err != nil { slog.Error("failed to close DB", "error", err) } }