Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Process TSV files as streams and validate only the first 1000 rows by default #139

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@std/io": "jsr:@std/[email protected]",
"@std/log": "jsr:@std/[email protected]",
"@std/path": "jsr:@std/[email protected]",
"@std/streams": "jsr:@std/[email protected]",
"@std/yaml": "jsr:@std/yaml@^1.0.4"
},
"tasks": {
Expand Down
20 changes: 16 additions & 4 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/files/deno.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { readAll, readerFromStreamReader } from '@std/io'
import { basename, dirname, fromFileUrl, join } from '@std/path'
import { EOL } from '@std/fs'
import type { FileTree } from '../types/filetree.ts'
import { BIDSFileDeno, readFileTree, UnicodeDecodeError } from './deno.ts'
import { BIDSFileDeno, readFileTree } from './deno.ts'
import { UnicodeDecodeError } from './streams.ts'
import { requestReadPermission } from '../setup/requestPermissions.ts'
import { FileIgnoreRules } from './ignore.ts'

Expand Down
36 changes: 5 additions & 31 deletions src/files/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,9 @@ import { type BIDSFile, FileTree } from '../types/filetree.ts'
import { requestReadPermission } from '../setup/requestPermissions.ts'
import { FileIgnoreRules, readBidsIgnore } from './ignore.ts'
import { logger } from '../utils/logger.ts'
import { createUTF8Stream } from './streams.ts'
export { type BIDSFile, FileTree }

/**
* Thrown when a text file is decoded as UTF-8 but contains UTF-16 characters
*/
export class UnicodeDecodeError extends Error {
constructor(message: string) {
super(message)
this.name = 'UnicodeDecode'
}
}

/**
* Deno implementation of BIDSFile
*/
Expand Down Expand Up @@ -67,28 +58,11 @@ export class BIDSFileDeno implements BIDSFile {
* Read the entire file and decode as utf-8 text
*/
async text(): Promise<string> {
const streamReader = this.stream
.pipeThrough(new TextDecoderStream('utf-8'))
.getReader()
let data = ''
try {
// Read once to check for unicode issues
const { done, value } = await streamReader.read()
// Check for UTF-16 BOM
if (value && value.startsWith('\uFFFD')) {
throw new UnicodeDecodeError('This file appears to be UTF-16')
}
if (done) return data
data += value
// Continue reading the rest of the file if no unicode issues were found
while (true) {
const { done, value } = await streamReader.read()
if (done) return data
data += value
}
} finally {
streamReader.releaseLock()
let chunks: string[] = []
for await (const chunk of this.stream.pipeThrough(createUTF8Stream())) {
chunks.push(chunk)
}
return chunks.join('')
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/files/filetree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import { FileIgnoreRules } from './ignore.ts'

const nullFile = {
size: 0,
stream: new ReadableStream(),
stream: new ReadableStream({
start(controller) {
controller.close()
}
}),
text: () => Promise.resolve(''),
readBytes: async (size: number, offset?: number) => new Uint8Array(),
parent: new FileTree('', '/'),
Expand Down
1 change: 0 additions & 1 deletion src/files/json.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type assert, assertObjectMatch } from '@std/assert'
import type { BIDSFileDeno, UnicodeDecodeError } from './deno.ts'
import type { BIDSFile } from '../types/filetree.ts'
import type { FileIgnoreRules } from './ignore.ts'

Expand Down
37 changes: 37 additions & 0 deletions src/files/streams.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { assert, assertEquals } from '@std/assert'
import { createUTF8Stream, UnicodeDecodeError } from './streams.ts'
import { streamFromUint8Array, streamFromString } from '../tests/utils.ts'

// Tests for createUTF8Stream: construction, plain UTF-8 decoding, and
// rejection of input that starts with a UTF-16 byte order mark.
Deno.test('createUTF8Stream', async (t) => {
  await t.step('should return a TransformStream with UTF8StreamTransformer', () => {
    const stream = createUTF8Stream()
    assert(stream instanceof TransformStream)
  })

  await t.step('should correctly transform UTF-8 input', async () => {
    const rawStream = streamFromString('Hello, world!')
    const reader = rawStream.pipeThrough(createUTF8Stream()).getReader()
    const { value } = await reader.read()
    assertEquals(value, 'Hello, world!')

    await reader.cancel()
  })

  await t.step('should throw UnicodeDecodeError for UTF-16 input', async () => {
    // 0xFF 0xFE is the UTF-16 little-endian byte order mark
    const rawStream = streamFromUint8Array(new Uint8Array([0xFF, 0xFE, 0x00, 0x00]))

    let reader
    try {
      // The exception can't be localized to either of the following lines
      // but is raised before the second returns
      reader = rawStream.pipeThrough(createUTF8Stream()).getReader()
      const { value } = await reader.read()
      assert(false, 'Expected UnicodeDecodeError, got ' + value)
    } catch (e: unknown) {
      // Also reached when the assert(false) above fires; the message keeps
      // the original failure visible in that case.
      assert(e instanceof UnicodeDecodeError, `Expected UnicodeDecodeError, got ${e}`)
      assertEquals(e.message, 'This file appears to be UTF-16')
    } finally {
      // Actually invoke cancel() so the stream is released. On a stream that
      // has already errored, cancel() rejects with the stored error, so that
      // rejection is expected here and deliberately ignored.
      await reader?.cancel().catch(() => {})
    }
  })
})
51 changes: 51 additions & 0 deletions src/files/streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
 * Thrown when a text file is decoded as UTF-8 but contains UTF-16 characters
 */
export class UnicodeDecodeError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'UnicodeDecode'
  }
}

/**
 * A transformer that decodes a byte stream as UTF-8 text and throws a
 * UnicodeDecodeError if the decoded text begins with U+FFFD (the replacement
 * character), which is what the bytes of a UTF-16 BOM decode to.
 *
 * The check is applied to the first non-empty decoded text rather than to the
 * first chunk: a leading chunk that decodes to nothing (for example an empty
 * chunk) must not disarm the check for the bytes that follow it.
 */
export class UTF8StreamTransformer implements Transformer<Uint8Array, string> {
  private readonly decoder: TextDecoder
  /** True until some decoded text has been inspected for a leading U+FFFD */
  private awaitingFirstText: boolean

  constructor() {
    this.decoder = new TextDecoder('utf-8')
    this.awaitingFirstText = true
  }

  /**
   * Decode one chunk and forward the resulting text.
   *
   * @param chunk Raw bytes from the source stream
   * @param controller Receives the decoded text
   * @throws UnicodeDecodeError if the first decoded text starts with U+FFFD
   */
  transform(chunk: Uint8Array, controller: TransformStreamDefaultController<string>) {
    // stream: true keeps incomplete multi-byte sequences buffered in the
    // decoder until the next chunk (or flush) completes them
    const decoded = this.decoder.decode(chunk, { stream: true })
    if (this.awaitingFirstText && decoded.length > 0) {
      if (decoded.startsWith('\uFFFD')) {
        throw new UnicodeDecodeError('This file appears to be UTF-16')
      }
      this.awaitingFirstText = false
    }
    controller.enqueue(decoded)
  }

  /**
   * Emit whatever the decoder still holds once the source stream ends.
   *
   * @param controller Receives the trailing text, if there is any
   */
  flush(controller: TransformStreamDefaultController<string>) {
    const final = this.decoder.decode()
    if (final) {
      controller.enqueue(final)
    }
  }
}

/**
 * Build a TransformStream backed by a fresh UTF8StreamTransformer, for piping
 * a byte stream through to obtain validated, decoded UTF-8 text.
 */
export function createUTF8Stream() {
  const transformer = new UTF8StreamTransformer()
  return new TransformStream(transformer)
}
109 changes: 109 additions & 0 deletions src/files/tsv.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { assert, assertEquals, assertObjectMatch } from '@std/assert'
import { pathToFile } from './filetree.ts'
import { loadTSV } from './tsv.ts'
import { streamFromString } from '../tests/utils.ts'
import { ColumnsMap } from '../types/columns.ts'

// Tests for loadTSV: header/column parsing, malformed-row errors, and the
// maxRows limit. Each step replaces file.stream with in-memory text.
Deno.test('TSV loading', async (t) => {
  await t.step('Empty file produces empty map', async () => {
    const file = pathToFile('/empty.tsv')
    file.stream = streamFromString('')

    const map = await loadTSV(file)
    // map.size looks for a column called size, so work around it
    assertEquals(Object.keys(map).length, 0)
  })

  await t.step('Single row file produces header-only map', async () => {
    const file = pathToFile('/single_row.tsv')
    file.stream = streamFromString('a\tb\tc\n')

    const map = await loadTSV(file)
    assertEquals(map.a, [])
    assertEquals(map.b, [])
    assertEquals(map.c, [])
  })

  await t.step('Single column file produces single column map', async () => {
    const file = pathToFile('/single_column.tsv')
    file.stream = streamFromString('a\n1\n2\n3\n')

    const map = await loadTSV(file)
    assertEquals(map.a, ['1', '2', '3'])
  })

  await t.step('Missing final newline is ignored', async () => {
    const file = pathToFile('/missing_newline.tsv')
    file.stream = streamFromString('a\n1\n2\n3')

    const map = await loadTSV(file)
    assertEquals(map.a, ['1', '2', '3'])
  })

  await t.step('Empty row throws issue', async () => {
    const file = pathToFile('/empty_row.tsv')
    file.stream = streamFromString('a\tb\tc\n1\t2\t3\n\n4\t5\t6\n')

    // Track whether the catch branch ran: without this the step would pass
    // silently if loadTSV resolved instead of throwing.
    let threw = false
    try {
      await loadTSV(file)
    } catch (e: any) {
      threw = true
      assertObjectMatch(e, { key: 'TSV_EMPTY_LINE', line: 3 })
    }
    assert(threw, 'Expected loadTSV to throw TSV_EMPTY_LINE')
  })

  await t.step('Mismatched row length throws issue', async () => {
    const file = pathToFile('/mismatched_row.tsv')
    file.stream = streamFromString('a\tb\tc\n1\t2\t3\n4\t5\n')

    // Same guard as above: the step must fail when nothing is thrown.
    let threw = false
    try {
      await loadTSV(file)
    } catch (e: any) {
      threw = true
      assertObjectMatch(e, { key: 'TSV_EQUAL_ROWS', line: 3 })
    }
    assert(threw, 'Expected loadTSV to throw TSV_EQUAL_ROWS')
  })

  await t.step('maxRows limits the number of rows read', async () => {
    const file = pathToFile('/long.tsv')
    // Use 1500 to avoid overlap with default initial capacity
    const text = 'a\tb\tc\n' + '1\t2\t3\n'.repeat(1500)
    file.stream = streamFromString(text)

    let map = await loadTSV(file, 0)
    assertEquals(map.a, [])
    assertEquals(map.b, [])
    assertEquals(map.c, [])

    // Clear memoization cache. We currently do not key on maxRows.
    loadTSV.cache.clear()
    file.stream = streamFromString(text)
    map = await loadTSV(file, 1)
    assertEquals(map.a, ['1'])
    assertEquals(map.b, ['2'])
    assertEquals(map.c, ['3'])

    loadTSV.cache.clear()
    file.stream = streamFromString(text)
    map = await loadTSV(file, 2)
    assertEquals(map.a, ['1', '1'])
    assertEquals(map.b, ['2', '2'])
    assertEquals(map.c, ['3', '3'])

    // A maxRows of -1 is expected to return every row of the file
    loadTSV.cache.clear()
    file.stream = streamFromString(text)
    map = await loadTSV(file, -1)
    assertEquals(map.a, Array(1500).fill('1'))
    assertEquals(map.b, Array(1500).fill('2'))
    assertEquals(map.c, Array(1500).fill('3'))

    loadTSV.cache.clear()
    // Check that maxRows does not truncate shorter files
    file.stream = streamFromString('a\tb\tc\n1\t2\t3\n4\t5\t6\n7\t8\t9\n')
    map = await loadTSV(file, 4)
    assertEquals(map.a, ['1', '4', '7'])
    assertEquals(map.b, ['2', '5', '8'])
    assertEquals(map.c, ['3', '6', '9'])
  })

  // Tests will have populated the memoization cache
  await loadTSV.cache.clear()
})
Loading
Loading