Skip to content

Commit

Permalink
feat: custom spans (#2287)
Browse files Browse the repository at this point in the history
* feat: add custom spans in trace

* refactor: printConfirmSubscription span

* refactor: tracer wrap

* chore: unwrap functions

* chore: unwrap functions

* chore: manually add reference parent span

* chore: add more spans

* fix: link spans correctly

* chore: add more spans

* chore: link spans to parent

* refactor: use tracer.wrap for parseNotficationAndEvent

* chore: fix span

* chore: move span to top level

* chore: parseNotificationAndEvent should be child of active span

* chore: add more spans

* chore: add span for parseNotificationAndEvent

---------

Co-authored-by: KishenKumarrrrr <[email protected]>
  • Loading branch information
KishenKumarrrrr and KishenKumarrrrr authored Nov 27, 2024
1 parent f7d3ecb commit 98d176f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 11 deletions.
2 changes: 1 addition & 1 deletion backend/src/email/middlewares/email-callback.middleware.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Request, Response, NextFunction } from 'express'
import { EmailCallbackService } from '@email/services'
import { loggerWithLabel } from '@core/logger'

const logger = loggerWithLabel(module)

const isAuthenticated = (
Expand Down Expand Up @@ -58,6 +57,7 @@ const printConfirmSubscription = (
subscribeUrl,
action: 'printConfirmSubscription',
})

return res.sendStatus(202)
}
}
Expand Down
14 changes: 14 additions & 0 deletions backend/src/email/services/email-callback.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Request } from 'express'
import { ses, sendgrid } from '@email/utils/callback/parsers'
import config from '@core/config'
import { loggerWithLabel } from '@core/logger'
import { tracer } from 'dd-trace'

const logger = loggerWithLabel(module)

Expand All @@ -24,20 +25,33 @@ const isAuthenticated = (authHeader?: string): boolean => {
}

const parseEvent = async (req: Request): Promise<void> => {
const parseJsonSpan = tracer.startSpan('parseJson', {
childOf: tracer.scope().active() || undefined,
})
const parsed = JSON.parse(req.body)
parseJsonSpan.finish()
let records: Promise<void>[] = []
if (ses.isEvent(req)) {
// body could be one record or an array of records, hence we concat
const body: ses.SesRecord[] = []
const sesHttpEvent = body.concat(parsed)
const parseAllRecordsSpan = tracer.startSpan('parseAllRecords', {
childOf: tracer.scope().active() || undefined,
})
records = sesHttpEvent.map(ses.parseRecord)
parseAllRecordsSpan.finish()
} else if (sendgrid.isEvent(req)) {
// body is always an array
const sgEvent = parsed
records = sgEvent.map(sendgrid.parseRecord)
} else {
throw new Error('Unable to handle this event')
}
const parseNotificationAndEventSpan = tracer.startSpan(
'parseAllNotificationAndEvents',
{ childOf: tracer.scope().active() || undefined }
)
await Promise.all(records)
parseNotificationAndEventSpan.finish()
}
export const EmailCallbackService = { isAuthenticated, parseEvent }
8 changes: 7 additions & 1 deletion backend/src/email/services/email-transactional.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from '@core/constants'
import { Order } from 'sequelize/types/model'
import { Op, WhereOptions } from 'sequelize'
import tracer from 'dd-trace'

const logger = loggerWithLabel(module)

Expand Down Expand Up @@ -128,8 +129,12 @@ type CallbackMetaData = {
async function handleStatusCallbacks(
type: SesEventType,
id: string,
metadata: CallbackMetaData
metadata: CallbackMetaData,
parentSpan?: tracer.Span
): Promise<void> {
const handleStatusCallbacksSpan = tracer.startSpan('handleStatusCallbacks', {
childOf: parentSpan,
})
const emailMessageTransactional = await EmailMessageTransactional.findByPk(id)
if (!emailMessageTransactional) {
throw new Error(`Failed to find emailMessageTransactional for id: ${id}`)
Expand Down Expand Up @@ -227,6 +232,7 @@ async function handleStatusCallbacks(
metadata,
})
}
handleStatusCallbacksSpan.finish()
}

async function listMessages({
Expand Down
45 changes: 36 additions & 9 deletions backend/src/email/utils/callback/parsers/ses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import config from '@core/config'
import { compareSha256Hash } from '@shared/utils/crypto'
import { EmailTransactionalService } from '@email/services/email-transactional.service'
import { SesEventType, Metadata } from '@email/interfaces/callback.interface'
import tracer from 'dd-trace'

const logger = loggerWithLabel(module)
const REFERENCE_ID_HEADER_V2 = 'X-SMTPAPI' // Case sensitive
Expand Down Expand Up @@ -135,8 +136,13 @@ const shouldBlacklist = ({
const parseNotificationAndEvent = async (
type: SesEventType,
message: any,
metadata: Metadata
metadata: Metadata,
parentSpan?: tracer.Span
): Promise<void> => {
const parseNotificationAndEventSpan = tracer.startSpan(
'parseNotificationAndEvent',
{ childOf: parentSpan }
)
if (!isNotificationAndEventForMainRecipient(message, type)) {
logger.info({
message: 'SES notification or event is not for the main recipient',
Expand Down Expand Up @@ -176,6 +182,7 @@ const parseNotificationAndEvent = async (
})
return
}
parseNotificationAndEventSpan.finish()
}

// Validate SES record hash, returns message ID if valid, otherwise throw errors
Expand Down Expand Up @@ -223,16 +230,30 @@ const blacklistIfNeeded = async (message: any): Promise<void> => {
}
}
const parseRecord = async (record: SesRecord): Promise<void> => {
const parseRecordSpan = tracer.startSpan('parseRecord', {
childOf: tracer.scope().active() || undefined,
})
logger.info({
message: 'Parsing SES callback record',
})
const parseRecordJson = tracer.startSpan('parseRecordJson', {
childOf: parseRecordSpan,
})
const message = JSON.parse(record.Message)
parseRecordJson.finish()
const smtpApiHeader = getSmtpApiHeader(message)
const validateRecordSpan = tracer.startSpan('validateRecord', {
childOf: parseRecordSpan,
})
await validateRecord(record, smtpApiHeader)

validateRecordSpan.finish()
// Transactional emails don't have message IDs, so blacklist
// relevant email addresses before everything else
const blacklistIfNeededSpan = tracer.startSpan('blacklistIfNeeded', {
childOf: parseRecordSpan,
})
await blacklistIfNeeded(message)
blacklistIfNeededSpan.finish()

// primary key
const messageId = smtpApiHeader?.unique_args?.message_id
Expand All @@ -248,15 +269,21 @@ const parseRecord = async (record: SesRecord): Promise<void> => {
type,
})
if (isTransactional) {
return EmailTransactionalService.handleStatusCallbacks(type, messageId, {
timestamp: new Date(record.Timestamp),
bounce: message.bounce,
complaint: message.complaint,
delivery: message.delivery,
})
return EmailTransactionalService.handleStatusCallbacks(
type,
messageId,
{
timestamp: new Date(record.Timestamp),
bounce: message.bounce,
complaint: message.complaint,
delivery: message.delivery,
},
parseRecordSpan
)
}
return parseNotificationAndEvent(type, message, metadata)
return parseNotificationAndEvent(type, message, metadata, parseRecordSpan)
}
parseRecordSpan.finish()
}

// Checks whether the notification/event is meant for the main recipient of the email.
Expand Down

0 comments on commit 98d176f

Please sign in to comment.