Skip to content

Commit

Permalink
Generate Web ReadableStream by converting Node.js Readable
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Jan 21, 2025
1 parent a29bcc2 commit b4be9c7
Show file tree
Hide file tree
Showing 3 changed files with 2,515 additions and 3,874 deletions.
4 changes: 2 additions & 2 deletions test/metadata-parsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'node:fs';

import * as mm from '../lib/index.js';
import type { IAudioMetadata, IOptions } from '../lib/index.js';
import { makeReadableByteFileStream } from './util.js';
import { makeByteReadableStreamFromFile } from './util.js';

type ParseFileMethod = (skipTest: () => void, filePath: string, mimeType?: string, options?: IOptions) => Promise<IAudioMetadata>;

Expand Down Expand Up @@ -39,7 +39,7 @@ export const Parsers: IParser[] = [
description: 'parseWebStream',
webStream: true,
initParser: async (skipTest, filePath: string, mimeType?: string, options?: IOptions) => {
const webStream = await makeReadableByteFileStream(filePath);
const webStream = await makeByteReadableStreamFromFile(filePath);
try {
return await mm.parseWebStream(webStream.stream, {mimeType: mimeType, size: webStream.fileSize}, options);
} finally {
Expand Down
96 changes: 58 additions & 38 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import { Readable } from 'node:stream';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { createReadStream } from 'node:fs';
import fs from 'node:fs/promises';
import { ReadableStream } from 'node:stream/web';
import type {ReadableByteStreamController, ReadableStreamBYOBRequest} from 'node:stream/web';

const filename = fileURLToPath(import.meta.url);
const dirname = path.dirname(filename);
Expand All @@ -24,52 +25,71 @@ export class SourceStream extends Readable {
}
}

export async function makeReadableByteFileStream(filename: string, delay = 0): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {
export async function makeByteReadableStreamFromFile(filename: string): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {

let position = 0;
const fileInfo = await fs.stat(filename);
const fileHandle = await fs.open(filename, 'r');
const nodeStream = createReadStream(filename);

return {
fileSize: fileInfo.size,
stream: new ReadableStream({
type: 'bytes',

async pull(controller) {

// @ts-ignore
const view = controller.byobRequest.view;

setTimeout(async () => {
try {
const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
// @ts-ignore
controller.byobRequest.respond(0);
} else {
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
}
} catch (err) {
controller.error(err);
await fileHandle.close();
stream: makeByteReadableStreamFromNodeReadable(nodeStream),
closeFile: () => Promise.resolve()
};
}

function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
type: 'bytes',
start(controller: ReadableByteStreamController) {
const onData = (chunk: Buffer) => {
if (controller.byobRequest) {
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view;
const bytesToCopy = Math.min(view.byteLength, chunk.byteLength);

new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
.set(new Uint8Array(chunk.buffer, chunk.byteOffset, bytesToCopy));

(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy);

if (bytesToCopy < chunk.byteLength) {
controller.enqueue(chunk.subarray(bytesToCopy));
}
}, delay);
},
} else {
controller.enqueue(new Uint8Array(chunk));
}
};

const onEnd = () => {
controller.close();
cleanup();
};

const onError = (err: Error) => {
controller.error(err);
cleanup();
};

cancel() {
return fileHandle.close();
},
const cleanup = () => {
nodeReadable.off('data', onData);
nodeReadable.off('end', onEnd);
nodeReadable.off('error', onError);
};

autoAllocateChunkSize: 1024
}),
closeFile: () => {
return fileHandle.close();
nodeReadable.on('data', onData);
nodeReadable.on('end', onEnd);
nodeReadable.on('error', onError);
nodeReadable.resume();
},
pull(controller) {
if (nodeReadable.isPaused()) {
nodeReadable.resume();
}
},
cancel(reason) {
nodeReadable.destroy(reason);
}
};
});
}


export const samplePath = path.join(dirname, 'samples');
Loading

0 comments on commit b4be9c7

Please sign in to comment.