diff --git a/.mockery.yaml b/.mockery.yaml index 25e5682f..75c760ae 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -149,6 +149,7 @@ packages: config: interfaces: Client: + ScheduleClient: opencsg.com/csghub-server/builder/importer: config: interfaces: diff --git a/_mocks/opencsg.com/csghub-server/builder/temporal/mock_Client.go b/_mocks/opencsg.com/csghub-server/builder/temporal/mock_Client.go index cd0fde1a..6d034e95 100644 --- a/_mocks/opencsg.com/csghub-server/builder/temporal/mock_Client.go +++ b/_mocks/opencsg.com/csghub-server/builder/temporal/mock_Client.go @@ -15,6 +15,8 @@ import ( operatorservice "go.temporal.io/api/operatorservice/v1" + temporal "opencsg.com/csghub-server/builder/temporal" + worker "go.temporal.io/sdk/worker" workflowservice "go.temporal.io/api/workflowservice/v1" @@ -580,6 +582,53 @@ func (_c *MockClient_ExecuteWorkflow_Call) RunAndReturn(run func(context.Context return _c } +// GetScheduleClient provides a mock function with given fields: +func (_m *MockClient) GetScheduleClient() temporal.ScheduleClient { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetScheduleClient") + } + + var r0 temporal.ScheduleClient + if rf, ok := ret.Get(0).(func() temporal.ScheduleClient); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(temporal.ScheduleClient) + } + } + + return r0 +} + +// MockClient_GetScheduleClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetScheduleClient' +type MockClient_GetScheduleClient_Call struct { + *mock.Call +} + +// GetScheduleClient is a helper method to define mock.On call +func (_e *MockClient_Expecter) GetScheduleClient() *MockClient_GetScheduleClient_Call { + return &MockClient_GetScheduleClient_Call{Call: _e.mock.On("GetScheduleClient")} +} + +func (_c *MockClient_GetScheduleClient_Call) Run(run func()) *MockClient_GetScheduleClient_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClient_GetScheduleClient_Call) Return(_a0 temporal.ScheduleClient) *MockClient_GetScheduleClient_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClient_GetScheduleClient_Call) RunAndReturn(run func() temporal.ScheduleClient) *MockClient_GetScheduleClient_Call { + _c.Call.Return(run) + return _c +} + // GetSearchAttributes provides a mock function with given fields: ctx func (_m *MockClient) GetSearchAttributes(ctx context.Context) (*workflowservice.GetSearchAttributesResponse, error) { ret := _m.Called(ctx) diff --git a/_mocks/opencsg.com/csghub-server/builder/temporal/mock_ScheduleClient.go b/_mocks/opencsg.com/csghub-server/builder/temporal/mock_ScheduleClient.go new file mode 100644 index 00000000..6cb84dde --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/builder/temporal/mock_ScheduleClient.go @@ -0,0 +1,97 @@ +// Code generated by mockery v2.49.1. DO NOT EDIT. + +package temporal + +import ( + context "context" + + client "go.temporal.io/sdk/client" + + mock "github.com/stretchr/testify/mock" +) + +// MockScheduleClient is an autogenerated mock type for the ScheduleClient type +type MockScheduleClient struct { + mock.Mock +} + +type MockScheduleClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockScheduleClient) EXPECT() *MockScheduleClient_Expecter { + return &MockScheduleClient_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: ctx, options +func (_m *MockScheduleClient) Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 client.ScheduleHandle + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.ScheduleOptions) (client.ScheduleHandle, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.ScheduleOptions) client.ScheduleHandle); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.ScheduleHandle) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.ScheduleOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockScheduleClient_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' +type MockScheduleClient_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - ctx context.Context +// - options client.ScheduleOptions +func (_e *MockScheduleClient_Expecter) Create(ctx interface{}, options interface{}) *MockScheduleClient_Create_Call { + return &MockScheduleClient_Create_Call{Call: _e.mock.On("Create", ctx, options)} +} + +func (_c *MockScheduleClient_Create_Call) Run(run func(ctx context.Context, options client.ScheduleOptions)) *MockScheduleClient_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(client.ScheduleOptions)) + }) + return _c +} + +func (_c *MockScheduleClient_Create_Call) Return(_a0 client.ScheduleHandle, _a1 error) *MockScheduleClient_Create_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockScheduleClient_Create_Call) RunAndReturn(run func(context.Context, client.ScheduleOptions) (client.ScheduleHandle, error)) *MockScheduleClient_Create_Call { + _c.Call.Return(run) + return _c +} + +// NewMockScheduleClient creates a new instance of MockScheduleClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockScheduleClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockScheduleClient { + mock := &MockScheduleClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/api/workflow/cron_worker_ce.go b/api/workflow/cron_worker_ce.go index faee075f..d6cfce57 100644 --- a/api/workflow/cron_worker_ce.go +++ b/api/workflow/cron_worker_ce.go @@ -9,13 +9,14 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" + "opencsg.com/csghub-server/api/workflow/activity" "opencsg.com/csghub-server/builder/temporal" "opencsg.com/csghub-server/common/config" ) func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) error { var err error - scheduler := temporalClient.ScheduleClient() + scheduler := temporalClient.GetScheduleClient() _, err = scheduler.Create(context.Background(), client.ScheduleOptions{ ID: "sync-as-client-schedule", @@ -54,9 +55,10 @@ func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) err return nil } -func RegisterCronWorker(config *config.Config, temporalClient temporal.Client) { +func RegisterCronWorker(config *config.Config, temporalClient temporal.Client, activities *activity.Activities) { wfWorker := temporalClient.NewWorker(CronJobQueueName, worker.Options{}) + wfWorker.RegisterActivity(activities) wfWorker.RegisterWorkflow(SyncAsClientWorkflow) wfWorker.RegisterWorkflow(CalcRecomScoreWorkflow) diff --git a/api/workflow/worker_ce.go b/api/workflow/worker_ce.go index 0af82e7a..077c9dad 100644 --- a/api/workflow/worker_ce.go +++ b/api/workflow/worker_ce.go @@ -69,9 +69,13 @@ func StartWorkflowDI( worker.RegisterWorkflow(HandlePushWorkflow) - RegisterCronWorker(cfg, temporalClient) + RegisterCronWorker(cfg, temporalClient, act) + err := RegisterCronJobs(cfg, temporalClient) + if err != nil { + return fmt.Errorf("failed to register cron jobs: %w", err) + } - err := temporalClient.Start() + err = temporalClient.Start() if err != nil { return fmt.Errorf("failed to start worker: %w", err) } diff --git a/api/workflow/workflow_ce_test.go b/api/workflow/workflow_ce_test.go index 1e499d60..82502281 100644 --- a/api/workflow/workflow_ce_test.go +++ b/api/workflow/workflow_ce_test.go @@ -3,9 +3,12 @@ package workflow_test import ( + "context" "testing" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" "go.temporal.io/sdk/testsuite" mock_git "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/git/gitserver" mock_temporal "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/temporal" @@ -18,7 +21,10 @@ import ( func newWorkflowTester(t *testing.T) (*workflowTester, error) { suite := testsuite.WorkflowTestSuite{} - tester := &workflowTester{env: suite.NewTestWorkflowEnvironment()} + tester := &workflowTester{ + env: suite.NewTestWorkflowEnvironment(), + cronEnv: suite.NewTestWorkflowEnvironment(), + } // Mock the dependencies tester.mocks.stores = tests.NewMockStores(t) @@ -35,13 +41,22 @@ func newWorkflowTester(t *testing.T) (*workflowTester, error) { mg := mock_git.NewMockGitServer(t) tester.mocks.gitServer = mg + cfg := &config.Config{} mtc := mock_temporal.NewMockClient(t) mtc.EXPECT().NewWorker(workflow.HandlePushQueueName, mock.Anything).Return(tester.env) - mtc.EXPECT().NewWorker(workflow.CronJobQueueName, mock.Anything).Return(tester.env) + mtc.EXPECT().NewWorker(workflow.CronJobQueueName, mock.Anything).Return(tester.cronEnv) mtc.EXPECT().Start().Return(nil) tester.mocks.temporal = mtc - - cfg := &config.Config{} + msc := mock_temporal.NewMockScheduleClient(t) + mtc.EXPECT().GetScheduleClient().Return(msc) + msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) { + require.Equal(t, "sync-as-client-schedule", so.ID) + return nil, nil + }).Once() + msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) { + require.Equal(t, "calc-recom-score-schedule", so.ID) + return nil, nil + }).Once() err := workflow.StartWorkflowDI( cfg, mcb, mr, mg, mm, tester.mocks.stores.SyncClientSettingMock(), mtc, diff --git a/api/workflow/workflow_test.go b/api/workflow/workflow_test.go index 6f29e91e..bb74ad61 100644 --- a/api/workflow/workflow_test.go +++ b/api/workflow/workflow_test.go @@ -18,8 +18,9 @@ import ( ) type workflowTester struct { - env *testsuite.TestWorkflowEnvironment - mocks struct { + env *testsuite.TestWorkflowEnvironment + cronEnv *testsuite.TestWorkflowEnvironment + mocks struct { callback *mock_callback.MockGitCallbackComponent recom *mock_component.MockRecomComponent multisync *mock_component.MockMultiSyncComponent @@ -34,9 +35,9 @@ func TestWorkflow_CalcRecomScoreWorkflow(t *testing.T) { require.NoError(t, err) tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything).Return() - tester.env.ExecuteWorkflow(workflow.CalcRecomScoreWorkflow) - require.True(t, tester.env.IsWorkflowCompleted()) - require.NoError(t, tester.env.GetWorkflowError()) + tester.cronEnv.ExecuteWorkflow(workflow.CalcRecomScoreWorkflow) + require.True(t, tester.cronEnv.IsWorkflowCompleted()) + require.NoError(t, tester.cronEnv.GetWorkflowError()) } func TestWorkflow_SyncAsClient(t *testing.T) { @@ -50,9 +51,9 @@ func TestWorkflow_SyncAsClient(t *testing.T) { mock.Anything, multisync.FromOpenCSG("", "tk"), ).Return(nil) - tester.env.ExecuteWorkflow(workflow.SyncAsClientWorkflow) - require.True(t, tester.env.IsWorkflowCompleted()) - require.NoError(t, tester.env.GetWorkflowError()) + tester.cronEnv.ExecuteWorkflow(workflow.SyncAsClientWorkflow) + require.True(t, tester.cronEnv.IsWorkflowCompleted()) + require.NoError(t, tester.cronEnv.GetWorkflowError()) } diff --git a/builder/temporal/temporal.go b/builder/temporal/temporal.go index 43439db9..b1c4afa0 100644 --- a/builder/temporal/temporal.go +++ b/builder/temporal/temporal.go @@ -1,6 +1,8 @@ package temporal import ( + "context" + "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) @@ -8,10 +10,15 @@ import ( type Client interface { client.Client NewWorker(queue string, options worker.Options) worker.Registry + GetScheduleClient() ScheduleClient Start() error Stop() } +type ScheduleClient interface { + Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error) +} + type clientImpl struct { client.Client workers []worker.Worker @@ -32,6 +39,10 @@ func (c *clientImpl) NewWorker(queue string, options worker.Options) worker.Regi return w } +func (c *clientImpl) GetScheduleClient() ScheduleClient { + return c.ScheduleClient() +} + func (c *clientImpl) Start() error { for _, worker := range c.workers { err := worker.Start()