Skip to content

Commit

Permalink
feat(disk-transform): first step
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Jan 8, 2025
1 parent 20bec89 commit 161dc94
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 0 deletions.
19 changes: 19 additions & 0 deletions @xen-orchestra/disk-transform/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@xen-orchestra/disk-transform",
"version": "0.0.0",
"main": "index.js",
"license": "MIT",
"private": true,
"type": "module",
"devDependencies": {
"@tsconfig/node-lts": "^20.1.3",
"@tsconfig/recommended": "^1.0.7",
"@types/node": "^22.3.0",
"typescript": "^5.5.4"
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"start": "node ."
}
}
145 changes: 145 additions & 0 deletions @xen-orchestra/disk-transform/src/ForkableIterator.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import assert from 'node:assert'

export class ForkableIterator<T> {
#forks = new Set<ForkedIterator<T>>()
#source: AsyncIterator<T>

#started = false

#forksWaiting = new Set<ForkedIterator<T>>()
#promiseWaitingForForks?: {
promise: Promise<void>
resolve(): void
reject(err: Error): void
}
#promiseWaitingForNext?: Promise<IteratorResult<T>>

constructor(source: AsyncIterator<T>) {
if (typeof source[Symbol.asyncIterator] !== 'function') {
throw new Error('Source must be an async iterator')
}
this.#source = source
}

fork(): ForkedIterator<T> {
assert.notEqual(this.#started, true, 'Can t be forked once started')
const fork = new ForkedIterator(this)
this.#forks.add(fork)
return fork
}

async #waitForAllForks(): Promise<void> {
// first fork waiting for the data
if (this.#promiseWaitingForForks === undefined) {
let resolve = () => {}
let reject: (err: Error) => void = () => {}
const promise = new Promise<void>(function (_resolve, _reject) {
resolve = _resolve
reject = _reject
})

this.#promiseWaitingForForks = { promise, resolve, reject }
}
// all the forks are waiting
const { promise, resolve } = this.#promiseWaitingForForks
if (this.#forksWaiting.size === this.#forks.size) {
// reset data
this.#promiseWaitingForForks = undefined
this.#forksWaiting = new Set()
resolve() // mark the wait of the other forks as over
}
return promise
}

async next(fork: ForkedIterator<T>): Promise<IteratorResult<T>> {
// ensure a fork can't wait twice
assert.strictEqual(this.#forksWaiting.has(fork), false, 'fork is already waiting')
assert.strictEqual(this.#forks.has(fork), true, 'fork is not from this source')
this.#forksWaiting.add(fork)
this.#started = true

// ask for value only once for the first fork asking
if (this.#promiseWaitingForNext === undefined) {
this.#promiseWaitingForNext = this.#source.next()
}
// keep a copy of the promise locally since it may be removed from other forks
const nextValue = this.#promiseWaitingForNext?.catch(err => {
// handle the error and ensure it is thrown from an awaited place
// ( the call to waitForAllForks)
this.#promiseWaitingForForks?.reject(err)
}) as Promise<IteratorResult<T>>
await this.#waitForAllForks()
// ready to ask for a new value
this.#promiseWaitingForNext = undefined
return nextValue
}

remove(fork: ForkedIterator<T>) {
assert.ok(this.#forks.has(fork))
this.#forks.delete(fork)
// this fork may be waiting, blocking the others
this.#forksWaiting.delete(fork)

// removing a fork can free the data to flow again
if (this.#forksWaiting.size === this.#forks.size && this.#promiseWaitingForForks !== undefined) {
const { resolve } = this.#promiseWaitingForForks
this.#promiseWaitingForForks = undefined
this.#forksWaiting = new Set()
resolve() // mark the wait of the other forks as over
}
}
}

class ForkedIterator<T> {
#parent: ForkableIterator<T>
constructor(parent: ForkableIterator<T>) {
this.#parent = parent
}

async *[Symbol.asyncIterator]() {
let res: IteratorResult<T>
do {
res = await this.#parent.next(this)
console.log({ res })
yield res.value
} while (!res.done)
}

destroy() {
this.#parent.remove(this)
}
}

async function* makeRangeIterator(start = 0, end = Infinity, step = 1) {
let iterationCount = 0
for (let i = start; i < end; i += step) {
iterationCount++
await new Promise(resolve => setTimeout(resolve, 100))
if (Math.random() > 0.5) {
// throw new Error('fuck')
}
yield i
}
}

const source = makeRangeIterator(0, 5)
const forkable = new ForkableIterator(source)
const fork1 = forkable.fork()
const fork2 = forkable.fork()

async function consume(iterable, label) {
console.log('consume')
try {
for await (const val of iterable) {
console.log({ val, label })
await new Promise(resolve => setTimeout(resolve, Math.random() * 2500))
}
} catch (err) {
console.error(label, err)
}
console.log('consumed', label)
}

await Promise.all([consume(fork1, 'A'), consume(fork2, 'B')])

console.log('done')
43 changes: 43 additions & 0 deletions @xen-orchestra/disk-transform/src/PortableDifferencingDisk.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Disposable } from 'promise-toolbox'

export type DiskBlockData = Buffer
export type DiskBlock = {
index: number
data: DiskBlockData
}

export type BytesLength = number
export type Uuid = string

export abstract class DiskBlockGenerator {
blockSize: number
expectedNbBlocks?: number
consumedBlocks: number = 0

get isSizeComputable(): boolean {
return this.expectedNbBlocks !== undefined
}

abstract hasBlock(index: number): boolean
abstract readBlock(index: number): Promise<DiskBlockData>
abstract [Symbol.asyncIterator](): AsyncIterator<DiskBlock>
}

export type Disposable<T> = {
value: T
dispose: () => Promise<void>
}

export abstract class PortableDiskMetadata {
id: Uuid
label: string
description: string
virtualSize: number
parentUuid?: Uuid
parentPath?: String
}

export abstract class PortableDifferencingDisk {
abstract getMetadata(): Promise<PortableDiskMetadata>
abstract getBlockIterator(): Promise<Disposable<DiskBlockGenerator>>
}
20 changes: 20 additions & 0 deletions @xen-orchestra/disk-transform/src/demo.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { getSyncedHandler } from '@xen-orchestra/fs'
import { VhdRemote, RemoteMetadata } from './from/VhdRemote.mjs'
import { FileAccessor } from './file-accessor/FileAccessor.mjs'

async function run() {
const { value: handler } = await getSyncedHandler({ url: 'file:///mnt/ssd/vhdblock' })
const metadataPath = './xo-vm-backups/cbb46b48-12aa-59dc-4039-8a587fdc67d5/20230831T100000Z.json'

const vhd = new VhdRemote({
handler: handler as FileAccessor,
metadataPath,
diskUuid: '1282b678-cb12-4b13-ab17-7a4fdac403d8',
})
const { value: iterator } = await vhd.getBlockIterator()
for await (const block of iterator) {
console.log(block)
}
}

run()
14 changes: 14 additions & 0 deletions @xen-orchestra/disk-transform/src/file-accessor/FileAccessor.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export type FileDescriptor = Number

export abstract class FileAccessor {
abstract getSize: () => Promise<Number>
abstract getSizeOnDisk: () => Promise<Number>
abstract openFile: (path: string, opts: object) => Promise<FileDescriptor>
abstract closeFile: (path: string, opts: object) => Promise<void>
abstract read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
abstract readFile: (path: string) => Promise<Buffer>
abstract write: (path: string, data: Buffer, offset: number) => Promise<void>
abstract writeFile: (path: string, data: Buffer | string) => Promise<Buffer>
abstract rename: (from: string, to: string) => Promise<void>
abstract unlink: (path: string) => Promise<void>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { FileAccessor, FileDescriptor } from './FileAccessor.mjs'

// handle file access inside a form
// to transform disks into an acceptable format from the browser
export class InBrowserFileAccessor extends FileAccessor {
getSize: () => Promise<Number>
getSizeOnDisk: () => Promise<Number>
openFile: (path: string, opts: object) => Promise<FileDescriptor>
closeFile: (path: string, opts: object) => Promise<void>
read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
readFile: (path: string) => Promise<Buffer>
write: (path: string, data: Buffer, offset: number) => Promise<void>
writeFile: (path: string, data: Buffer | string) => Promise<Buffer>
rename: (from: string, to: string) => Promise<void>
unlink: (path: string) => Promise<void>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { FileAccessor, FileDescriptor } from './FileAccessor.mjs'

export class RemoteFileAccessor extends FileAccessor {
getSize: () => Promise<Number>
getSizeOnDisk: () => Promise<Number>
openFile: (path: string, opts: object) => Promise<FileDescriptor>
closeFile: (path: string, opts: object) => Promise<void>
read: (path: string, buffer: Buffer, offset: number) => Promise<Buffer>
readFile: (path: string) => Promise<Buffer>
write: (path: string, data: Buffer, offset: number) => Promise<void>
writeFile: (path: string, data: Buffer | string) => Promise<Buffer>
rename: (from: string, to: string) => Promise<void>
unlink: (path: string) => Promise<void>
}
Loading

0 comments on commit 161dc94

Please sign in to comment.