Hotfix/2.17 (#1955)
* gergo/apolloQueryDuration (#1949)

* add apollo query duration

* feat: add more details to apollo query logging

* fix: pr review

* feat: format log messages as clef (#1950)

* fix(logging): pinoClef log levels must be a string

* chore(fe2): reducing log level for some spammy req logs

* minor adjustment

* more robust path resolution

* better req log text

* feat(fe2): improved and more thorough logging to help with observability (#1948)

* better req log text

* minor improvements to server logging

* WIP FE2 req logging

* FE2 apollo operation logging

* undid apolloPlugin changes due to Gergo's PR

* seq message templates introduced

* fix: request logs (#1964)

* fix: request logs

* chore: remove comments

* feat: add graphql subscription metrics (#1970)

* optimized preview msg resultListener

* fix(server): locking to avoid postgres notification listeners processing the same message multiple times (#1972)

* fix(server): locking to avoid postgres notification listeners processing the same message multiple times

* optimized locking

* minor cleanup

* msg update

* log level adjustments

* reduce failsafe expiry

---------

Co-authored-by: Iain Sproat <[email protected]>
Co-authored-by: Kristaps Fabians Geikins <[email protected]>
Co-authored-by: Kristaps Fabians Geikins <[email protected]>
4 people authored Jan 17, 2024
1 parent ceeb30e commit c2085d6
Showing 6 changed files with 126 additions and 25 deletions.
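Several of the commits above (the CLEF formatting in #1950, the Seq message templates, and the "pinoClef log levels must be a string" fix) concern how pino records are shipped to Seq. The snippet below is an illustrative sketch only, not the repository's pinoClef transport; it shows the general shape of a CLEF (Compact Log Event Format) line and why pino's numeric levels have to be mapped to strings.

    // Illustrative sketch, not the repository's pinoClef implementation: maps a pino
    // record to a CLEF (Compact Log Event Format) line that Seq can ingest.
    type PinoRecord = { level: number; time: number; msg: string; [key: string]: unknown }

    // pino emits numeric levels; CLEF/Seq expects string level names
    const levelMap: Record<number, string> = {
      10: 'Verbose',
      20: 'Debug',
      30: 'Information',
      40: 'Warning',
      50: 'Error',
      60: 'Fatal'
    }

    function toClef(rec: PinoRecord): string {
      const { level, time, msg, ...props } = rec
      return JSON.stringify({
        '@t': new Date(time).toISOString(), // timestamp
        '@mt': msg, // message template, e.g. 'Message #{messageId} of channel {channel} starting processing...'
        '@l': levelMap[level] ?? 'Information', // must be a string, not pino's numeric level
        ...props // template properties such as messageId or channel
      })
    }

With message templates, the literal template text stays constant across events, so Seq can group them while still indexing properties like messageId and channel; that is why the log calls in dbNotificationListener.ts below use the '{property}' hole syntax instead of interpolating values into the string.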
30 changes: 30 additions & 0 deletions packages/server/app.ts
Expand Up @@ -122,6 +122,24 @@ function buildApolloSubscriptionServer(
help: 'Number of currently connected clients'
})

prometheusClient.register.removeSingleMetric(
'speckle_server_apollo_graphql_total_subscription_operations'
)
const metricSubscriptionTotalOperations = new prometheusClient.Counter({
name: 'speckle_server_apollo_graphql_total_subscription_operations',
help: 'Number of total subscription operations served by this instance',
labelNames: ['subscriptionType'] as const
})

prometheusClient.register.removeSingleMetric(
'speckle_server_apollo_graphql_total_subscription_responses'
)
const metricSubscriptionTotalResponses = new prometheusClient.Counter({
name: 'speckle_server_apollo_graphql_total_subscription_responses',
help: 'Number of total subscription responses served by this instance',
labelNames: ['subscriptionType', 'status'] as const
})

return SubscriptionServer.create(
{
schema,
@@ -178,17 +196,29 @@ function buildApolloSubscriptionServer(
// kinda hacky, but we're using this as a "subscription event emitted"
// callback to clear subscription connection dataloaders to avoid stale cache
const baseParams = params[1]
metricSubscriptionTotalOperations.inc({
subscriptionType: baseParams.operationName
})
const ctx = baseParams.context as GraphQLContext

// eslint-disable-next-line @typescript-eslint/no-explicit-any
baseParams.formatResponse = (val: SubscriptionResponse) => {
ctx.loaders.clearAll()
logSubscriptionOperation({ ctx, execParams: baseParams, response: val })
metricSubscriptionTotalResponses.inc({
subscriptionType: baseParams.operationName,
status: 'success'
})
return val
}
baseParams.formatError = (e: Error) => {
ctx.loaders.clearAll()
logSubscriptionOperation({ ctx, execParams: baseParams, error: e })

metricSubscriptionTotalResponses.inc({
subscriptionType: baseParams.operationName,
status: 'error'
})
return e
}

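For context, here is a minimal standalone sketch of the prom-client counter pattern used in app.ts above. The metric names and labels are copied from the diff; the operation name passed to inc() is hypothetical. Removing any previously registered metric first avoids prom-client's duplicate-registration error when the subscription server is rebuilt.

    import prometheusClient from 'prom-client'

    // Drop a stale registration with the same name before re-creating the counter;
    // prom-client throws if a metric name is registered twice on the same registry.
    prometheusClient.register.removeSingleMetric(
      'speckle_server_apollo_graphql_total_subscription_responses'
    )
    const metricSubscriptionTotalResponses = new prometheusClient.Counter({
      name: 'speckle_server_apollo_graphql_total_subscription_responses',
      help: 'Number of total subscription responses served by this instance',
      labelNames: ['subscriptionType', 'status'] as const
    })

    // One increment per emitted subscription result, labelled by operation and outcome
    metricSubscriptionTotalResponses.inc({
      subscriptionType: 'commitUpdated', // hypothetical operation name
      status: 'success'
    })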
20 changes: 18 additions & 2 deletions packages/server/modules/core/repositories/commits.ts
@@ -373,15 +373,31 @@ export async function createCommit(
return item
}

export async function getObjectCommitsWithStreamIds(objectIds: string[]) {
export async function getObjectCommitsWithStreamIds(
objectIds: string[],
options?: {
/**
* Optionally also filter by stream ids
*/
streamIds?: string[]
}
) {
if (!objectIds?.length) return []
return await Commits.knex()
const { streamIds } = options || {}

const q = Commits.knex()
.select<Array<CommitRecord & { streamId: string }>>([
...Commits.cols,
StreamCommits.col.streamId
])
.whereIn(Commits.col.referencedObject, objectIds)
.innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id)

if (streamIds?.length) {
q.whereIn(StreamCommits.col.streamId, streamIds)
}

return await q
}

export async function getAllBranchCommits(params: {
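A hedged usage sketch of the updated helper follows (the call sites and ids are hypothetical; the import alias follows the '@/…' convention visible elsewhere in this diff). The new optional streamIds filter lets callers scope the lookup instead of fanning out to every stream that happens to reference the same object.

    import { getObjectCommitsWithStreamIds } from '@/modules/core/repositories/commits'

    async function example() {
      // Old behaviour (still supported): all commits referencing these objects, in any stream
      const allCommits = await getObjectCommitsWithStreamIds(['someObjectId'])

      // New optional filter: only commits that belong to the given streams
      const scopedCommits = await getObjectCommitsWithStreamIds(['someObjectId'], {
        streamIds: ['someStreamId']
      })

      return { allCommits, scopedCommits }
    }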
85 changes: 67 additions & 18 deletions packages/server/modules/core/utils/dbNotificationListener.ts
@@ -1,33 +1,74 @@
import { MaybeAsync, Optional } from '@speckle/shared'
import { dbNotificationLogger, moduleLogger } from '@/logging/logging'
import { MaybeAsync, Optional, md5 } from '@speckle/shared'
import { dbNotificationLogger } from '@/logging/logging'
import { knex } from '@/modules/core/dbSchema'
import * as Knex from 'knex'
import * as pg from 'pg'

/**
* TODO: This currently will emit duplicate events when there are multiple server instances running. Not a big deal currently while there aren't that many events,
* but we need to figure this out
*/
import { createRedisClient } from '@/modules/shared/redis/redis'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import Redis from 'ioredis'
import { LogicError } from '@/modules/shared/errors'

export type MessageType = { channel: string; payload: string }
export type ListenerType = (msg: MessageType) => MaybeAsync<void>

let shuttingDown = false
let connection: Optional<pg.Connection> = undefined
let redisClient: Optional<Redis> = undefined

const listeners: Record<string, { setup: boolean; listener: ListenerType }> = {}
const lockName = 'server_postgres_listener_lock'

function getMessageId(msg: MessageType) {
const str = JSON.stringify(msg)
return md5(str)
}

async function getTaskLock(taskId: string) {
if (!redisClient) {
throw new LogicError(
'Unexpected failure! Attempting to get task lock before redis client is initialized'
)
}

const lockKey = `${lockName}:${taskId}`
const lock = await redisClient.set(lockKey, '1', 'EX', 60, 'NX')
const releaseLock = async () => {
if (!redisClient) {
throw new LogicError(
'Unexpected failure! Attempting to release task lock before redis client is initialized'
)
}
await redisClient.del(lockKey)
}
return lock ? releaseLock : null
}

function messageProcessor(msg: MessageType) {
async function messageProcessor(msg: MessageType) {
const listener = listeners[msg.channel]
dbNotificationLogger.info(
{
...msg,
listenerRegistered: !!listener
},
'Message received'
)
const messageId = getMessageId(msg)

const logPayload = {
...msg,
listenerRegistered: !!listener,
messageId
}
if (!listener) return

return listener.listener(msg)
// Only process if lock acquired
const unlock = await getTaskLock(messageId)
if (unlock) {
dbNotificationLogger.info(
logPayload,
'Message #{messageId} of channel {channel} starting processing...'
)
await Promise.resolve(listener.listener(msg))
await unlock()
} else {
dbNotificationLogger.debug(
logPayload,
'Message #{messageId} of channel {channel} skipped due to missing lock...'
)
}
}

function setupListeners(connection: pg.Connection) {
@@ -58,7 +99,9 @@ function reconnectClient()
const newConnection = await (
knex.client as Knex.Knex.Client
).acquireRawConnection()

connection = newConnection
redisClient = createRedisClient(getRedisUrl(), {})

clearInterval(interval)
setupConnection(newConnection)
@@ -72,20 +115,26 @@ function reconnectClient()
}

export function setupResultListener() {
moduleLogger.info('🔔 Initializing postgres notification listening...')
dbNotificationLogger.info('🔔 Initializing postgres notification listening...')
reconnectClient()
}

export function shutdownResultListener() {
moduleLogger.info('...Shutting down postgres notification listening')
dbNotificationLogger.info('...Shutting down postgres notification listening')
shuttingDown = true

if (connection) {
connection.end()
connection = undefined
}
}

export function listenFor(eventName: string, cb: ListenerType) {
dbNotificationLogger.info(
{ eventName },
'Registering postgres event listener for {eventName}'
)

listeners[eventName] = {
setup: false,
listener: cb
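The core of the duplicate-processing fix is a best-effort Redis lock keyed by an md5 of the notification payload. Below is a generic sketch of that locking pattern, assuming an ioredis client; it is not the exact getTaskLock implementation, and the 60-second failsafe expiry simply mirrors the value in the diff above.

    import Redis from 'ioredis'

    // 'NX' makes the SET succeed only when the key doesn't exist yet, so only one server
    // instance wins the lock; 'EX' adds a failsafe expiry so a crashed instance
    // can't hold the lock forever.
    async function processOnce(redis: Redis, messageId: string, work: () => Promise<void>) {
      const lockKey = `server_postgres_listener_lock:${messageId}`
      const acquired = await redis.set(lockKey, '1', 'EX', 60, 'NX') // 'OK' or null
      if (!acquired) return false // another instance already handled this message
      try {
        await work()
      } finally {
        await redis.del(lockKey)
      }
      return true
    }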
6 changes: 4 additions & 2 deletions packages/server/modules/fileuploads/index.js
@@ -26,7 +26,7 @@ const saveFileUploads = async ({ userId, streamId, branchName, uploadResults })
)
}

exports.init = async (app) => {
exports.init = async (app, isInitial) => {
if (process.env.DISABLE_FILE_UPLOADS) {
moduleLogger.warn('📄 FileUploads module is DISABLED')
return
@@ -77,7 +77,9 @@ exports.init = async (app) => {
}
)

listenForImportUpdates()
if (isInitial) {
listenForImportUpdates()
}
}

exports.finalize = () => {}
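Both the fileuploads module above and the previews module below get the same guard: their postgres notification listeners are only registered on the initial init. A sketch of that pattern, written in TypeScript here for consistency (the actual modules are CommonJS .js files, and listenForImportUpdates stands in for the real registration call):

    import type { Express } from 'express'

    // Hypothetical stand-in for the module's real listener registration
    declare function listenForImportUpdates(): void

    export const init = async (_app: Express, isInitial: boolean) => {
      // ...per-init setup that is safe to repeat (routes, endpoints)...

      if (isInitial) {
        // only the first boot of the process registers the postgres notification listener,
        // so re-running init doesn't stack duplicate listeners for the same channel
        listenForImportUpdates()
      }
    }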
6 changes: 4 additions & 2 deletions packages/server/modules/previews/index.js
@@ -31,7 +31,7 @@ const cors = require('cors')
const noPreviewImage = require.resolve('#/assets/previews/images/no_preview.png')
const previewErrorImage = require.resolve('#/assets/previews/images/preview_error.png')

exports.init = (app) => {
exports.init = (app, isInitial) => {
if (process.env.DISABLE_PREVIEWS) {
moduleLogger.warn('📸 Object preview module is DISABLED')
} else {
@@ -266,7 +266,9 @@ exports.init = (app) => {
)
})

listenForPreviewGenerationUpdates()
if (isInitial) {
listenForPreviewGenerationUpdates()
}
}

exports.finalize = () => {}
4 changes: 3 additions & 1 deletion packages/server/modules/previews/services/resultListener.ts
@@ -16,7 +16,9 @@ async function messageProcessor(msg: MessageType) {
if (status !== 'finished' || !objectId || !streamId) return

// Get all commits with that objectId
const commits = await getObjectCommitsWithStreamIds([objectId])
const commits = await getObjectCommitsWithStreamIds([objectId], {
streamIds: [streamId]
})
if (!commits.length) return

await Promise.all(
