Skip to content

Commit

Permalink
Merge pull request #2 from vtex-apps/feature/event-broadcaster
Browse files Browse the repository at this point in the history
Feature/event broadcaster
  • Loading branch information
gabiruuuuu authored Feb 14, 2022
2 parents e1f3099 + 381e0ef commit f1d2aa1
Show file tree
Hide file tree
Showing 16 changed files with 318 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added

- EventBroadcaster handler and middlewares

## [0.1.0] - 2022-02-09

### Added
Expand Down
11 changes: 11 additions & 0 deletions manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@
"dependencies": {},
"builders": {
"node": "6.x",
"masterdata": "1.x",
"docs": "0.x"
},
"scripts": {
"prereleasy": "bash lint.sh"
},
"credentialType": "absolute",
"policies": [
{
"name": "ADMIN_DS"
},
{
"name": "outbound-access",
"attrs": {
"host": "api.vtex.com",
"path": "/api/dataentities/*"
}
},
{
"name": "colossus-fire-event"
},
Expand Down
12 changes: 12 additions & 0 deletions masterdata/eventRegistry/schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"$schema": "http://json-schema.org/schema#",
"title": "Event Registry",
"type": "object",
"properties": {
"hasBeenProcessed": {
"type": "boolean"
}
},
"required": ["hasBeenProcessed"],
"v-cache": false
}
11 changes: 10 additions & 1 deletion node/clients/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
import { IOClients } from '@vtex/api'
import { masterDataFor } from '@vtex/clients'
import type { EventRegistry } from 'vtex.spreadsheet-event-broadcaster'

export class Clients extends IOClients {}
export class Clients extends IOClients {
public get eventRegistry() {
return this.getOrSet(
'eventRegistry',
masterDataFor<EventRegistry>('eventRegistry')
)
}
}
25 changes: 23 additions & 2 deletions node/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import type { ClientsConfig, ServiceContext, RecorderState } from '@vtex/api'
import type {
ClientsConfig,
ServiceContext,
RecorderState,
EventContext,
} from '@vtex/api'
import { method, Service } from '@vtex/api'

import { Clients } from './clients'
import { broadcast } from './middlewares/broadcaster/broadcast'
import { splitPayload } from './middlewares/broadcaster/splitPayload'
import { verifyUniqueness } from './middlewares/broadcaster/verifyUniqueness'
import { parseFile } from './middlewares/notify/parseFile'
import { startEventChain } from './middlewares/notify/startEventChain'

Expand All @@ -22,7 +30,17 @@ declare global {

interface State extends RecorderState {
payload: unknown[]
appId: string
senderAppId: string
clientAppId: string
}

interface BroadcasterEventContext extends EventContext<Clients> {
body: {
eventId: string
payload: unknown[]
senderAppId: string
clientAppId: string
}
}
}

Expand All @@ -33,4 +51,7 @@ export default new Service({
POST: [parseFile, startEventChain],
}),
},
events: {
eventBroadcaster: [verifyUniqueness, splitPayload, broadcast],
},
})
20 changes: 20 additions & 0 deletions node/middlewares/broadcaster/broadcast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { OUTPUT_EVENT_KEY as eventKey } from '../../utils/constants'

export async function broadcast(
ctx: BroadcasterEventContext,
next: () => Promise<unknown>
) {
const {
clients: { events },
body: { payload, senderAppId, clientAppId },
} = ctx

payload.forEach((row) => {
events.sendEvent(clientAppId, eventKey, {
data: row,
senderAppId,
})
})

await next()
}
39 changes: 39 additions & 0 deletions node/middlewares/broadcaster/splitPayload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { v4 as uuid } from 'uuid'

import {
APP_ID as thisAppId,
EVENT_CHAIN_KEY as eventKey,
ARRAY_SIZE_TARGET,
ARRAY_SPLIT_FACTOR,
} from '../../utils/constants'

export async function splitPayload(
ctx: BroadcasterEventContext,
next: () => Promise<unknown>
) {
const {
clients: { events },
body: { payload, senderAppId, clientAppId },
} = ctx

if (payload.length > ARRAY_SIZE_TARGET) {
const chunkSize = Math.ceil(payload.length / ARRAY_SPLIT_FACTOR)

for (let i = 0; i < payload.length; i += chunkSize) {
const eventId = uuid()

const chunk = payload.slice(i, i + chunkSize)

events.sendEvent(thisAppId, eventKey, {
eventId,
payload: chunk,
senderAppId,
clientAppId,
})
}

return
}

await next()
}
19 changes: 19 additions & 0 deletions node/middlewares/broadcaster/verifyUniqueness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { EventRegistryService } from '../../services/eventRegistry'

export async function verifyUniqueness(
ctx: BroadcasterEventContext,
next: () => Promise<unknown>
) {
const {
clients: { eventRegistry },
body: { eventId },
} = ctx

const registryService = new EventRegistryService(eventRegistry)

const isUnique = await registryService.isEventUnique(eventId)

if (!isUnique) return

await next()
}
9 changes: 6 additions & 3 deletions node/middlewares/notify/parseFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import {
MAXIMUM_FILE_SIZE,
MAXIMUM_FILE_SIZE_STRING,
} from '../../utils/constants'
import { asyncBusboyWrapper } from '../../utils/parsing'
import { asyncBusboyWrapper, senderAppIdFromHeaders } from '../../utils/parsing'

export async function parseFile(ctx: Context, next: () => Promise<unknown>) {
const {
req,
vtex: { logger },
} = ctx

const senderAppId = senderAppIdFromHeaders(req.headers)

const {
fields: { appId },
fields: { appId: clientAppId },
files: [file],
} = await asyncBusboyWrapper<NotifyInputParameters>(req)

Expand Down Expand Up @@ -55,7 +57,8 @@ export async function parseFile(ctx: Context, next: () => Promise<unknown>) {
const payload = utils.sheet_to_json(sheet)

ctx.state.payload = payload
ctx.state.appId = appId ?? ''
ctx.state.senderAppId = senderAppId
ctx.state.clientAppId = clientAppId ?? ''
} catch (error) {
logger.error(error.message)

Expand Down
10 changes: 7 additions & 3 deletions node/middlewares/notify/startEventChain.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import { v4 as uuid } from 'uuid'

import { APP_ID as thisAppId } from '../../utils/constants'
import {
APP_ID as thisAppId,
EVENT_CHAIN_KEY as eventKey,
} from '../../utils/constants'

export async function startEventChain(ctx: Context) {
const {
state: { payload, appId: clientAppId },
state: { payload, senderAppId, clientAppId },
clients: { events },
} = ctx

const eventId = uuid()

events.sendEvent(thisAppId, 'spreadsheet.event.broadcast', {
events.sendEvent(thisAppId, eventKey, {
eventId,
payload,
senderAppId,
clientAppId,
})

Expand Down
4 changes: 3 additions & 1 deletion node/package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"@vtex/clients": "^2.19.4",
"async-busboy": "^1.1.0",
"co-body": "^6.0.0",
"get-stream": "^6.0.1",
Expand All @@ -17,7 +18,8 @@
"@vtex/api": "6.45.6",
"@vtex/test-tools": "^1.0.0",
"@vtex/tsconfig": "^0.5.6",
"typescript": "3.9.7"
"typescript": "3.9.7",
"vtex.spreadsheet-event-broadcaster": "http://vtex.vtexassets.com/_v/public/typings/v1/[email protected]/public/_types/react"
},
"scripts": {
"lint": "tsc --noEmit --pretty"
Expand Down
8 changes: 7 additions & 1 deletion node/service.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"memory": 256,
"ttl": 10,
"timeout": 2,
"timeout": 30,
"minReplicas": 2,
"maxReplicas": 4,
"workers": 1,
Expand All @@ -18,5 +18,11 @@
}
]
}
},
"events": {
"eventBroadcaster": {
"sender": "vtex.spreadsheet-event-broadcaster",
"keys": ["event.chain"]
}
}
}
26 changes: 26 additions & 0 deletions node/services/eventRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { MasterDataEntity } from '@vtex/clients'
import type { EventRegistry } from 'vtex.spreadsheet-event-broadcaster'

import { MASTER_DATA_NO_CHANGES_RESPONSE_STATUS } from '../utils/constants'

export class EventRegistryService {
private client: MasterDataEntity<EventRegistry>
constructor(client: MasterDataEntity<EventRegistry>) {
this.client = client
}

public async isEventUnique(eventId: string) {
try {
await this.client.saveOrUpdate({
id: eventId,
hasBeenProcessed: true,
})
} catch (error) {
if (error?.response?.status === MASTER_DATA_NO_CHANGES_RESPONSE_STATUS) {
return false
}
}

return true
}
}
12 changes: 11 additions & 1 deletion node/utils/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
export const APP_ID = '[email protected]'
export const APP_ID = 'vtex.spreadsheet-event-broadcaster'

export const EVENT_CHAIN_KEY = 'event.chain'

export const OUTPUT_EVENT_KEY = 'row.output'

export const MASTER_DATA_NO_CHANGES_RESPONSE_STATUS = 304

const _1GB = 1024 ** 3

export const MAXIMUM_FILE_SIZE = _1GB

export const MAXIMUM_FILE_SIZE_STRING = '1GB'

export const ARRAY_SIZE_TARGET = 1000

export const ARRAY_SPLIT_FACTOR = 100
6 changes: 6 additions & 0 deletions node/utils/parsing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ export async function asyncBusboyWrapper<T>(req: IncomingMessage): Promise<T> {

return (result as unknown) as T
}

export function senderAppIdFromHeaders(headers: IncomingMessage['headers']) {
const appIdWithVersion = headers['x-vtex-caller'] as string

return appIdWithVersion.split('@')[0]
}
Loading

0 comments on commit f1d2aa1

Please sign in to comment.