diff --git a/backend/instancemgmt/instance_manager.go b/backend/instancemgmt/instance_manager.go index 08e1d0608..4ca761d81 100644 --- a/backend/instancemgmt/instance_manager.go +++ b/backend/instancemgmt/instance_manager.go @@ -4,6 +4,7 @@ import ( "context" "reflect" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +18,7 @@ var ( Name: "active_instances", Help: "The number of active plugin instances", }) + disposeTTL = 5 * time.Second ) // Instance is a marker interface for an instance. @@ -119,7 +121,11 @@ func (im *instanceManager) Get(ctx context.Context, pluginContext backend.Plugin } if disposer, valid := ci.instance.(InstanceDisposer); valid { - disposer.Dispose() + time.AfterFunc(disposeTTL, func() { + disposer.Dispose() + activeInstances.Dec() + }) + } else { activeInstances.Dec() } } diff --git a/backend/instancemgmt/instance_manager_test.go b/backend/instancemgmt/instance_manager_test.go index e86e67c75..8351fe240 100644 --- a/backend/instancemgmt/instance_manager_test.go +++ b/backend/instancemgmt/instance_manager_test.go @@ -43,6 +43,11 @@ func TestInstanceManager(t *testing.T) { Updated: time.Now(), }, } + origDisposeTTL := disposeTTL + disposeTTL = time.Millisecond + t.Cleanup(func() { + disposeTTL = origDisposeTTL + }) newInstance, err := im.Get(ctx, pCtxUpdated) t.Run("New instance should be created", func(t *testing.T) { @@ -57,7 +62,9 @@ func TestInstanceManager(t *testing.T) { }) t.Run("Old instance should be disposed", func(t *testing.T) { - require.True(t, instance.(*testInstance).disposed) + instance.(*testInstance).wg.Wait() + require.True(t, instance.(*testInstance).disposed.Load()) + require.Equal(t, int64(1), instance.(*testInstance).disposedTimes.Load()) }) }) }) @@ -95,7 +102,7 @@ func TestInstanceManagerConcurrency(t *testing.T) { t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) { cachedInstance, _ := im.Get(ctx, pCtx) for _, instance := range createdInstances { - if cachedInstance.(*testInstance) != instance && instance.disposedTimes < 1 { + if cachedInstance.(*testInstance) != instance && instance.disposedTimes.Load() < 1 { require.FailNow(t, "Found lost reference to un-disposed instance") } } @@ -103,6 +110,12 @@ func TestInstanceManagerConcurrency(t *testing.T) { }) t.Run("Check possible race condition issues when re-creating instance on settings update", func(t *testing.T) { + origDisposeTTL := disposeTTL + disposeTTL = time.Millisecond + t.Cleanup(func() { + disposeTTL = origDisposeTTL + }) + ctx := context.Background() initialPCtx := backend.PluginContext{ OrgID: 1, @@ -141,12 +154,13 @@ func TestInstanceManagerConcurrency(t *testing.T) { wg.Wait() t.Run("Initial instance should be disposed only once", func(t *testing.T) { - require.Equal(t, int64(1), instanceToDispose.(*testInstance).disposedTimes, "Instance should be disposed only once") + instanceToDispose.(*testInstance).wg.Wait() + require.Equal(t, int64(1), instanceToDispose.(*testInstance).disposedTimes.Load(), "Instance should be disposed only once") }) t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) { cachedInstance, _ := im.Get(ctx, updatedPCtx) for _, instance := range createdInstances { - if cachedInstance.(*testInstance) != instance && instance.disposedTimes < 1 { + if cachedInstance.(*testInstance) != instance && instance.disposedTimes.Load() < 1 { require.FailNow(t, "Found lost reference to un-disposed instance") } } @@ -204,13 +218,15 @@ func TestInstanceManagerConcurrency(t *testing.T) { type testInstance struct { orgID int64 updated time.Time - disposed bool - disposedTimes int64 + disposed atomic.Bool + disposedTimes atomic.Int64 + wg sync.WaitGroup } func (ti *testInstance) Dispose() { - ti.disposed = true - atomic.AddInt64(&ti.disposedTimes, 1) + ti.disposed.Store(true) + ti.disposedTimes.Add(1) + ti.wg.Done() } type testInstanceProvider struct { @@ -231,8 +247,12 @@ func (tip *testInstanceProvider) NewInstance(_ context.Context, pluginContext ba if tip.delay > 0 { time.Sleep(tip.delay) } - return &testInstance{ + + ti := &testInstance{ orgID: pluginContext.OrgID, updated: pluginContext.AppInstanceSettings.Updated, - }, nil + } + ti.wg.Add(1) + + return ti, nil } diff --git a/experimental/e2e/storage/har_test.go b/experimental/e2e/storage/har_test.go index 84d6088ed..cc8245a79 100644 --- a/experimental/e2e/storage/har_test.go +++ b/experimental/e2e/storage/har_test.go @@ -44,11 +44,16 @@ func TestHARStorage(t *testing.T) { require.NoError(t, e) c <- true }() - req, res := exampleRequest() - defer res.Body.Close() - req.URL.Path = "/two" - err = two.Add(req, res) - require.NoError(t, err) + go func() { + req, res := exampleRequest() + defer res.Body.Close() + req.URL.Path = "/two" + err = two.Add(req, res) + require.NoError(t, err) + c <- true + }() + + <-c <-c require.Len(t, one.Entries(), 2) require.Len(t, two.Entries(), 2)