Skip to content

Commit

Permalink
Merge pull request #100 from mbc-net/feat/sfn-alarm
Browse files Browse the repository at this point in the history
[core]: Publish an SNS event when a Step Function encounters an error
  • Loading branch information
koichimurakami authored Dec 20, 2024
2 parents 8625b29 + 4960765 commit 3e051a3
Show file tree
Hide file tree
Showing 23 changed files with 176 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$schema": "node_modules/lerna/schemas/lerna-schema.json",
"version": "0.1.43-beta.0",
"version": "0.1.44-beta.0",
"packages": ["packages/*"]
}
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mbc-cqrs-serverless/cli",
"version": "0.1.43-beta.0",
"version": "0.1.44-beta.0",
"description": "a CLI to get started with MBC CQRS serverless framework",
"keywords": [
"mbc",
Expand Down
1 change: 1 addition & 0 deletions packages/cli/templates/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ SFN_COMMAND_ARN=arn:aws:states:ap-northeast-1:101010101010:stateMachine:command
SNS_ENDPOINT=http://localhost:4002
SNS_REGION=ap-northeast-1
SNS_TOPIC_ARN=arn:aws:sns:ap-northeast-1:101010101010:CqrsSnsTopic
SNS_ALARM_TOPIC_ARN=arn:aws:sns:ap-northeast-1:101010101010:AlarmSnsTopic
# Cognito endpoint, useful for local development
COGNITO_URL=http://localhost:9229
COGNITO_USER_POOL_ID=local_2G7noHgW
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/templates/infra-local/elasticmq.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ queues {
delay = 5 seconds
receiveMessageWait = 0 seconds
}
alarm-queue {
defaultVisibilityTimeout = 60 seconds
delay = 5 seconds
receiveMessageWait = 0 seconds
}
}

aws {
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/templates/infra-local/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ custom:
['notification-action', 'command-status', 'task-status'],
}
queue: http://localhost:9324/101010101010/notification-queue
- topic:
topicName: AlarmSnsTopic
rawMessageDelivery: 'true'
queue: http://localhost:9324/101010101010/alarm-queue
# host: 0.0.0.0 # Optional, defaults to 127.0.0.1 if not provided to serverless-offline
# sns-endpoint: http://127.0.0.1:4567 # Optional. Only if you want to use a custom SNS provider endpoint
# sns-subscribe-endpoint: http://127.0.0.1:3000 # Optional. Only if you want to use a custom subscribe endpoint from SNS to send messages back to
Expand Down
3 changes: 3 additions & 0 deletions packages/cli/templates/infra/libs/infra-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export class InfraStack extends cdk.Stack {
S3_BUCKET_NAME: ddbBucket.bucketName,
SFN_COMMAND_ARN: commandSfnArn,
SNS_TOPIC_ARN: mainSns.topicArn,
SNS_ALARM_TOPIC_ARN: alarmSns.topicArn,
COGNITO_USER_POOL_ID: userPool.userPoolId,
APPSYNC_ENDPOINT: appSyncApi.graphqlUrl,
SES_FROM_EMAIL: props.config.fromEmailAddress,
Expand Down Expand Up @@ -838,6 +839,7 @@ export class InfraStack extends cdk.Stack {
ddbBucket.grantReadWrite(lambdaApi)
publicBucket.grantReadWrite(lambdaApi)
mainSns.grantPublish(lambdaApi)
alarmSns.grantPublish(lambdaApi)
taskSqs.grantSendMessages(lambdaApi)
notifySqs.grantSendMessages(lambdaApi)
appSyncApi.grantMutation(lambdaApi)
Expand Down Expand Up @@ -912,6 +914,7 @@ export class InfraStack extends cdk.Stack {
ddbBucket.grantReadWrite(taskRole)
publicBucket.grantReadWrite(taskRole)
mainSns.grantPublish(taskRole)
alarmSns.grantPublish(taskRole)
taskSqs.grantSendMessages(taskRole)
notifySqs.grantSendMessages(taskRole)
appSyncApi.grantMutation(taskRole)
Expand Down
10 changes: 10 additions & 0 deletions packages/cli/templates/infra/test/__snapshots__/infra.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,9 @@ exports[`snapshot test for InfraStack 1`] = `
},
"SES_FROM_EMAIL": "[email protected]",
"SFN_COMMAND_ARN": "arn:aws:states:ap-northeast-1:101010101010:stateMachine:dev-cdk-test-deploy-command-handler",
"SNS_ALARM_TOPIC_ARN": {
"Ref": "alarmsnsFB7BBC3B",
},
"SNS_TOPIC_ARN": {
"Ref": "mainsnsC0381B34",
},
Expand Down Expand Up @@ -994,6 +997,13 @@ exports[`snapshot test for InfraStack 1`] = `
"Ref": "mainsnsC0381B34",
},
},
{
"Action": "sns:Publish",
"Effect": "Allow",
"Resource": {
"Ref": "alarmsnsFB7BBC3B",
},
},
{
"Action": [
"sqs:SendMessage",
Expand Down
3 changes: 2 additions & 1 deletion packages/core/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ LOG_LEVEL=verbose # debug, verbose, info, warn, error, fatal
# disable event route for API GW integration
EVENT_SOURCE_DISABLED=false
# DynamoDB endpoint, useful for local development
DYNAMODB_ENDPOINT=http://0.0.0.0:8000
DYNAMODB_ENDPOINT=http://0.0.0.0:8000
DYNAMODB_REGION=ap-northeast-1
# set the limit size for `attributes` of object in DDB
ATTRIBUTE_LIMIT_SIZE=389120 # bytes, refer to https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html#limits-attributes
Expand All @@ -28,6 +28,7 @@ SFN_COMMAND_ARN=arn:aws:states:ap-northeast-1:101010101010:stateMachine:command
SNS_ENDPOINT=http://0.0.0.0:4002
SNS_REGION=ap-northeast-1
SNS_TOPIC_ARN=arn:aws:sns:ap-northeast-1:101010101010:CqrsSnsTopic
SNS_ALARM_TOPIC_ARN=arn:aws:sns:ap-northeast-1:101010101010:AlarmSnsTopic
# Cognito endpoint, useful for local development
COGNITO_URL=http://0.0.0.0:9229
COGNITO_USER_POOL_ID=local_2G7noHgW
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mbc-cqrs-serverless/core",
"version": "0.1.43-beta.0",
"version": "0.1.44-beta.0",
"description": "CQRS and event base core",
"keywords": [
"mbc",
Expand Down
44 changes: 42 additions & 2 deletions packages/core/src/commands/command.event.handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { DataSyncDdsHandler } from './handlers/data-sync-dds.handler'
import { HistoryService } from './history.service'
import { DataSyncCommandSfnName } from '../command-events/sfn-name.enum'
import { TtlService } from './ttl.service'
import { SnsClientFactory } from '../queue/sns-client-factory'

export class MockedHandler implements IDataSyncHandler {
async up(cmd: CommandModel): Promise<any> {
Expand Down Expand Up @@ -127,6 +128,8 @@ const sfnFinishDataEvent = createEvent(DataSyncCommandSfnName.FINISH)
const keys = {
NODE_ENV: 'env',
APP_NAME: 'app_name',
SNS_TOPIC_ARN: 'main_topic_arn',
SNS_ALARM_TOPIC_ARN: 'alarm_topic_arn',
}

describe('DataSyncCommandSfnEventHandler', () => {
Expand All @@ -147,6 +150,7 @@ describe('DataSyncCommandSfnEventHandler', () => {
SnsService,
MockedHandler,
TtlService,
SnsClientFactory,
{
provide: MODULE_OPTIONS_TOKEN,
useValue: {
Expand Down Expand Up @@ -240,6 +244,30 @@ describe('DataSyncCommandSfnEventHandler', () => {
expect(result).toEqual(expect.objectContaining({ result: -1 }))
})

// Executes the check-version event against a stored item at version 1 and
// verifies that an `sfn-alarm` message describing the version mismatch is
// among the published SNS messages.
it('should publish sns alarm when executing the stale check version event ', async () => {
  // Arrange
  dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
  snsMock.on(PublishCommand).resolves({} as any)
  dynamoDBMock.on(GetItemCommand).resolves({
    Item: {
      version: {
        N: '1',
      },
    },
  })

  // Action
  // The return value is not inspected here; only the SNS traffic is asserted.
  await commandEventHandler.execute(sfnCheckVersionEvent)

  // Assert
  expect(snsMock).toHaveReceivedCommandTimes(PublishCommand, 3)
  expect(snsMock).toHaveReceivedNthCommandWith(2, PublishCommand, {
    // No `g` flag: a global regex carries `lastIndex` state between `test()`
    // calls, which is never wanted for a one-off "does it match" assertion.
    Message: expect.stringMatching(
      /(?=.*"action":"sfn-alarm")(?=.*next version must be 2 but got 1)/,
    ),
  })
})

it('should call the AWS service with the correct parameters when executing the correct check version event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
Expand Down Expand Up @@ -447,7 +475,7 @@ describe('DataSyncCommandSfnEventHandler', () => {
})
})

it('should throw not found handler error when executing the sync data event', async () => {
it('should throw not found handler error and publish sns alarm when executing the sync data event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
snsMock.on(PublishCommand).resolves({} as any)
Expand All @@ -461,9 +489,15 @@ describe('DataSyncCommandSfnEventHandler', () => {
}),
),
).rejects.toThrow('SyncDataHandler not found!')

expect(snsMock).toHaveReceivedNthCommandWith(3, PublishCommand, {
Message: expect.stringMatching(
/(?=.*"action":"sfn-alarm")(?=.*Error: SyncDataHandler not found!)/g,
),
})
})

it('should throw empty handler error when executing the sync data event', async () => {
it('should throw empty handler error and publish sns alarm when executing the sync data event', async () => {
// Arrange
dynamoDBMock.on(UpdateItemCommand).resolves({} as any)
snsMock.on(PublishCommand).resolves({} as any)
Expand All @@ -478,6 +512,12 @@ describe('DataSyncCommandSfnEventHandler', () => {
}),
),
).rejects.toThrow('SyncDataHandler empty!')

expect(snsMock).toHaveReceivedNthCommandWith(3, PublishCommand, {
Message: expect.stringMatching(
/(?=.*"action":"sfn-alarm")(?=.*Error: SyncDataHandler empty!)/g,
),
})
})

it('should call handler up when executing the correct sync data event', async () => {
Expand Down
37 changes: 35 additions & 2 deletions packages/core/src/commands/command.event.handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Inject, Injectable, Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'

import {
DataSyncCommandSfnEvent,
Expand All @@ -7,7 +8,8 @@ import {
import { DataSyncCommandSfnName } from '../command-events/sfn-name.enum'
import { S3Service } from '../data-store'
import { removeSortKeyVersion } from '../helpers/key'
import { CommandModuleOptions } from '../interfaces'
import { CommandModuleOptions, INotification } from '../interfaces'
import { SnsService } from '../queue'
import { MODULE_OPTIONS_TOKEN } from './command.module-definition'
import { CommandService } from './command.service'
import { DataService } from './data.service'
Expand All @@ -17,6 +19,7 @@ import { HistoryService } from './history.service'
@Injectable()
export class CommandEventHandler {
private readonly logger: Logger
private readonly alarmTopicArn: string

constructor(
@Inject(MODULE_OPTIONS_TOKEN)
Expand All @@ -25,10 +28,13 @@ export class CommandEventHandler {
private readonly dataService: DataService,
private readonly historyService: HistoryService,
private readonly s3Service: S3Service,
private readonly snsService: SnsService,
private readonly config: ConfigService,
) {
this.logger = new Logger(
`${CommandEventHandler.name}:${this.options.tableName}`,
)
this.alarmTopicArn = this.config.get<string>('SNS_ALARM_TOPIC_ARN')
}

async execute(
Expand All @@ -55,6 +61,7 @@ export class CommandEventHandler {
getCommandStatus(event.stepStateName, CommandStatus.STATUS_FAILED),
event.commandRecord.requestId,
)
await this.publishAlarm(event, (error as Error).stack)
throw error
}
}
Expand Down Expand Up @@ -125,12 +132,15 @@ export class CommandEventHandler {
}
}

return {
const errorDetails = {
result: -1,
error: 'version is not match',
cause:
'next version must be ' + nextVersion + ' but got ' + new String(1),
}

await this.publishAlarm(event, errorDetails)
return errorDetails
}

protected async setTtlCommand(
Expand Down Expand Up @@ -198,4 +208,27 @@ export class CommandEventHandler {

return null
}

/**
 * Builds an `sfn-alarm` notification from the given step function event and
 * publishes it through `snsService` using `alarmTopicArn`.
 *
 * @param event - step function event whose command key and source identify
 *   the failing execution
 * @param errorDetails - error information, embedded as-is under
 *   `content.errorMessage`
 */
protected async publishAlarm(
  event: DataSyncCommandSfnEvent,
  errorDetails: any,
): Promise<void> {
  this.logger.debug('event', event)

  const { pk, sk } = event.commandKey
  // Everything after the first '#' of the pk is used as the tenant code.
  const tenantCode = pk.substring(pk.indexOf('#') + 1)

  const payload: INotification = {
    action: 'sfn-alarm',
    id: `${pk}#${sk}`,
    table: this.options.tableName,
    pk,
    sk,
    tenantCode,
    content: {
      errorMessage: errorDetails,
      sfnId: event.source,
    },
  }

  this.logger.error('alarm:::', payload)
  await this.snsService.publish<INotification>(payload, this.alarmTopicArn)
}
}
2 changes: 1 addition & 1 deletion packages/core/src/commands/ttl.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class TtlService implements ITtlService {
startDate?: Date,
): Promise<number | null> {
const numberOfDay = await this.getTtlConfiguration(type, tenantCode)
this.logger.log('numberOfDay', numberOfDay)
this.logger.debug('numberOfDay', numberOfDay)
return numberOfDay ? this.calculateUnixTime(numberOfDay, startDate) : null
}

Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/queue/queue.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Global, Module } from '@nestjs/common'

import { SnsService } from './sns.service'
import { SnsClientFactory } from './sns-client-factory'

/**
 * Global queue module.
 *
 * Registers `SnsService` and `SnsClientFactory` as providers; only
 * `SnsService` is exported to other modules.
 */
@Global()
@Module({
providers: [SnsService],
providers: [SnsService, SnsClientFactory],
exports: [SnsService],
})
export class QueueModule {}
21 changes: 21 additions & 0 deletions packages/core/src/queue/sns-client-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { SNSClient } from '@aws-sdk/client-sns'
import { Injectable } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'

/**
 * Lazily creates and caches one `SNSClient` per topic ARN.
 *
 * Every client is built from the same `SNS_ENDPOINT` / `SNS_REGION`
 * configuration values; the topic ARN only serves as the cache key.
 */
@Injectable()
export class SnsClientFactory {
  private readonly clients = new Map<string, SNSClient>()

  constructor(private readonly config: ConfigService) {}

  /**
   * Returns the client cached for `topicArn`, creating it on first use.
   *
   * @param topicArn - topic ARN used as the cache key
   * @returns the `SNSClient` associated with `topicArn`
   */
  getClient(topicArn: string): SNSClient {
    // Single lookup into a local so the return value is narrowed to
    // `SNSClient` (a bare `Map.get` is typed `SNSClient | undefined`).
    let client = this.clients.get(topicArn)
    if (client === undefined) {
      client = new SNSClient({
        endpoint: this.config.get<string>('SNS_ENDPOINT'),
        region: this.config.get<string>('SNS_REGION'),
      })
      this.clients.set(topicArn, client)
    }
    return client
  }
}
Loading

0 comments on commit 3e051a3

Please sign in to comment.