Skip to content

Commit

Permalink
Migrate mongo driver
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcnunes committed Dec 5, 2023
1 parent faf1822 commit 8b97500
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 164 deletions.
13 changes: 8 additions & 5 deletions backend/backend.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package backend

import "time"
import (
"context"
"time"
)

//go:generate counterfeiter -o ../fakes/fakebackend/fake_masterlock.go . MasterLock

type MasterLock interface {
// Achieve a lock to become the master. If lock is successful, the provided
// MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled
// out with the remaining details.
Lock(info *MasterInfo) error
Lock(ctx context.Context, info *MasterInfo) error

// Release the lock to relinquish the master role. This will not succeed if the
// provided masterID does not match the ID of the current master.
UnLock(masterID string) error
UnLock(ctx context.Context, masterID string) error

// Write a heartbeat to ensure that the master role is not lost.
// If successful, the last heartbeat time is written to the passed MasterInfo
WriteHeartbeat(info *MasterInfo) error
WriteHeartbeat(ctx context.Context, info *MasterInfo) error

// Get the current master status. Provides the MasterInfo of the current master.
Status() (*MasterInfo, error)
Status(ctx context.Context) (*MasterInfo, error)
}

type MasterInfo struct {
Expand Down
38 changes: 15 additions & 23 deletions backend/mongo/masterlock.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package mongo

import (
"context"
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
mgo "github.com/qiniu/qmgo"
"go.mongodb.org/mongo-driver/bson"

"github.com/InVisionApp/go-master/backend"
)
Expand Down Expand Up @@ -36,7 +37,7 @@ func (m *MongoMasterInfo) toMasterInfo() *backend.MasterInfo {
// Achieve a lock to become the master. If lock is successful, the provided
// MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled
// out with the remaining details.
func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
func (m *MongoBackend) Lock(ctx context.Context, info *backend.MasterInfo) error {
// get the heartbeat first to see if there is one before inserting
oldMMI := &MongoMasterInfo{}
t := time.Now()
Expand All @@ -48,12 +49,12 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
LastHeartbeat: t,
}

err := m.lock.Collection().FindId(MasterInfoID).One(oldMMI)
err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(oldMMI)
// an error has occurred and it is not a NotFound
if err != nil {
if err == mgo.ErrNotFound {
if err == mgo.ErrNoSuchDocuments {
// perform an insert
if err := m.lock.Collection().Insert(mmi); err != nil {
if _, err := m.lock.Collection().InsertOne(ctx, mmi); err != nil {
return fmt.Errorf("unable to insert initial lock: %v", err)
}

Expand All @@ -62,9 +63,6 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {

err = fmt.Errorf("failed to fetch current master info: %v", err)

m.log.Debug("attempting to refresh sessions in case of db issues")
m.refresh()

return err
}

Expand All @@ -80,7 +78,7 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
ReturnNew: true,
}

if _, err := m.lock.Collection().Find(query).Apply(change, mmi); err != nil {
if err := m.lock.Collection().Find(ctx, query).Apply(change, mmi); err != nil {
err = fmt.Errorf("unable to complete findModify: %v", err)
return err
}
Expand All @@ -89,19 +87,13 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {

}

// Force refresh all sessions (bypassing SmartCollection's auto-refresh)
// TODO: add auto-refreshing functionality across the board (GetNextJob, CompleteJob, etc.)
func (m *MongoBackend) refresh() {
m.lock.Collection().Database.Session.Refresh()
}

// Release the lock to relinquish the master role. This will not succeed if the
// provided masterID does not match the ID of the current master.
func (m *MongoBackend) UnLock(masterID string) error {
func (m *MongoBackend) UnLock(ctx context.Context, masterID string) error {
query := bson.M{"master_id": masterID}

if err := m.lock.Collection().Remove(query); err != nil {
if err == mgo.ErrNotFound { // not found is ok, already gone
if err := m.lock.Collection().Remove(ctx, query); err != nil {
if err == mgo.ErrNoSuchDocuments { // not found is ok, already gone
return nil
}

Expand All @@ -113,7 +105,7 @@ func (m *MongoBackend) UnLock(masterID string) error {

// Write a heartbeat to ensure that the master role is not lost.
// If successful, the last heartbeat time is written to the passed MasterInfo
func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
func (m *MongoBackend) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error {
query := bson.M{"master_id": info.MasterID}

lastHeartbeat := time.Now()
Expand All @@ -124,7 +116,7 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
},
}

if err := m.lock.coll.Update(query, change); err != nil {
if err := m.lock.coll.UpdateOne(ctx, query, change); err != nil {
return fmt.Errorf("Unable to complete heartbeat update: %v", err)
}

Expand All @@ -134,9 +126,9 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
}

// Get the current master status. Provides the MasterInfo of the current master.
func (m *MongoBackend) Status() (*backend.MasterInfo, error) {
func (m *MongoBackend) Status(ctx context.Context) (*backend.MasterInfo, error) {
mi := &backend.MasterInfo{}
if err := m.lock.Collection().FindId(MasterInfoID).One(mi); err != nil {
if err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(mi); err != nil {
return nil, fmt.Errorf("failed to fetch master info: %v", err)
}

Expand Down
Loading

0 comments on commit 8b97500

Please sign in to comment.