Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core]: recuding DynamoDB data volume #97

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$schema": "node_modules/lerna/schemas/lerna-schema.json",
"version": "0.1.42-beta.0",
"version": "0.1.43-beta.0",
"packages": ["packages/*"]
}
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mbc-cqrs-serverless/cli",
"version": "0.1.42-beta.0",
"version": "0.1.43-beta.0",
"description": "a CLI to get started with MBC CQRS serverless framework",
"keywords": [
"mbc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"Value": "[email protected]"
},
{
"Name": "custom:tenant_code",
"Name": "custom:tenant",
"Value": "mbc"
},
{
Expand Down Expand Up @@ -261,7 +261,7 @@
}
},
{
"Name": "custom:tenant_code",
"Name": "custom:tenant",
"AttributeDataType": "String",
"DeveloperOnlyAttribute": false,
"Mutable": true,
Expand Down
20 changes: 19 additions & 1 deletion packages/cli/templates/infra-local/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ stepFunctions:
Choices:
- Variable: $.result
NumericEquals: 0
Next: history_copy
Next: set_ttl_command
- Variable: $.result
NumericEquals: 1
Next: wait_prev_command
Expand All @@ -213,6 +213,24 @@ stepFunctions:
MaxAttempts: 5
BackoffRate: 2
OutputPath: $.Payload[0][0]
Next: set_ttl_command
set_ttl_command:
Type: Task
Resource: arn:aws:states:::lambda:invoke
Parameters:
FunctionName: arn:aws:lambda:ap-northeast-1:101010101010:function:serverless-example-dev-main
Payload:
input.$: $
context.$: $$
Retry:
- ErrorEquals:
- Lambda.ServiceException
- Lambda.AWSLambdaException
- Lambda.SdkClientException
IntervalSeconds: 2
MaxAttempts: 5
BackoffRate: 2
OutputPath: $.Payload[0][0]
Next: history_copy
history_copy:
Type: Task
Expand Down
12 changes: 9 additions & 3 deletions packages/cli/templates/infra/libs/infra-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class InfraStack extends cdk.Stack {
mfa: cdk.aws_cognito.Mfa.OFF,
accountRecovery: cdk.aws_cognito.AccountRecovery.NONE,
customAttributes: {
tenant_code: new cdk.aws_cognito.StringAttribute({
tenant: new cdk.aws_cognito.StringAttribute({
mutable: true,
maxLen: 50,
}),
Expand Down Expand Up @@ -697,9 +697,15 @@ export class InfraStack extends cdk.Stack {
transformData,
cdk.aws_stepfunctions.IntegrationPattern.REQUEST_RESPONSE,
)
const setTtlCommand = lambdaInvoke(
'set_ttl_command',
historyCopy,
cdk.aws_stepfunctions.IntegrationPattern.REQUEST_RESPONSE,
)

const waitPrevCommand = lambdaInvoke(
'wait_prev_command',
historyCopy,
setTtlCommand,
cdk.aws_stepfunctions.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
)

Expand All @@ -713,7 +719,7 @@ export class InfraStack extends cdk.Stack {
)
.when(
cdk.aws_stepfunctions.Condition.numberEquals('$.result', 0),
historyCopy,
setTtlCommand,
)
.when(
cdk.aws_stepfunctions.Condition.numberEquals('$.result', 1),
Expand Down
15 changes: 13 additions & 2 deletions packages/cli/templates/infra/test/__snapshots__/infra.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ exports[`snapshot test for InfraStack 1`] = `
"Arn",
],
},
"","Payload":{"input.$":"$","context.$":"$$"}}},"check_version_result":{"Type":"Choice","Choices":[{"Variable":"$.result","NumericEquals":0,"Next":"history_copy"},{"Variable":"$.result","NumericEquals":1,"Next":"wait_prev_command"},{"Variable":"$.result","NumericEquals":-1,"Next":"fail"}],"Default":"wait_prev_command"},"wait_prev_command":{"Next":"history_copy","Retry":[{"ErrorEquals":["Lambda.ClientExecutionTimeoutException","Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"],"IntervalSeconds":2,"MaxAttempts":6,"BackoffRate":2}],"Type":"Task","OutputPath":"$.Payload[0][0]","Resource":"arn:",
"","Payload":{"input.$":"$","context.$":"$$"}}},"check_version_result":{"Type":"Choice","Choices":[{"Variable":"$.result","NumericEquals":0,"Next":"set_ttl_command"},{"Variable":"$.result","NumericEquals":1,"Next":"wait_prev_command"},{"Variable":"$.result","NumericEquals":-1,"Next":"fail"}],"Default":"wait_prev_command"},"wait_prev_command":{"Next":"set_ttl_command","Retry":[{"ErrorEquals":["Lambda.ClientExecutionTimeoutException","Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"],"IntervalSeconds":2,"MaxAttempts":6,"BackoffRate":2}],"Type":"Task","OutputPath":"$.Payload[0][0]","Resource":"arn:",
{
"Ref": "AWS::Partition",
},
Expand All @@ -253,7 +253,18 @@ exports[`snapshot test for InfraStack 1`] = `
"Arn",
],
},
"","Payload":{"input.$":"$","context.$":"$$","taskToken.$":"$$.Task.Token"}}},"history_copy":{"Next":"transform_data","Retry":[{"ErrorEquals":["Lambda.ClientExecutionTimeoutException","Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"],"IntervalSeconds":2,"MaxAttempts":6,"BackoffRate":2}],"Type":"Task","OutputPath":"$.Payload[0][0]","Resource":"arn:",
"","Payload":{"input.$":"$","context.$":"$$","taskToken.$":"$$.Task.Token"}}},"set_ttl_command":{"Next":"history_copy","Retry":[{"ErrorEquals":["Lambda.ClientExecutionTimeoutException","Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"],"IntervalSeconds":2,"MaxAttempts":6,"BackoffRate":2}],"Type":"Task","OutputPath":"$.Payload[0][0]","Resource":"arn:",
{
"Ref": "AWS::Partition",
},
":states:::lambda:invoke","Parameters":{"FunctionName":"",
{
"Fn::GetAtt": [
"lambdaapi893CD94E",
"Arn",
],
},
"","Payload":{"input.$":"$","context.$":"$$"}}},"history_copy":{"Next":"transform_data","Retry":[{"ErrorEquals":["Lambda.ClientExecutionTimeoutException","Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"],"IntervalSeconds":2,"MaxAttempts":6,"BackoffRate":2}],"Type":"Task","OutputPath":"$.Payload[0][0]","Resource":"arn:",
{
"Ref": "AWS::Partition",
},
Expand Down
1 change: 1 addition & 0 deletions packages/core/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ DATABASE_URL="mysql://root:[email protected]:3306/cqrs?schema=public&connection_l
# serverless dynamodb local stream

LOCAL_DDB_TESTING_TABLE_STREAM=arn:aws:dynamodb:ddblocal:000000000000:table/local-demo-testing_table-command/stream/2024-09-20T07:06:05.837
LOCAL_DDB_MASTER_STREAM=arn:aws:dynamodb:ddblocal:000000000000:table/local-demo-master-command/stream/2024-12-13T07:55:58.001
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mbc-cqrs-serverless/core",
"version": "0.1.42-beta.0",
"version": "0.1.43-beta.0",
"description": "CQRS and event base core",
"keywords": [
"mbc",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/command-events/sfn-name.enum.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export enum DataSyncCommandSfnName {
CHECK_VERSION = 'check_version',
WAIT_PREV_COMMAND = 'wait_prev_command',
SET_TTL_COMMAND = 'set_ttl_command',
HISTORY_COPY = 'history_copy',
TRANSFORM_DATA = 'transform_data',
SYNC_DATA = 'sync_data',
Expand Down
78 changes: 73 additions & 5 deletions packages/core/src/commands/command.event.handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { CommandService } from './command.service'
import { DataSyncDdsHandler } from './handlers/data-sync-dds.handler'
import { HistoryService } from './history.service'
import { DataSyncCommandSfnName } from '../command-events/sfn-name.enum'
import { TtlService } from './ttl.service'

export class MockedHandler implements IDataSyncHandler {
async up(cmd: CommandModel): Promise<any> {
Expand Down Expand Up @@ -98,8 +99,15 @@ const createEvent = (

const sfnCheckVersionEvent = createEvent(DataSyncCommandSfnName.CHECK_VERSION)

const sfnSetTtlCommandEvent = createEvent(
DataSyncCommandSfnName.SET_TTL_COMMAND,
{
result: 0,
},
)

const sfnHistoryCopyEvent = createEvent(DataSyncCommandSfnName.HISTORY_COPY, {
result: 0,
result: 'ok',
})

const sfnTransformDataEvent = createEvent(
Expand Down Expand Up @@ -138,6 +146,7 @@ describe('DataSyncCommandSfnEventHandler', () => {
DynamoDbService,
SnsService,
MockedHandler,
TtlService,
{
provide: MODULE_OPTIONS_TOKEN,
useValue: {
Expand Down Expand Up @@ -268,6 +277,54 @@ describe('DataSyncCommandSfnEventHandler', () => {
})
})

it('should return result = ok when executing the correct set ttl command event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
snsMock.on(PublishCommand).resolves({} as any)
dynamoDBMock
.on(GetItemCommand)
.resolves({ Item: { sk: { S: '1726027976' }, version: { N: '1' } } })

// Action
const result = await commandEventHandler.execute(sfnSetTtlCommandEvent)

// Assert
expect(result).toEqual({ result: 'ok' })
})

it('should call the AWS service with the correct parameters when executing the set ttl command event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
snsMock.on(PublishCommand).resolves({} as any)
dynamoDBMock
.on(GetItemCommand)
.resolves({ Item: { sk: { S: '1726027976' }, version: { N: '1' } } })

// Action
await commandEventHandler.execute(sfnSetTtlCommandEvent)

// Assert
expect(dynamoDBMock).toHaveReceivedCommandTimes(UpdateItemCommand, 2)

expect(snsMock).toHaveReceivedCommandWith(PublishCommand, {
Message: expect.stringContaining('set_ttl_command'),
})
expect(dynamoDBMock).toHaveReceivedNthCommandWith(1, UpdateItemCommand, {
TableName: 'env-app_name-table_name-command',
Key: { pk: { S: 'tenantCode#test' }, sk: { S: '1726027976@1' } },
ExpressionAttributeValues: expect.objectContaining({
':status': { S: 'set_ttl_command:STARTED' },
}),
})
expect(dynamoDBMock).toHaveReceivedNthCommandWith(2, UpdateItemCommand, {
TableName: 'env-app_name-table_name-command',
Key: { pk: { S: 'tenantCode#test' }, sk: { S: '1726027976@1' } },
ExpressionAttributeValues: expect.objectContaining({
':status': { S: 'set_ttl_command:FINISHED' },
}),
})
})

it('should return result = ok when executing the correct history copy event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
Expand Down Expand Up @@ -296,7 +353,7 @@ describe('DataSyncCommandSfnEventHandler', () => {

// Assert
expect(dynamoDBMock).toHaveReceivedCommandTimes(UpdateItemCommand, 2)
expect(dynamoDBMock).toHaveReceivedCommandTimes(GetItemCommand, 1)
expect(dynamoDBMock).toHaveReceivedCommandTimes(GetItemCommand, 2)
expect(snsMock).toHaveReceivedCommandTimes(PublishCommand, 2)

expect(snsMock).toHaveReceivedCommandWith(PublishCommand, {
Expand All @@ -313,11 +370,22 @@ describe('DataSyncCommandSfnEventHandler', () => {
TableName: 'env-app_name-table_name-data',
Key: { pk: { S: 'tenantCode#test' }, sk: { S: '1726027976' } },
})
expect(dynamoDBMock).toHaveReceivedNthCommandWith(3, PutItemCommand, {
expect(dynamoDBMock).toHaveReceivedNthCommandWith(3, GetItemCommand, {
Key: {
pk: { S: 'MASTER#test' },
sk: { S: 'TTL#env-app_name-table_name-history' },
},
TableName: 'env-app_name-master-data',
})
expect(dynamoDBMock).toHaveReceivedNthCommandWith(4, PutItemCommand, {
TableName: 'env-app_name-table_name-history',
Item: { sk: { S: '1726027976@1' }, version: { N: '1' } },
Item: {
sk: { S: '1726027976@1' },
version: { N: '1' },
ttl: { NULL: true },
},
})
expect(dynamoDBMock).toHaveReceivedNthCommandWith(4, UpdateItemCommand, {
expect(dynamoDBMock).toHaveReceivedNthCommandWith(5, UpdateItemCommand, {
TableName: 'env-app_name-table_name-command',
Key: { pk: { S: 'tenantCode#test' }, sk: { S: '1726027976@1' } },
ExpressionAttributeValues: expect.objectContaining({
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/commands/command.event.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ export class CommandEventHandler {
case DataSyncCommandSfnName.WAIT_PREV_COMMAND:
return await this.waitConfirmToken(event)

case DataSyncCommandSfnName.SET_TTL_COMMAND:
return await this.setTtlCommand(event)

case DataSyncCommandSfnName.HISTORY_COPY:
return await this.historyCopy(event)

Expand Down Expand Up @@ -130,6 +133,21 @@ export class CommandEventHandler {
}
}

protected async setTtlCommand(
event: DataSyncCommandSfnEvent,
): Promise<StepFunctionStateInput> {
this.logger.debug('setTtlCommand:: ', event.commandRecord)

await this.commandService.updateTtl({
pk: event.commandRecord.pk,
sk: event.commandRecord.sk,
})

return {
result: 'ok',
}
}

protected async historyCopy(
event: DataSyncCommandSfnEvent,
): Promise<StepFunctionStateInput> {
Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/commands/command.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,26 @@ import { CommandService } from './command.service'
import { DataService } from './data.service'
import { DataSyncDdsHandler } from './handlers/data-sync-dds.handler'
import { HistoryService } from './history.service'
import { TtlService } from './ttl.service'

@Module({
imports: [],
providers: [
ExplorerService,
CommandService,
DataService,
TtlService,
HistoryService,
CommandEventHandler,
DataSyncDdsHandler,
],
exports: [CommandService, DataService, HistoryService, CommandEventHandler],
exports: [
CommandService,
DataService,
HistoryService,
CommandEventHandler,
TtlService,
],
})
export class CommandModule extends ConfigurableModuleClass {
static register(options: typeof OPTIONS_TYPE): DynamicModule {
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/commands/command.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ describe('CommandService', () => {
const item = await commandService.publishSync(inputItem, {
invokeContext: {},
})
console.log('$@#$@', item)
expect(item).toBeDefined()
expect(item).toMatchObject({
...inputItem,
Expand Down
Loading
Loading