From 161dc948f9adbc088c44c57235a6d4f52f80471f Mon Sep 17 00:00:00 2001 From: Florent BEAUCHAMP Date: Wed, 8 Jan 2025 13:40:48 +0000 Subject: [PATCH] feat(disk-transform): first step --- @xen-orchestra/disk-transform/package.json | 19 +++ .../disk-transform/src/ForkableIterator.mts | 145 ++++++++++++++++++ .../src/PortableDifferencingDisk.mts | 43 ++++++ @xen-orchestra/disk-transform/src/demo.mts | 20 +++ .../src/file-accessor/FileAccessor.mts | 14 ++ .../file-accessor/InBrowserFileAccessor.mts | 16 ++ .../src/file-accessor/RemoteFileAccessor.mts | 14 ++ .../disk-transform/src/from/VhdRemote.mts | 127 +++++++++++++++ .../disk-transform/src/from/XapiExportNbd.mts | 0 .../disk-transform/src/from/XapiVhdExport.mts | 0 .../disk-transform/src/from/utils.mts | 18 +++ .../disk-transform/src/to/VhdFileRemote.mts | 32 ++++ @xen-orchestra/disk-transform/tsconfig.json | 16 ++ 13 files changed, 464 insertions(+) create mode 100644 @xen-orchestra/disk-transform/package.json create mode 100644 @xen-orchestra/disk-transform/src/ForkableIterator.mts create mode 100644 @xen-orchestra/disk-transform/src/PortableDifferencingDisk.mts create mode 100644 @xen-orchestra/disk-transform/src/demo.mts create mode 100644 @xen-orchestra/disk-transform/src/file-accessor/FileAccessor.mts create mode 100644 @xen-orchestra/disk-transform/src/file-accessor/InBrowserFileAccessor.mts create mode 100644 @xen-orchestra/disk-transform/src/file-accessor/RemoteFileAccessor.mts create mode 100644 @xen-orchestra/disk-transform/src/from/VhdRemote.mts create mode 100644 @xen-orchestra/disk-transform/src/from/XapiExportNbd.mts create mode 100644 @xen-orchestra/disk-transform/src/from/XapiVhdExport.mts create mode 100644 @xen-orchestra/disk-transform/src/from/utils.mts create mode 100644 @xen-orchestra/disk-transform/src/to/VhdFileRemote.mts create mode 100644 @xen-orchestra/disk-transform/tsconfig.json diff --git a/@xen-orchestra/disk-transform/package.json b/@xen-orchestra/disk-transform/package.json new file 
mode 100644
index 00000000000..5807767d560
--- /dev/null
+++ b/@xen-orchestra/disk-transform/package.json
@@ -0,0 +1,19 @@
+{
+  "name": "@xen-orchestra/disk-transform",
+  "version": "0.0.0",
+  "main": "index.js",
+  "license": "MIT",
+  "private": true,
+  "type": "module",
+  "devDependencies": {
+    "@tsconfig/node-lts": "^20.1.3",
+    "@tsconfig/recommended": "^1.0.7",
+    "@types/node": "^22.3.0",
+    "typescript": "^5.5.4"
+  },
+  "scripts": {
+    "build": "tsc",
+    "dev": "tsc --watch",
+    "start": "node ."
+  }
+}
diff --git a/@xen-orchestra/disk-transform/src/ForkableIterator.mts b/@xen-orchestra/disk-transform/src/ForkableIterator.mts
new file mode 100644
index 00000000000..026ffc75538
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/ForkableIterator.mts
@@ -0,0 +1,179 @@
+import assert from 'node:assert'
+
+/**
+ * Shares one async iterator between several consumers ("forks").
+ *
+ * The forks advance in lock-step: the source is asked for a value only once,
+ * and a call to next() settles only after every registered fork has asked for
+ * that same value, so the slowest fork paces all the others.
+ */
+export class ForkableIterator<T> {
+  #forks = new Set<ForkedIterator<T>>()
+  #source: AsyncIterator<T>
+
+  // set by the first call to next(): forking is refused afterwards
+  #started = false
+
+  // forks that asked for the current value and are waiting for the others
+  #forksWaiting = new Set<ForkedIterator<T>>()
+  // barrier shared by the forks waiting for the current value
+  #promiseWaitingForForks?: {
+    promise: Promise<void>
+    resolve(): void
+    reject(err: Error): void
+  }
+  // pending result of the single source.next() call made for the current value
+  #promiseWaitingForNext?: Promise<IteratorResult<T>>
+
+  /**
+   * @param source iterator to share, it must also have a Symbol.asyncIterator method
+   * @throws Error when `source` has no Symbol.asyncIterator method
+   */
+  constructor(source: AsyncIterator<T>) {
+    if (typeof source[Symbol.asyncIterator] !== 'function') {
+      throw new Error('Source must be an async iterator')
+    }
+    this.#source = source
+  }
+
+  /**
+   * Registers and returns a new consumer.
+   *
+   * @throws AssertionError once next() has been called
+   */
+  fork(): ForkedIterator<T> {
+    assert.notEqual(this.#started, true, 'Can t be forked once started')
+    const fork = new ForkedIterator<T>(this)
+    this.#forks.add(fork)
+    return fork
+  }
+
+  // settles once every registered fork is waiting for the current value
+  async #waitForAllForks(): Promise<void> {
+    // first fork waiting for the data
+    if (this.#promiseWaitingForForks === undefined) {
+      let resolve = () => {}
+      let reject: (err: Error) => void = () => {}
+      const promise = new Promise<void>(function (_resolve, _reject) {
+        resolve = _resolve
+        reject = _reject
+      })
+
+      this.#promiseWaitingForForks = { promise, resolve, reject }
+    }
+    // all the forks are waiting
+    const { promise, resolve } = this.#promiseWaitingForForks
+    if (this.#forksWaiting.size === this.#forks.size) {
+      // reset data
+      this.#promiseWaitingForForks = undefined
+      this.#forksWaiting = new Set()
+      resolve() // mark the wait of the other forks as over
+    }
+    return promise
+  }
+
+  /**
+   * Gives the next result of the source to `fork`, once all the forks asked for it.
+   *
+   * @throws AssertionError when `fork` is already waiting or is not registered here
+   * @throws the error of the source when its next() call fails
+   */
+  async next(fork: ForkedIterator<T>): Promise<IteratorResult<T>> {
+    // ensure a fork can't wait twice
+    assert.strictEqual(this.#forksWaiting.has(fork), false, 'fork is already waiting')
+    assert.strictEqual(this.#forks.has(fork), true, 'fork is not from this source')
+    this.#forksWaiting.add(fork)
+    this.#started = true
+
+    // ask for value only once for the first fork asking
+    if (this.#promiseWaitingForNext === undefined) {
+      this.#promiseWaitingForNext = this.#source.next()
+    }
+    // keep a copy of the promise locally since it may be removed from other forks
+    const nextValue = this.#promiseWaitingForNext
+    // A failure of the source wakes the forks still waiting on the barrier.
+    // The handler is only attached on the side: the promise returned below is
+    // the original one, so a failure happening after the barrier opened still
+    // rejects in the caller instead of resolving to `undefined`.
+    nextValue.catch(err => {
+      this.#promiseWaitingForForks?.reject(err)
+    })
+    await this.#waitForAllForks()
+    // ready to ask for a new value
+    this.#promiseWaitingForNext = undefined
+    return nextValue
+  }
+
+  /** Unregisters `fork` and releases the other forks if they were only waiting for it. */
+  remove(fork: ForkedIterator<T>) {
+    assert.ok(this.#forks.has(fork))
+    this.#forks.delete(fork)
+    // this fork may be waiting, blocking the others
+    this.#forksWaiting.delete(fork)
+
+    // removing a fork can free the data to flow again
+    if (this.#forksWaiting.size === this.#forks.size && this.#promiseWaitingForForks !== undefined) {
+      const { resolve } = this.#promiseWaitingForForks
+      this.#promiseWaitingForForks = undefined
+      this.#forksWaiting = new Set()
+      resolve() // mark the wait of the other forks as over
+    }
+  }
+}
+
+/** One consumer of a ForkableIterator, to be iterated with `for await`. */
+class ForkedIterator<T> {
+  #parent: ForkableIterator<T>
+  constructor(parent: ForkableIterator<T>) {
+    this.#parent = parent
+  }
+
+  async *[Symbol.asyncIterator]() {
+    while (true) {
+      const res = await this.#parent.next(this)
+      // the final `{ done: true }` result carries no value to hand over
+      if (res.done) {
+        return
+      }
+      yield res.value
+    }
+  }
+
+  /** Stops consuming, the other forks no longer wait for this one. */
+  destroy() {
+    this.#parent.remove(this)
+  }
+}
+
+// demo: two consumers sharing one slow source
+async function* makeRangeIterator(start = 0, end = Infinity, step = 1) {
+  for (let i = start; i < end; i += step) {
+    await new Promise(resolve => setTimeout(resolve, 100))
+    if (Math.random() > 0.5) {
+      // throw new Error('simulated source failure')
+    }
+    yield i
+  }
+}
+
+const source = makeRangeIterator(0, 5)
+const forkable = new ForkableIterator(source)
+const fork1 = forkable.fork()
+const fork2 = forkable.fork()
+
+async function consume(iterable, label) {
+  console.log('consume')
+  try {
+    for await (const val of iterable) {
+      console.log({ val, label })
+      await new Promise(resolve => setTimeout(resolve, Math.random() * 2500))
+    }
+  } catch (err) {
+    console.error(label, err)
+  }
+  console.log('consumed', label)
+}
+
+await Promise.all([consume(fork1, 'A'), consume(fork2, 'B')])
+
+console.log('done')
diff --git a/@xen-orchestra/disk-transform/src/PortableDifferencingDisk.mts b/@xen-orchestra/disk-transform/src/PortableDifferencingDisk.mts
new file mode 100644
index 00000000000..0da3ca55750
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/PortableDifferencingDisk.mts
@@ -0,0 +1,48 @@
+export type DiskBlockData = Buffer
+export type DiskBlock = {
+  index: number
+  data: DiskBlockData
+}
+
+export type BytesLength = number
+export type Uuid = string
+
+/**
+ * Source of the blocks of one disk: random access through readBlock() and
+ * sequential access through async iteration.
+ */
+export abstract class DiskBlockGenerator {
+  blockSize: number
+  expectedNbBlocks?: number
+  consumedBlocks: number = 0
+
+  /** true when the number of blocks is known before iterating */
+  get isSizeComputable(): boolean {
+    return this.expectedNbBlocks !== undefined
+  }
+
+  abstract hasBlock(index: number): boolean
+  abstract readBlock(index: number): Promise<DiskBlockData>
+  abstract [Symbol.asyncIterator](): AsyncIterator<DiskBlock>
+}
+
+/** A value paired with the function releasing the resources behind it. */
+export type Disposable<T> = {
+  value: T
+  dispose: () => Promise<void>
+}
+
+export abstract class PortableDiskMetadata {
+  id: Uuid
+  label: string
+  description: string
+  virtualSize: number
+  parentUuid?: Uuid
+  parentPath?: string
+}
+
+/** A disk described by its metadata and by a disposable iterator over its blocks. */
+export abstract class PortableDifferencingDisk {
+  abstract getMetadata(): Promise<PortableDiskMetadata>
+  abstract getBlockIterator(): Promise<Disposable<DiskBlockGenerator>>
+}
diff --git a/@xen-orchestra/disk-transform/src/demo.mts b/@xen-orchestra/disk-transform/src/demo.mts
new file mode 100644
index 00000000000..6968b180897
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/demo.mts
@@ -0,0 +1,34 @@
+import { getSyncedHandler } from '@xen-orchestra/fs'
+import { VhdRemote, RemoteMetadata } from './from/VhdRemote.mjs'
+import { FileAccessor } from './file-accessor/FileAccessor.mjs'
+
+// Demo: print every block of one VHD of a backup stored on a local remote.
+async function run() {
+  // NOTE(review): assumes the object returned by getSyncedHandler has a dispose() method next to `value` — confirm
+  const handlerDisposable = await getSyncedHandler({ url: 'file:///mnt/ssd/vhdblock' })
+  try {
+    const metadataPath = './xo-vm-backups/cbb46b48-12aa-59dc-4039-8a587fdc67d5/20230831T100000Z.json'
+
+    const vhd = new VhdRemote({
+      handler: handlerDisposable.value as FileAccessor,
+      metadataPath,
+      diskUuid: '1282b678-cb12-4b13-ab17-7a4fdac403d8',
+    })
+    const blocks = await vhd.getBlockIterator()
+    try {
+      for await (const block of blocks.value) {
+        console.log(block)
+      }
+    } finally {
+      // release the opened VHD even when the iteration fails
+      await blocks.dispose()
+    }
+  } finally {
+    await handlerDisposable.dispose()
+  }
+}
+
+run().catch(error => {
+  console.error(error)
+  process.exitCode = 1
+})
diff --git a/@xen-orchestra/disk-transform/src/file-accessor/FileAccessor.mts b/@xen-orchestra/disk-transform/src/file-accessor/FileAccessor.mts
new file mode 100644
index 00000000000..6531a6752b9
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/file-accessor/FileAccessor.mts
@@ -0,0 +1,20 @@
+export type FileDescriptor = number
+
+/**
+ * File operations needed to read and write disks, whatever the place the files live in.
+ *
+ * NOTE(review): the type arguments of the promises below are reconstructed; only
+ * readFile() is seen used as a Buffer (by VhdRemote) — confirm the others.
+ */
+export abstract class FileAccessor {
+  abstract getSize: () => Promise<number>
+  abstract getSizeOnDisk: () => Promise<number>
+  abstract openFile: (path: string, opts: object) => Promise<FileDescriptor>
+  abstract closeFile: (path: string, opts: object) => Promise<void>
+  abstract read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
+  abstract readFile: (path: string) => Promise<Buffer>
+  abstract write: (path: string, data: Buffer, offset: number) => Promise<void>
+  abstract writeFile: (path: string, data: Buffer | string) => Promise<void>
+  abstract rename: (from: string, to: string) => Promise<void>
+  abstract unlink: (path: string) => Promise<void>
+}
diff --git a/@xen-orchestra/disk-transform/src/file-accessor/InBrowserFileAccessor.mts
b/@xen-orchestra/disk-transform/src/file-accessor/InBrowserFileAccessor.mts
new file mode 100644
index 00000000000..4e7008054ed
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/file-accessor/InBrowserFileAccessor.mts
@@ -0,0 +1,17 @@
+import { FileAccessor, FileDescriptor } from './FileAccessor.mjs'
+
+// handle file access inside a form
+// to transform disks into an acceptable format from the browser
+// NOTE(review): members are only declared, nothing assigns them yet; promise type arguments are reconstructed — confirm
+export class InBrowserFileAccessor extends FileAccessor {
+  getSize: () => Promise<number>
+  getSizeOnDisk: () => Promise<number>
+  openFile: (path: string, opts: object) => Promise<FileDescriptor>
+  closeFile: (path: string, opts: object) => Promise<void>
+  read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
+  readFile: (path: string) => Promise<Buffer>
+  write: (path: string, data: Buffer, offset: number) => Promise<void>
+  writeFile: (path: string, data: Buffer | string) => Promise<void>
+  rename: (from: string, to: string) => Promise<void>
+  unlink: (path: string) => Promise<void>
+}
diff --git a/@xen-orchestra/disk-transform/src/file-accessor/RemoteFileAccessor.mts b/@xen-orchestra/disk-transform/src/file-accessor/RemoteFileAccessor.mts
new file mode 100644
index 00000000000..7ea6cfb6428
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/file-accessor/RemoteFileAccessor.mts
@@ -0,0 +1,15 @@
+import { FileAccessor, FileDescriptor } from './FileAccessor.mjs'
+
+// NOTE(review): members are only declared, nothing assigns them yet; promise type arguments are reconstructed — confirm
+export class RemoteFileAccessor extends FileAccessor {
+  getSize: () => Promise<number>
+  getSizeOnDisk: () => Promise<number>
+  openFile: (path: string, opts: object) => Promise<FileDescriptor>
+  closeFile: (path: string, opts: object) => Promise<void>
+  read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
+  readFile: (path: string) => Promise<Buffer>
+  write: (path: string, data: Buffer, offset: number) => Promise<void>
+  writeFile: (path: string, data: Buffer | string) => Promise<void>
+  rename: (from: string, to: string) => Promise<void>
+  unlink: (path: string) => Promise<void>
+}
diff --git a/@xen-orchestra/disk-transform/src/from/VhdRemote.mts b/@xen-orchestra/disk-transform/src/from/VhdRemote.mts
new file
mode 100644
index 00000000000..a52ae4f3720
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/from/VhdRemote.mts
@@ -0,0 +1,154 @@
+import { openVhd } from 'vhd-lib'
+import {
+  DiskBlock,
+  DiskBlockData,
+  DiskBlockGenerator,
+  Disposable,
+  PortableDifferencingDisk,
+  PortableDiskMetadata,
+  Uuid,
+} from '../PortableDifferencingDisk.mjs'
+import { FileAccessor } from '../file-accessor/FileAccessor.mjs'
+import { dirname, join, relative } from 'path'
+
+type VhdBlock = {
+  id: number
+  bitmap: Buffer
+  data: Buffer
+  buffer: Buffer
+}
+
+// subset of the VHD API of vhd-lib used by this package
+export type Vhd = {
+  header: { maxTableEntries: number }
+  footer: { blockSize: number }
+  open(handler: any /* remoteadapter */, path: string): Promise<Disposable<Vhd>>
+  containsBlock(id: number): boolean
+  readHeaderAndFooter(): Promise<void>
+  readBlockAllocationTable(): Promise<void>
+  readBlock(index: number): Promise<VhdBlock>
+  blocks(): AsyncIterator<VhdBlock>
+  writeHeaderAndFooter(): Promise<void>
+  writeBlockAllocationTable(): Promise<void>
+  writeEntireBlock(block: VhdBlock): Promise<void>
+}
+
+/** Exposes the blocks of an opened VHD as a DiskBlockGenerator. */
+class VhdFileGenerator extends DiskBlockGenerator {
+  #vhd: Vhd
+  constructor(vhd: Vhd) {
+    super()
+    this.#vhd = vhd
+    this.expectedNbBlocks = vhd.header.maxTableEntries
+    // NOTE(review): assumes footer.blockSize is a number of 512-byte sectors — confirm
+    this.blockSize = vhd.footer.blockSize * 512
+  }
+  hasBlock(index: number): boolean {
+    return this.#vhd.containsBlock(index)
+  }
+  /** @returns the `buffer` field of the block read from the VHD */
+  async readBlock(index: number): Promise<DiskBlockData> {
+    const block = await this.#vhd.readBlock(index)
+    return block.buffer
+  }
+  /** Yields every block given by `vhd.blocks()` and counts them in `consumedBlocks`. */
+  async *[Symbol.asyncIterator](): AsyncIterator<DiskBlock> {
+    const blockIterator = this.#vhd.blocks()
+    while (true) {
+      const res = await blockIterator.next()
+      // the final `{ done: true }` result has no block to read from
+      if (res.done) {
+        return
+      }
+      this.consumedBlocks++
+      yield {
+        index: res.value.id,
+        data: res.value.data,
+      }
+    }
+  }
+}
+
+type Vdi = {
+  virtual_size: number
+  name_label: string
+  desc: string
+  uuid: Uuid
+  ref: string
+}
+export type RemoteMetadata = {
+  // NOTE(review): getMetadata() uses the keys of `vdis` as refs, presumably this is a map indexed by ref — confirm
+  vdis: Array<Vdi>
+  // path of the VHD of each disk, indexed by the ref of the disk
+  vhds: Record<string, string>
+}
+
+type VhdRemoteDiskMetadata = PortableDiskMetadata & {
+  path: string
+}
+
+/** A disk of a backup, located through the metadata file of the backup. */
+export class VhdRemote extends PortableDifferencingDisk {
+  #handler: FileAccessor
+  #metadataPath: string
+  #diskUuid: Uuid
+
+  constructor({ handler, metadataPath, diskUuid }: { handler: FileAccessor; metadataPath: string; diskUuid: Uuid }) {
+    super()
+    this.#handler = handler
+    this.#metadataPath = metadataPath
+    this.#diskUuid = diskUuid
+  }
+
+  /**
+   * Reads the metadata file and extracts the entry of the disk.
+   *
+   * @throws Error when the disk, or the path of its VHD, is missing from the metadata
+   */
+  async getMetadata(): Promise<VhdRemoteDiskMetadata> {
+    const metadata = JSON.parse((await this.#handler.readFile(this.#metadataPath)).toString('utf8')) as RemoteMetadata
+
+    const vdi = Object.entries(metadata.vdis)
+      .map(([ref, vdi]) => ({ ...vdi, ref }))
+      .find(({ uuid }) => uuid === this.#diskUuid)
+    if (vdi === undefined) {
+      throw new Error(`Couldn't find disk with uuid ${this.#diskUuid}`)
+    }
+    const vhd = metadata.vhds[vdi.ref]
+    if (vhd === undefined) {
+      throw new Error(`Couldn't find vhd with ref ${vdi.ref}`)
+    }
+    return {
+      id: this.#diskUuid,
+      label: vdi.name_label,
+      description: vdi.desc,
+      virtualSize: vdi.virtual_size,
+      path: vhd,
+    }
+  }
+
+  /**
+   * Opens the VHD of the disk; the path found in the metadata is resolved
+   * against the directory of the metadata file.
+   *
+   * @returns the block generator, and the function closing the opened VHD
+   */
+  async getBlockIterator(): Promise<Disposable<DiskBlockGenerator>> {
+    const { path } = await this.getMetadata()
+    const disposable: unknown = await openVhd(this.#handler, join(dirname(this.#metadataPath), path))
+    const opened = disposable as Disposable<Vhd> // @todo : type openVhd correctly
+    try {
+      await opened.value.readBlockAllocationTable()
+    } catch (error) {
+      // do not leak the opened VHD when its allocation table can't be read
+      await opened.dispose()
+      throw error
+    }
+
+    return {
+      value: new VhdFileGenerator(opened.value),
+      // dispose() is called on the object it belongs to, in case it relies on `this`
+      dispose: () => opened.dispose(),
+    }
+  }
+}
diff --git a/@xen-orchestra/disk-transform/src/from/XapiExportNbd.mts b/@xen-orchestra/disk-transform/src/from/XapiExportNbd.mts
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/@xen-orchestra/disk-transform/src/from/XapiVhdExport.mts b/@xen-orchestra/disk-transform/src/from/XapiVhdExport.mts
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/@xen-orchestra/disk-transform/src/from/utils.mts b/@xen-orchestra/disk-transform/src/from/utils.mts
new file mode 100644
index
00000000000..c7accc9ff55
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/from/utils.mts
@@ -0,0 +1,20 @@
+import { PortableDiskMetadata } from '../PortableDifferencingDisk.mjs'
+
+// placeholder: `xapi` is not read yet, the returned metadata is hard-coded
+export async function getXapiMetadata(xapi: any, uuid: string): Promise<PortableDiskMetadata> {
+  return {
+    id: uuid,
+    label: 'vdi',
+    description: '',
+    virtualSize: 2,
+  }
+}
+// placeholder: `metadata` is not read yet, the returned metadata is hard-coded
+export async function getRemoteMetadata(metadata, uuid): Promise<PortableDiskMetadata> {
+  return {
+    id: uuid,
+    label: 'vdi',
+    description: '',
+    virtualSize: 2,
+  }
+}
diff --git a/@xen-orchestra/disk-transform/src/to/VhdFileRemote.mts b/@xen-orchestra/disk-transform/src/to/VhdFileRemote.mts
new file mode 100644
index 00000000000..0677977a51b
--- /dev/null
+++ b/@xen-orchestra/disk-transform/src/to/VhdFileRemote.mts
@@ -0,0 +1,39 @@
+import { VhdFile } from 'vhd-lib'
+import { DiskBlockGenerator, PortableDifferencingDisk } from '../PortableDifferencingDisk.mjs'
+import { Disposable } from 'promise-toolbox'
+import { Vhd } from '../from/VhdRemote.mjs'
+
+/**
+ * Copies every block of `disk` into `targetVhd`, then writes the header, the
+ * footer and the block allocation table of the target.
+ */
+async function writeVhdToRemote(targetVhd: Vhd, disk: PortableDifferencingDisk): Promise<void> {
+  return Disposable.use(disk.getBlockIterator(), async (blockIterator: DiskBlockGenerator): Promise<void> => {
+    // @todo : create header/footer from size/label/parent
+    const metadata = await disk.getMetadata()
+    // Buffer.alloc takes (size, fill): 512 bytes with every bit set.
+    // NOTE(review): 512 bytes is one bit per 512-byte sector of a 2 MiB block — confirm the block size of the target
+    const bitmap = Buffer.alloc(512, 0xff)
+    for await (const block of blockIterator) {
+      await targetVhd.writeEntireBlock({
+        id: block.index,
+        bitmap,
+        data: block.data,
+        buffer: Buffer.concat([bitmap, block.data]),
+      })
+    }
+    await targetVhd.writeHeaderAndFooter()
+    await targetVhd.writeBlockAllocationTable()
+  })
+}
+
+/** Creates a VHD file at `path` through the handler of `remote`, and fills it with the content of `disk`. */
+export async function writeVhdFileToRemote(remote, path: string, disk: PortableDifferencingDisk) {
+  const handler = remote._handler
+  return Disposable.use(VhdFile.create(handler, path), async (vhd: Vhd) => {
+    // @todo : precompute target bat to ensure we can write the block without updating the bat at each block
+    return writeVhdToRemote(vhd, disk)
+  })
+}
+
+// @todo: vhddirectory
diff --git a/@xen-orchestra/disk-transform/tsconfig.json b/@xen-orchestra/disk-transform/tsconfig.json
new file mode 100644
index 00000000000..dac85e3f5db
--- /dev/null
+++ b/@xen-orchestra/disk-transform/tsconfig.json
@@ -0,0 +1,16 @@
+{
+  "compilerOptions": {
+    "allowJs": true, // Allows importing .js files
+    "checkJs": false, // Prevents TypeScript from reporting errors in .js files
+    "esModuleInterop": true, // Eases imports between CommonJS and ES modules
+    // "resolveJsonModule": true, // Allows importing JSON files
+    "skipLibCheck": true, // Ignores type errors in external dependencies
+    "target": "es2017",
+    "module": "NodeNext",
+    "moduleResolution": "nodenext",
+    "declaration": true, // Generates `.d.ts` files for type safety
+    "outDir": "dist" // Outputs compiled code to the `dist` directory
+  },
+  "include": ["src/**/*"],
+  "exclude": ["node_modules", "dist"]
+}