Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gateway #10

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 138 additions & 79 deletions getSignedURL/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ import {
} from '@aws-sdk/lib-dynamodb'
import { InvocationRequest } from 'aws-sdk/clients/lambda'

// @ts-ignore
const S3_BUCKET = process.env.UploadBucket

// @ts-ignore
// AWS.config.update({region: 'us-east-1'})
AWS.config.update({ region: process.env.AWS_REGION })
const client = new DynamoDBClient({})
const dynamo = DynamoDBDocumentClient.from(client)
const lambda = new AWS.Lambda()
const tableName = 'metaStore'

const stackName = process.env.STACK_NAME;
const tableName = `${stackName}-metastore`;

const s3 = new AWS.S3({
signatureVersion: 'v4'
})
Expand All @@ -45,7 +51,7 @@ export const handler = async event => {
return await getUploadURL(event).catch(error => {
console.error('Error:', error)
return {
status: 500,
statusCode: 500,
body: JSON.stringify({
message: 'Internal Server Error',
error: error.message
Expand All @@ -54,6 +60,13 @@ export const handler = async event => {
})
}

interface CRDTEntry {
data: string;
cid: string;
parents: string[];
}


const getUploadURL = async function (event) {
const { queryStringParameters } = event
const type = queryStringParameters.type
Expand All @@ -74,106 +87,128 @@ const getUploadURL = async function (event) {
})
} else if (type === 'meta') {
return await metaUploadParams(queryStringParameters, event)
} else if (type === 'wal') {

s3Params = walUploadParams(queryStringParameters, event)

const uploadURL = await s3.getSignedUrlPromise('putObject', s3Params)

return JSON.stringify({
uploadURL: uploadURL,
Key: s3Params.Key
})
} else {
throw new Error('Unsupported upload type: ' + type)
}
}

async function invokelambda(event, tableName, dbname) {
const command = new QueryCommand({
ExpressionAttributeValues: {
":v1": {
S: dbname,
},
},
ExpressionAttributeNames: {
"#nameAttr": "name",
"#dataAttr": "data",
},
KeyConditionExpression: "#nameAttr = :v1",
ProjectionExpression: "cid, #dataAttr",
TableName: tableName,
});
const data = await dynamo.send(command)
let items:{ [key: string]: any; }[] = []
if (data.Items && data.Items.length > 0) {
items = data.Items.map((item) => AWS.DynamoDB.Converter.unmarshall(item));
}
// async function invokelambda(event, tableName, dbname) {
// const commandArgs = {
// ExpressionAttributeValues: {
// ":v1": {
// S: dbname,
// },
// },
// ExpressionAttributeNames: {
// "#nameAttr": "name",
// "#dataAttr": "data",
// },
// KeyConditionExpression: "#nameAttr = :v1",
// ProjectionExpression: "cid, #dataAttr",
// TableName: tableName,
// };
// console.log('invokelambda QueryCommand Args:', commandArgs);
// const command = new QueryCommand(commandArgs);
// const data = await dynamo.send(command)
// let items: { [key: string]: any; }[] = []

// if (data.Items && data.Items.length > 0) {
// items = data.Items.map((item) => {
// console.log('Before unmarshall:', item);
// const unmarshalledItem = AWS.DynamoDB.Converter.unmarshall(item);
// console.log('After unmarshall:', unmarshalledItem);
// return unmarshalledItem;
// });
// }

event.body = JSON.stringify({
action: "sendmessage",
data: JSON.stringify({ items }),
});
// event.body = JSON.stringify({
// action: "sendmessage",
// data: JSON.stringify(items),
// });

event.API_ENDPOINT = process.env.API_ENDPOINT;
let str = dbname;
let extractedName = str.match(/\.([^.]+)\./)[1]
event.databasename=extractedName;
// event.API_ENDPOINT = process.env.API_ENDPOINT;
// // let str = dbname;
// // let extractedName = str.match(/\.([^.]+)\./)[1]
// event.databasename = dbname;

const params:InvocationRequest = {
FunctionName: process.env.SendMessage as string,
InvocationType: "RequestResponse",
Payload: JSON.stringify(event),
}
// const params: InvocationRequest = {
// FunctionName: process.env.SendMessage as string,
// InvocationType: "RequestResponse",
// Payload: JSON.stringify(event),
// }

const returnedresult:any = await lambda.invoke(params).promise();
const result = JSON.parse(returnedresult.Payload);
return result;
}
// console.log('Invoking Lambda with Params:', params);
// const returnedresult: any = await lambda.invoke(params).promise();
// const result = JSON.parse(returnedresult.Payload);
// return result;
// }

async function metaUploadParams(queryStringParameters, event) {
const name = queryStringParameters.name
const httpMethod = event.requestContext.http.method
if (httpMethod == 'PUT') {
const requestBody = JSON.parse(event.body)
console.log('Event:', JSON.stringify(event, null, 2));
console.log('QueryStringParameters:', JSON.stringify(queryStringParameters, null, 2));
console.log('HTTP Method:', httpMethod);
console.log('TableName:', tableName);
const requestBody = JSON.parse(event.body) as CRDTEntry[]
if (requestBody) {
const { data, cid, parents } = requestBody
const { data, cid, parents } = requestBody[0]
if (!data || !cid) {
throw new Error('Missing data or cid from the metadata:' + event.rawQueryString)
}

//name is the partition key and cid is the sort key for the DynamoDB table
await dynamo.send(
new PutCommand({
const putCommand = new PutCommand({
TableName: tableName,
Item: {
name: name,
cid: cid,
data: JSON.stringify(requestBody[0])
}
});
console.log('PutCommand:', putCommand);
await dynamo.send(putCommand);

for (const p of parents) {
const deleteCommand = new DeleteCommand({
TableName: tableName,
Item: {
Key: {
name: name,
cid: cid,
data: data
cid: p
}
})
)

for (const p of parents) {
await dynamo.send(
new DeleteCommand({
TableName: tableName,
Key: {
name: name,
cid: p
}
})
)
});
console.log('DeleteCommand:', deleteCommand);
await dynamo.send(deleteCommand);
}

try {
const result = await invokelambda(event, tableName, name)
console.log("This is the response", result)
} catch (error) {
console.log(error, "This is the error when calling other Lambda")
return {
statusCode: 500,
body: JSON.stringify({ error: "Failed to connected to websocket server" }),
};
}
// void invokelambda(event, tableName, name).then((result) => {
// console.log("This is the response", result)
// }).catch((error) => {
// console.log(error, "This is the error when calling other Lambda")
// // return {
// // statusCode: 500,
// // body: JSON.stringify({ error: "Failed to connected to websocket server" }),
// // };
// });

return {
status: 201,
statusCode: 201,
body: JSON.stringify({ message: 'Metadata has been added' })
}
} else {
return {
status: 400,
statusCode: 400,
body: JSON.stringify({ message: 'JSON Payload data not found!' })
}
}
Expand All @@ -196,27 +231,52 @@ async function metaUploadParams(queryStringParameters, event) {
// const data = await dynamoDB.scan(params).promise();
//This means items is an array of objects where each object contains a string key and a value of any type
//: { [key: string]: any; }[]
// console.log('dynamo result',data)
let items: { [key: string]: any }[] = []
if (data.Items && data.Items.length > 0) {
items = data.Items.map(item => AWS.DynamoDB.Converter.unmarshall(item))
items = data.Items.map((item) => {
// console.log('Before unmarshall:', item);
const dataString = item.data.S;
// console.log('Data string:', dataString);
return JSON.parse(dataString);
});
console.log('getmeta Items:', items);
return {
status: 200,
body: JSON.stringify({ items })
statusCode: 200,
body: JSON.stringify(items)
}
} else {
return {
status: 200,
body: JSON.stringify({ items: [] })
statusCode: 200,
body: JSON.stringify([])
}
}
} else {
return {
status: 400,
statusCode: 400,
body: JSON.stringify({ message: 'Invalid HTTP method' })
}
}
}

function walUploadParams(queryStringParameters, event) {
const name = queryStringParameters.name
if (!name) {
throw new Error('Missing name query parameter: ' + event.rawQueryString)
}

const Key = `wal/${name}.wal`

const s3Params = {
Bucket: S3_BUCKET,
Key,
Expires: URL_EXPIRATION_SECONDS,
ContentType: 'application/octet-stream',
ACL: 'public-read'
}
return s3Params
}

function carUploadParams(queryStringParameters, event, type) {
const name = queryStringParameters.name
const carCid = queryStringParameters.car
Expand All @@ -230,8 +290,7 @@ function carUploadParams(queryStringParameters, event, type) {
const Key = `${type}/${name}/${cid.toString()}.car`

const s3Params = {
// @ts-ignore
Bucket: process.env.UploadBucket,
Bucket: S3_BUCKET,
Key,
Expires: URL_EXPIRATION_SECONDS,
ContentType: 'application/car',
Expand Down
5 changes: 1 addition & 4 deletions onconnect/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
PutCommand,
} from "@aws-sdk/lib-dynamodb";

AWS.config.update({ region: process.env.AWS_REGION });
const client = new DynamoDBClient({});
const dynamo = DynamoDBDocumentClient.from(client);
const tableName = process.env.TABLE_NAME;
Expand All @@ -31,7 +30,7 @@ export const handler = async (event) => {
return await addSubscribers(event).catch((error) => {
console.error("Error:", error);
return {
status: 500,
statusCode: 500,
body: JSON.stringify({
message: "Internal Server Error",
error: error.message,
Expand All @@ -41,10 +40,8 @@ export const handler = async (event) => {
};

async function addSubscribers(event) {
// console.log("This is the route key", event.requestContext.routeKey);
const { queryStringParameters } = event;
const database = queryStringParameters.database;
// console.log("This is the request context", event.requestContext);
try {
await dynamo.send(
new PutCommand({
Expand Down
10 changes: 2 additions & 8 deletions ondisconnect/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
DeleteCommand,
} from "@aws-sdk/lib-dynamodb";

AWS.config.update({ region: process.env.AWS_REGION });
const client = new DynamoDBClient({});
const dynamo = DynamoDBDocumentClient.from(client);
const tableName = process.env.TABLE_NAME;
Expand All @@ -31,7 +30,7 @@ export const handler = async (event) => {
return await deleteSubscribers(event).catch((error) => {
console.error("Error:", error);
return {
status: 500,
statusCode: 500,
body: JSON.stringify({
message: "Internal Server Error",
error: error.message,
Expand All @@ -41,22 +40,17 @@ export const handler = async (event) => {
};

async function deleteSubscribers(event) {
// console.log("This is the route key", event.requestContext.routeKey);
// const { queryStringParameters } = event;
// const database = queryStringParameters.database;
// console.log("This is the request context", event.requestContext);
try {
await dynamo.send(
new DeleteCommand({
TableName: tableName,
Key: {
connectionId: event.requestContext.connectionId,
// databasename: database,
},
})
);
} catch (err) {
console.log("Error deleting susbcriber", err);
console.log("Error deleting subscriber", err);
return {
statusCode: 500,
body: "Failed to disconnect: " + JSON.stringify(err),
Expand Down
8 changes: 8 additions & 0 deletions sam2config.tml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version = 0.1
[default.deploy.parameters]
stack_name = "crdt"
resolve_s3 = true
s3_prefix = "crdt"
region = "us-east-2"
capabilities = "CAPABILITY_IAM"
image_repositories = []
Loading