diff --git a/.changeset/orange-moles-kick.md b/.changeset/orange-moles-kick.md new file mode 100644 index 0000000000000..6e6054418ee87 --- /dev/null +++ b/.changeset/orange-moles-kick.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-scaffolder-backend': minor +--- + +Emit scaffolder events using the optional `EventsService` diff --git a/plugins/scaffolder-backend/package.json b/plugins/scaffolder-backend/package.json index 36b8e00325d08..3b78cc827f6ed 100644 --- a/plugins/scaffolder-backend/package.json +++ b/plugins/scaffolder-backend/package.json @@ -73,6 +73,7 @@ "@backstage/plugin-bitbucket-cloud-common": "workspace:^", "@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^", "@backstage/plugin-catalog-node": "workspace:^", + "@backstage/plugin-events-node": "workspace:^", "@backstage/plugin-permission-common": "workspace:^", "@backstage/plugin-permission-node": "workspace:^", "@backstage/plugin-scaffolder-backend-module-azure": "workspace:^", diff --git a/plugins/scaffolder-backend/report.api.md b/plugins/scaffolder-backend/report.api.md index cb5da51ea4bf4..c2e6c18ad8d85 100644 --- a/plugins/scaffolder-backend/report.api.md +++ b/plugins/scaffolder-backend/report.api.md @@ -19,6 +19,7 @@ import { Config } from '@backstage/config'; import { DatabaseService } from '@backstage/backend-plugin-api'; import { DiscoveryService } from '@backstage/backend-plugin-api'; import { Duration } from 'luxon'; +import { EventsService } from '@backstage/plugin-events-node'; import { executeShellCommand as executeShellCommand_2 } from '@backstage/plugin-scaffolder-node'; import { ExecuteShellCommandOptions } from '@backstage/plugin-scaffolder-node'; import express from 'express'; @@ -520,6 +521,7 @@ export class DatabaseTaskStore implements TaskStore { // @public export type DatabaseTaskStoreOptions = { database: PluginDatabaseManager | Knex; + events?: EventsService; }; // @public @deprecated @@ -552,6 +554,8 @@ export interface RouterOptions { // (undocumented) discovery?: DiscoveryService; // (undocumented) + events?: EventsService; + // (undocumented) httpAuth?: HttpAuthService; // (undocumented) identity?: IdentityApi; diff --git a/plugins/scaffolder-backend/src/ScaffolderPlugin.ts b/plugins/scaffolder-backend/src/ScaffolderPlugin.ts index d986dbf6d07c6..1ee0634d45199 100644 --- a/plugins/scaffolder-backend/src/ScaffolderPlugin.ts +++ b/plugins/scaffolder-backend/src/ScaffolderPlugin.ts @@ -15,8 +15,8 @@ */ import { - createBackendPlugin, coreServices, + createBackendPlugin, } from '@backstage/backend-plugin-api'; import { loggerToWinstonLogger } from '@backstage/backend-common'; import { ScmIntegrations } from '@backstage/integration'; @@ -51,6 +51,7 @@ import { createWaitAction, } from './scaffolder'; import { createRouter } from './service/router'; +import { eventsServiceRef } from '@backstage/plugin-events-node'; /** * Scaffolder plugin @@ -115,6 +116,7 @@ export const scaffolderPlugin = createBackendPlugin({ httpRouter: coreServices.httpRouter, httpAuth: coreServices.httpAuth, catalogClient: catalogServiceRef, + events: eventsServiceRef, }, async init({ logger, @@ -128,6 +130,7 @@ export const scaffolderPlugin = createBackendPlugin({ httpAuth, catalogClient, permissions, + events, }) { const log = loggerToWinstonLogger(logger); const integrations = ScmIntegrations.fromConfig(config); @@ -191,6 +194,7 @@ export const scaffolderPlugin = createBackendPlugin({ permissions, autocompleteHandlers, additionalWorkspaceProviders, + events, }); httpRouter.use(router); }, diff --git a/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.test.ts b/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.test.ts index fbd314cbbcb31..eb0418b5edcdc 100644 --- a/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.test.ts +++ b/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.test.ts @@ -21,8 +21,9 @@ import { TaskSpec } from '@backstage/plugin-scaffolder-common'; import { ConflictError } from '@backstage/errors'; import { createMockDirectory } from '@backstage/backend-test-utils'; import fs from 'fs-extra'; +import { EventsService } from '@backstage/plugin-events-node'; -const createStore = async () => { +const createStore = async (events?: EventsService) => { const manager = DatabaseManager.fromConfig( new ConfigReader({ backend: { @@ -35,6 +36,7 @@ const createStore = async () => { ).forPlugin('scaffolder'); const store = await DatabaseTaskStore.create({ database: manager, + events, }); return { store, manager }; }; @@ -52,6 +54,14 @@ const workspaceDir = createMockDirectory({ }); describe('DatabaseTaskStore', () => { + const eventsService = { + publish: jest.fn(), + } as unknown as EventsService; + + beforeEach(() => { + jest.resetAllMocks(); + }); + it('should create the database store and run migration', async () => { const { store, manager } = await createStore(); expect(store).toBeDefined(); @@ -222,7 +232,7 @@ describe('DatabaseTaskStore', () => { }); it('should sent an event to start cancelling the task', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, @@ -244,10 +254,24 @@ describe('DatabaseTaskStore', () => { const event = events[0]; expect(event.taskId).toBe(taskId); expect(event.body.status).toBe('cancelled'); + + expect(eventsService.publish).toHaveBeenCalledWith({ + topic: 'scaffolder.task', + eventPayload: { + id: 1, + taskId, + status: 'cancelled', + body: { + message: `Step 2 has been cancelled.`, + stepId: 2, + status: 'cancelled', + }, + }, + }); }); it('should emit a log event', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, createdBy: 'me', @@ -310,7 +334,7 @@ describe('DatabaseTaskStore', () => { }); it('should complete the task', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, createdBy: 'me', @@ -327,10 +351,21 @@ describe('DatabaseTaskStore', () => { const taskAfterCompletion = await store.getTask(taskId); expect(taskAfterCompletion.status).toBe('cancelled'); + + expect(eventsService.publish).toHaveBeenCalledWith({ + topic: 'scaffolder.task', + eventPayload: { + id: taskId, + status: 'cancelled', + createdAt: expect.any(String), + lastHeartbeatAt: null, + createdBy: 'me', + }, + }); }); it('should claim a new task', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, createdBy: 'me', @@ -341,10 +376,22 @@ describe('DatabaseTaskStore', () => { const claimedTask = await store.getTask(taskId); expect(claimedTask.status).toBe('processing'); + + expect(eventsService.publish).toHaveBeenCalledWith({ + topic: 'scaffolder.task', + eventPayload: { + id: taskId, + status: 'processing', + createdAt: expect.any(String), + lastHeartbeatAt: null, + createdBy: 'me', + spec: {}, + }, + }); }); it('should restore the state of the task after the task recovery', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, createdBy: 'me', @@ -379,10 +426,22 @@ describe('DatabaseTaskStore', () => { const claimedTask = await store.getTask(taskId); expect(claimedTask.state).toEqual({ state: state.state }); + + expect(eventsService.publish).toHaveBeenCalledWith({ + topic: 'scaffolder.task', + eventPayload: { + id: 1, + taskId, + body: { + recoverStrategy: 'none', + }, + status: 'recovered', + }, + }); }); it('should shutdown the running task', async () => { - const { store } = await createStore(); + const { store } = await createStore(eventsService); const { taskId } = await store.createTask({ spec: {} as TaskSpec, createdBy: 'me', @@ -394,6 +453,17 @@ describe('DatabaseTaskStore', () => { const claimedTask = await store.getTask(taskId); expect(claimedTask.status).toBe('failed'); + + expect(eventsService.publish).toHaveBeenCalledWith({ + topic: 'scaffolder.task', + eventPayload: { + id: taskId, + status: 'failed', + createdAt: expect.any(String), + lastHeartbeatAt: expect.any(String), + createdBy: 'me', + }, + }); }); it('should be not possible to shutdown not running task', async () => { diff --git a/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.ts b/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.ts index 8913ec2675980..811453256683a 100644 --- a/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.ts +++ b/plugins/scaffolder-backend/src/scaffolder/tasks/DatabaseTaskStore.ts @@ -45,6 +45,7 @@ import { serializeWorkspace, } from '@backstage/plugin-scaffolder-node/alpha'; import { flattenParams } from '../../service/helpers'; +import { EventsService } from '@backstage/plugin-events-node'; const migrationsDir = resolvePackagePath( '@backstage/plugin-scaffolder-backend', @@ -78,6 +79,7 @@ export type RawDbTaskEventRow = { */ export type DatabaseTaskStoreOptions = { database: PluginDatabaseManager | Knex; + events?: EventsService; }; /** @@ -112,6 +114,7 @@ const parseSqlDateToIsoString = (input: T): T | string => { */ export class DatabaseTaskStore implements TaskStore { private readonly db: Knex; + private readonly events?: EventsService; static async create( options: DatabaseTaskStoreOptions, @@ -121,7 +124,7 @@ export class DatabaseTaskStore implements TaskStore { await this.runMigrations(database, client); - return new DatabaseTaskStore(client); + return new DatabaseTaskStore(client, options.events); } private isRecoverableTask(spec: TaskSpec): boolean { @@ -177,8 +180,19 @@ export class DatabaseTaskStore implements TaskStore { } } - private constructor(client: Knex) { + private constructor(client: Knex, events?: EventsService) { this.db = client; + this.events = events; + } + + private getState(task: RawDbTaskRow) { + try { + return task.state ? JSON.parse(task.state).state : undefined; + } catch (error) { + throw new Error( + `Failed to parse state of the task '${task.id}', ${error}`, + ); + } } async list(options: { @@ -259,7 +273,7 @@ export class DatabaseTaskStore implements TaskStore { try { const spec = JSON.parse(result.spec); const secrets = result.secrets ? JSON.parse(result.secrets) : undefined; - const state = result.state ? JSON.parse(result.state).state : undefined; + const state = this.getState(result); return { id: result.id, spec, @@ -286,6 +300,17 @@ export class DatabaseTaskStore implements TaskStore { created_by: options.createdBy ?? null, status: 'open', }); + + this.events?.publish({ + topic: 'scaffolder.task', + eventPayload: { + id: taskId, + spec: options.spec, + createdBy: options.createdBy, + status: 'open', + }, + }); + return { taskId }; } @@ -317,27 +342,23 @@ export class DatabaseTaskStore implements TaskStore { return undefined; } - const getState = () => { - try { - return task.state ? JSON.parse(task.state).state : undefined; - } catch (error) { - throw new Error( - `Failed to parse state of the task '${task.id}', ${error}`, - ); - } - }; - - const secrets = this.parseTaskSecrets(task); - return { + const ret: SerializedTask = { id: task.id, spec, status: 'processing', lastHeartbeatAt: task.last_heartbeat_at, createdAt: task.created_at, createdBy: task.created_by ?? undefined, - secrets, - state: getState(), + state: this.getState(task), }; + + this.events?.publish({ + topic: 'scaffolder.task', + eventPayload: ret, + }); + + const secrets = this.parseTaskSecrets(task); + return { ...ret, secrets }; }); } @@ -408,11 +429,25 @@ export class DatabaseTaskStore implements TaskStore { ); } - await tx('task_events').insert({ - task_id: taskId, - event_type: 'completion', - body: JSON.stringify(eventBody), + this.events?.publish({ + topic: 'scaffolder.task', + eventPayload: { + id: taskId, + status: status, + lastHeartbeatAt: task.last_heartbeat_at, + createdAt: task.created_at, + createdBy: task.created_by, + state: this.getState(task), + }, }); + + await tx('task_events') + .insert({ + task_id: taskId, + event_type: 'completion', + body: JSON.stringify(eventBody), + }) + .returning('id'); }; if (status === 'cancelled') { @@ -448,11 +483,13 @@ export class DatabaseTaskStore implements TaskStore { ): Promise { const { taskId, body } = options; const serializedBody = JSON.stringify(body); - await this.db('task_events').insert({ - task_id: taskId, - event_type: 'log', - body: serializedBody, - }); + await this.db('task_events') + .insert({ + task_id: taskId, + event_type: 'log', + body: serializedBody, + }) + .returning('id'); } async getTaskState({ taskId }: { taskId: string }): Promise< @@ -596,10 +633,22 @@ export class DatabaseTaskStore implements TaskStore { ): Promise { const { taskId, body } = options; const serializedBody = JSON.stringify(body); - await this.db('task_events').insert({ - task_id: taskId, - event_type: 'cancelled', - body: serializedBody, + const [ret] = await this.db('task_events') + .insert({ + task_id: taskId, + event_type: 'cancelled', + body: serializedBody, + }) + .returning('id'); + + this.events?.publish({ + topic: 'scaffolder.task', + eventPayload: { + id: ret.id, + taskId, + status: 'cancelled', + body, + }, }); } @@ -665,13 +714,26 @@ export class DatabaseTaskStore implements TaskStore { for (const { id, spec } of result) { const taskSpec = JSON.parse(spec as string) as TaskSpec; - await tx('task_events').insert({ - task_id: id, - event_type: 'recovered', - body: JSON.stringify({ - recoverStrategy: - taskSpec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none', - }), + const event = { + recoverStrategy: + taskSpec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none', + }; + const [ret] = await tx('task_events') + .insert({ + task_id: id, + event_type: 'recovered', + body: JSON.stringify(event), + }) + .returning('id'); + + this.events?.publish({ + topic: 'scaffolder.task', + eventPayload: { + id: ret.id, + taskId: id, + status: 'recovered', + body: event, + }, }); } }); diff --git a/plugins/scaffolder-backend/src/service/router.test.ts b/plugins/scaffolder-backend/src/service/router.test.ts index 715ada5de1ac4..55d8b91293d13 100644 --- a/plugins/scaffolder-backend/src/service/router.test.ts +++ b/plugins/scaffolder-backend/src/service/router.test.ts @@ -51,6 +51,7 @@ import { import { AutocompleteHandler } from '@backstage/plugin-scaffolder-node/alpha'; import { UrlReaders } from '@backstage/backend-defaults/urlReader'; import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils'; +import { EventsService } from '@backstage/plugin-events-node'; const mockAccess = jest.fn(); @@ -99,6 +100,9 @@ describe('createRouter', () => { const auth = mockServices.auth(); const httpAuth = mockServices.httpAuth(); const discovery = mockServices.discovery(); + const events = { + publish: jest.fn(), + } as unknown as EventsService; const credentials = mockCredentials.user(); const token = mockCredentials.service.token({ @@ -211,6 +215,7 @@ describe('createRouter', () => { auth, httpAuth, discovery, + events, }); app = express().use(router); diff --git a/plugins/scaffolder-backend/src/service/router.ts b/plugins/scaffolder-backend/src/service/router.ts index 3bde8d19ae58c..d8888058e8b5e 100644 --- a/plugins/scaffolder-backend/src/service/router.ts +++ b/plugins/scaffolder-backend/src/service/router.ts @@ -92,9 +92,9 @@ import { HttpAuthService, LifecycleService, PermissionsService, + resolveSafeChildPath, SchedulerService, UrlReaderService, - resolveSafeChildPath, } from '@backstage/backend-plugin-api'; import { IdentityApi, @@ -108,6 +108,7 @@ import { } from '@backstage/plugin-scaffolder-node/alpha'; import { pathToFileURL } from 'url'; import { v4 as uuid } from 'uuid'; +import { EventsService } from '@backstage/plugin-events-node'; /** * @@ -182,6 +183,7 @@ export interface RouterOptions { httpAuth?: HttpAuthService; identity?: IdentityApi; discovery?: DiscoveryService; + events?: EventsService; autocompleteHandlers?: Record; } @@ -294,6 +296,7 @@ export async function createRouter( discovery = HostDiscovery.fromConfig(config), identity = buildDefaultIdentityClient(options), autocompleteHandlers = {}, + events: eventsService, } = options; const { auth, httpAuth } = createLegacyAuthAdapters({ @@ -313,7 +316,10 @@ export async function createRouter( let taskBroker: TaskBroker; if (!options.taskBroker) { - const databaseTaskStore = await DatabaseTaskStore.create({ database }); + const databaseTaskStore = await DatabaseTaskStore.create({ + database, + events: eventsService, + }); taskBroker = new StorageTaskBroker( databaseTaskStore, logger, diff --git a/yarn.lock b/yarn.lock index 366b6c30acf8d..a33416ec140ee 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7328,6 +7328,7 @@ __metadata: "@backstage/plugin-bitbucket-cloud-common": "workspace:^" "@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^" "@backstage/plugin-catalog-node": "workspace:^" + "@backstage/plugin-events-node": "workspace:^" "@backstage/plugin-permission-common": "workspace:^" "@backstage/plugin-permission-node": "workspace:^" "@backstage/plugin-scaffolder-backend-module-azure": "workspace:^"