Skip to content

Commit

Permalink
Fix temporal cron job register (#233)
Browse files Browse the repository at this point in the history
* Merge branch 'fix/cron_not_run' into 'main'

fix cron job schedule and add tests

See merge request product/starhub/starhub-server!793

* Merge branch 'fix/cron_not_run' into 'main'

fix cron activity not registered

See merge request product/starhub/starhub-server!795

---------

Co-authored-by: yiling.ji <[email protected]>
  • Loading branch information
Yiling-J and yiling.ji authored Jan 8, 2025
1 parent 54a7714 commit fac4e11
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 16 deletions.
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ packages:
config:
interfaces:
Client:
ScheduleClient:
opencsg.com/csghub-server/builder/importer:
config:
interfaces:
Expand Down
49 changes: 49 additions & 0 deletions _mocks/opencsg.com/csghub-server/builder/temporal/mock_Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions api/workflow/cron_worker_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"opencsg.com/csghub-server/api/workflow/activity"
"opencsg.com/csghub-server/builder/temporal"
"opencsg.com/csghub-server/common/config"
)

func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) error {
var err error
scheduler := temporalClient.ScheduleClient()
scheduler := temporalClient.GetScheduleClient()

_, err = scheduler.Create(context.Background(), client.ScheduleOptions{
ID: "sync-as-client-schedule",
Expand Down Expand Up @@ -54,9 +55,10 @@ func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) err
return nil
}

func RegisterCronWorker(config *config.Config, temporalClient temporal.Client) {
func RegisterCronWorker(config *config.Config, temporalClient temporal.Client, activities *activity.Activities) {

wfWorker := temporalClient.NewWorker(CronJobQueueName, worker.Options{})
wfWorker.RegisterActivity(activities)
wfWorker.RegisterWorkflow(SyncAsClientWorkflow)
wfWorker.RegisterWorkflow(CalcRecomScoreWorkflow)

Expand Down
8 changes: 6 additions & 2 deletions api/workflow/worker_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ func StartWorkflowDI(

worker.RegisterWorkflow(HandlePushWorkflow)

RegisterCronWorker(cfg, temporalClient)
RegisterCronWorker(cfg, temporalClient, act)
err := RegisterCronJobs(cfg, temporalClient)
if err != nil {
return fmt.Errorf("failed to register cron jobs: %w", err)
}

err := temporalClient.Start()
err = temporalClient.Start()
if err != nil {
return fmt.Errorf("failed to start worker: %w", err)
}
Expand Down
23 changes: 19 additions & 4 deletions api/workflow/workflow_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
package workflow_test

import (
"context"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/testsuite"
mock_git "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/git/gitserver"
mock_temporal "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/temporal"
Expand All @@ -18,7 +21,10 @@ import (

func newWorkflowTester(t *testing.T) (*workflowTester, error) {
suite := testsuite.WorkflowTestSuite{}
tester := &workflowTester{env: suite.NewTestWorkflowEnvironment()}
tester := &workflowTester{
env: suite.NewTestWorkflowEnvironment(),
cronEnv: suite.NewTestWorkflowEnvironment(),
}

// Mock the dependencies
tester.mocks.stores = tests.NewMockStores(t)
Expand All @@ -35,13 +41,22 @@ func newWorkflowTester(t *testing.T) (*workflowTester, error) {
mg := mock_git.NewMockGitServer(t)
tester.mocks.gitServer = mg

cfg := &config.Config{}
mtc := mock_temporal.NewMockClient(t)
mtc.EXPECT().NewWorker(workflow.HandlePushQueueName, mock.Anything).Return(tester.env)
mtc.EXPECT().NewWorker(workflow.CronJobQueueName, mock.Anything).Return(tester.env)
mtc.EXPECT().NewWorker(workflow.CronJobQueueName, mock.Anything).Return(tester.cronEnv)
mtc.EXPECT().Start().Return(nil)
tester.mocks.temporal = mtc

cfg := &config.Config{}
msc := mock_temporal.NewMockScheduleClient(t)
mtc.EXPECT().GetScheduleClient().Return(msc)
msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) {
require.Equal(t, "sync-as-client-schedule", so.ID)
return nil, nil
}).Once()
msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) {
require.Equal(t, "calc-recom-score-schedule", so.ID)
return nil, nil
}).Once()

err := workflow.StartWorkflowDI(
cfg, mcb, mr, mg, mm, tester.mocks.stores.SyncClientSettingMock(), mtc,
Expand Down
17 changes: 9 additions & 8 deletions api/workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

type workflowTester struct {
env *testsuite.TestWorkflowEnvironment
mocks struct {
env *testsuite.TestWorkflowEnvironment
cronEnv *testsuite.TestWorkflowEnvironment
mocks struct {
callback *mock_callback.MockGitCallbackComponent
recom *mock_component.MockRecomComponent
multisync *mock_component.MockMultiSyncComponent
Expand All @@ -34,9 +35,9 @@ func TestWorkflow_CalcRecomScoreWorkflow(t *testing.T) {
require.NoError(t, err)

tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything).Return()
tester.env.ExecuteWorkflow(workflow.CalcRecomScoreWorkflow)
require.True(t, tester.env.IsWorkflowCompleted())
require.NoError(t, tester.env.GetWorkflowError())
tester.cronEnv.ExecuteWorkflow(workflow.CalcRecomScoreWorkflow)
require.True(t, tester.cronEnv.IsWorkflowCompleted())
require.NoError(t, tester.cronEnv.GetWorkflowError())
}

func TestWorkflow_SyncAsClient(t *testing.T) {
Expand All @@ -50,9 +51,9 @@ func TestWorkflow_SyncAsClient(t *testing.T) {
mock.Anything, multisync.FromOpenCSG("", "tk"),
).Return(nil)

tester.env.ExecuteWorkflow(workflow.SyncAsClientWorkflow)
require.True(t, tester.env.IsWorkflowCompleted())
require.NoError(t, tester.env.GetWorkflowError())
tester.cronEnv.ExecuteWorkflow(workflow.SyncAsClientWorkflow)
require.True(t, tester.cronEnv.IsWorkflowCompleted())
require.NoError(t, tester.cronEnv.GetWorkflowError())

}

Expand Down
11 changes: 11 additions & 0 deletions builder/temporal/temporal.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package temporal

import (
"context"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

type Client interface {
client.Client
NewWorker(queue string, options worker.Options) worker.Registry
GetScheduleClient() ScheduleClient
Start() error
Stop()
}

type ScheduleClient interface {
Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error)
}

type clientImpl struct {
client.Client
workers []worker.Worker
Expand All @@ -32,6 +39,10 @@ func (c *clientImpl) NewWorker(queue string, options worker.Options) worker.Regi
return w
}

func (c *clientImpl) GetScheduleClient() ScheduleClient {
return c.ScheduleClient()
}

func (c *clientImpl) Start() error {
for _, worker := range c.workers {
err := worker.Start()
Expand Down

0 comments on commit fac4e11

Please sign in to comment.