Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added stream support #5

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ myTable:
- AttributeName: id
KeyType: HASH
region: us-east-1
delete: true
stream: true #optional
streamViewType: NEW_IMAGE #optional
```

### 4. Deploy
Expand Down
31 changes: 22 additions & 9 deletions serverless.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const { mergeDeepRight, pick, equals } = require('ramda')
const AWS = require('aws-sdk')
const { Component } = require('@serverless/core')
const { createTable, deleteTable, describeTable, updateTable, configChanged } = require('./utils')
const { createTable, deleteTable, describeTable, updateTable, configChanged, validate, getStreamArn } = require('./utils')

const outputsList = ['name', 'arn', 'region']
const outputsList = ['name', 'arn', 'region', 'streamArn']

const defaults = {
attributeDefinitions: [
Expand All @@ -18,8 +18,11 @@ const defaults = {
KeyType: 'HASH'
}
],

region: 'us-east-1',
name: false,
region: 'us-east-1'
stream: false,
streamViewType: 'NEW_IMAGE'
}

const setTableName = (component, inputs, config) => {
Expand Down Expand Up @@ -61,30 +64,39 @@ class AwsDynamoDb extends Component {
setTableName(this, inputs, config)

const prevTable = await describeTable({ dynamodb, name: this.state.name })

if (!prevTable) {
validate.streamViewType(inputs)
this.context.status('Creating')
this.context.debug(`Table ${config.name} does not exist. Creating...`)

config.arn = await createTable({ dynamodb, ...config })
const { tableArn, streamArn } = await createTable({ dynamodb, ...config })
config.arn = tableArn
config.streamArn = streamArn
} else {
validate.streamViewType(inputs)
validate.streamViewTypeUpdate(this, prevTable, inputs)
this.context.debug(`Table ${config.name} already exists. Comparing config changes...`)

config.arn = prevTable.arn

config.streamArn = prevTable.streamArn
config.streamEnabled = inputs.stream
config.streamViewType = inputs.streamViewType

if (configChanged(prevTable, config)) {
this.context.status('Updating')
this.context.debug(`Config changed for table ${config.name}. Updating...`)

if (!equals(prevTable.name, config.name)) {
// If "delete: false", don't delete the table
if (config.delete === false) {
throw new Error(`You're attempting to change your table name from ${this.state.name} to ${config.name} which will result in you deleting your table, but you've specified the "delete" input to "false" which prevents your original table from being deleted.`)
}

await deleteTable({ dynamodb, name: prevTable.name })
config.arn = await createTable({ dynamodb, ...config })
} else {
await updateTable({ dynamodb, ...config })
const { streamArn } = await updateTable({ prevTable, dynamodb, ...config })
config.streamArn = streamArn
}
}
}
Expand All @@ -95,12 +107,13 @@ class AwsDynamoDb extends Component {

this.state.arn = config.arn
this.state.name = config.name
this.state.stream = config.streamArn
this.state.streamViewType = config.streamViewType
this.state.region = config.region
this.state.delete = config.delete === false ? config.delete : true
await this.save()

const outputs = pick(outputsList, config)

return outputs
}

Expand Down
4 changes: 3 additions & 1 deletion testProjects/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ table:
component: ../
inputs:
name: test-table
delete: false
delete: true
stream: true
streamViewType: NEW_IMAGE
111 changes: 101 additions & 10 deletions utils.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,69 @@
const { not, equals, pick } = require('ramda')

async function createTable({ dynamodb, name, attributeDefinitions, keySchema }) {
const validate = {
streamViewType: (inputs) => {
if (!inputs.streamViewType) {
return
}

const validStreamTypes = ['NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES', 'KEYS_ONLY']
if (!validStreamTypes.includes(inputs.streamViewType)) {
throw Error(`${inputs.streamViewType} is not a valid streamViewType.`)
}
},

streamViewTypeUpdate: (comp, previousTable, inputs) => {
if (!previousTable.streamArn || !inputs.streamViewType) {
return
}

if (comp.state.stream && inputs.stream && comp.state.streamViewType !== inputs.streamViewType) {
throw Error(`You cannot change the view type of an existing DynamoDB stream.`)
}
}
}
Comment on lines +3 to +24
Copy link
Contributor Author

@dodgeblaster dodgeblaster Dec 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added validation around stream types for creating and updating tables.
streamViewType just confirms they are using a correct view type, and will throw an error if they are not

streamViewTypeUpdate is a bit more subtle. If you change the viewType on an table that already has a stream, it wont do anything. Its possible people might assume it all worked unless they see a message explaining nothing actually happened. So I made an error for it.

@eahefnawy is throwing errors the best way to give messages like this in the CLI?


async function getStreamArn ({dynamodb, name}) {
const maxTries = 5
let tries = 0

const getStreamArn = async () => {
if (tries > maxTries) {
throw Error(`There was a problem getting the arn for your DynamoDB stream. Please try again.`)
}

const {streamArn } = await describeTable({ dynamodb, name})
if (!streamArn && tries <= maxTries) {
tries++
const sleep = ms => new Promise(r => setTimeout(r,ms))
await sleep(3000)
return await getStreamArn()
}
return streamArn
}

return await getStreamArn()
}
Comment on lines +26 to +46
Copy link
Contributor Author

@dodgeblaster dodgeblaster Dec 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is used in the update table function below.
I found that if I created a table with a stream, I would get returnValueFromCreateTable.TableDescription.LatestStreamArn successfully. But if I created a table with no stream, then set the stream to true in an update, the return value of the updateTable sdk call woud have returnValueFromUpdatTable.TableDescription.LatestStreamArn set to undefined (I am assuming that is because it takes a bit of time to register?). Running sls a second time will correctly give you the stream arn.

Anyways, that is a problem, because users will likely depend on getting a stream arn back from the outputs of this component. So the above function will try to get the stream arn, if it doesnt, it will wait 3 seconds and try again. It will do this 5 more times until it gets a stream arn. If it does not get it in 5 tries, it will throw an error.

The error does not mean the stream was not created, it just means we did not get the stream arn, which might be really important if the user is depending on the stream arn in other components.


async function createTable({ dynamodb, name, attributeDefinitions, keySchema, stream, streamViewType = false }) {
const res = await dynamodb
.createTable({
TableName: name,
AttributeDefinitions: attributeDefinitions,
KeySchema: keySchema,
BillingMode: 'PAY_PER_REQUEST'
BillingMode: 'PAY_PER_REQUEST',
...(stream && {
StreamSpecification: {
StreamEnabled: true,
StreamViewType: streamViewType
}
dodgeblaster marked this conversation as resolved.
Show resolved Hide resolved
})
})
.promise()
return res.TableDescription.TableArn
return {
tableArn: res.TableDescription.TableArn,
streamArn: res.TableDescription.LatestStreamArn || false
}
}

async function describeTable({ dynamodb, name }) {
Expand All @@ -21,7 +75,14 @@ async function describeTable({ dynamodb, name }) {
arn: data.Table.TableArn,
name: data.Table.TableName,
attributeDefinitions: data.Table.AttributeDefinitions,
keySchema: data.Table.KeySchema
keySchema: data.Table.KeySchema,
streamArn: data.Table.LatestStreamArn,
streamEnabled: data.Table.StreamSpecification
? data.Table.StreamSpecification.StreamEnabled
: false,
streamViewType: data.Table.StreamSpecification
? data.Table.StreamSpecification.StreamViewType
: false
}
} catch (error) {
if (error.code === 'ResourceNotFoundException') {
Expand All @@ -32,15 +93,41 @@ async function describeTable({ dynamodb, name }) {
}
}

async function updateTable({ dynamodb, name, attributeDefinitions }) {
async function updateTable({prevTable, dynamodb, name, attributeDefinitions, stream, streamViewType }) {
const disableStream = prevTable.streamEnabled && !stream
const enableStream = !prevTable.streamEnabled && stream

const res = await dynamodb
.updateTable({
TableName: name,
AttributeDefinitions: attributeDefinitions,
BillingMode: 'PAY_PER_REQUEST'
BillingMode: 'PAY_PER_REQUEST',

...(disableStream && {
StreamSpecification: {
StreamEnabled: false
}
}),
Comment on lines +106 to +110
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ive added this disableStream flag here because the SDK will error if you try to disable a non existing stream. So this flag insures we only add this to the object if the inputs are disabling a stream that exists.


...(enableStream && {
StreamSpecification: {
StreamEnabled: true,
StreamViewType: streamViewType
}
})
})
.promise()
return res.TableDescription.TableArn


let streamArn = res.TableDescription.LatestStreamArn || false
if (stream && !res.TableDescription.LatestStreamArn) {
streamArn = await getStreamArn({dynamodb, name})
}

return {
tableArn: res.TableDescription.TableArn,
streamArn
}
}

async function deleteTable({ dynamodb, name }) {
Expand All @@ -60,16 +147,20 @@ async function deleteTable({ dynamodb, name }) {
}

function configChanged(prevTable, table) {
const prevInputs = pick(['name', 'attributeDefinitions'], prevTable)
const inputs = pick(['name', 'attributeDefinitions'], table)
const prevInputs = pick(['name', 'attributeDefinitions', 'streamArn', 'streamViewType', 'streamEnabled'], prevTable)
const inputs = pick(['name', 'attributeDefinitions', 'streamArn', 'streamViewType', 'streamEnabled'], table)

return not(equals(inputs, prevInputs))
}



module.exports = {
createTable,
describeTable,
updateTable,
deleteTable,
configChanged
configChanged,
validate,
getStreamArn
}