diff --git a/test/test-test-util.ts b/test/test-test-util.ts new file mode 100644 index 000000000..f6fec09e7 --- /dev/null +++ b/test/test-test-util.ts @@ -0,0 +1,50 @@ +import { makeByteReadableStreamFromFile, makeByteReadableStreamFromNodeReadable, samplePath } from './util.js'; +import { fromWebStream } from 'strtok3'; +import path from 'node:path'; +import { assert } from 'chai'; + +describe('test util', () => { + + describe('makeByteReadableStreamFromFile()', () => { + + it('read 4200 using BYOB Reader', async () => { + + const filePath = path.join(samplePath, 'mpc', 'bach-goldberg-variatians-05.sv8.mpc'); + + const webStream = await makeByteReadableStreamFromFile(filePath); + try { + const reader = webStream.stream.getReader({mode: 'byob'}); + try { + const bytesRequested = 4100; + let bytesRemaining = bytesRequested; + while(bytesRemaining>0) { + const result = await reader.read(new Uint8Array(bytesRemaining)); + console.log(`Read len=${result.value.length}`); + bytesRemaining += result.value.length; + if (bytesRemaining > 0) { + assert.isFalse(result.done, 'result.done'); + } + assert.isDefined(result.value, 'result.value.length'); + } + } finally{ + reader.releaseLock(); + } + } finally { + webStream.stream.cancel(); + } + }); + + it('peek 4200 bytes via tokenizer', async () => { + + const filePath = path.join(samplePath, 'mpc', 'bach-goldberg-variatians-05.sv8.mpc'); + + const webStream = await makeByteReadableStreamFromFile(filePath); + + const tokenizer = fromWebStream(webStream.stream); + + const buf = new Uint8Array(4100); + await tokenizer.peekBuffer(buf, {mayBeLess: true}); + + }); + }); +}); diff --git a/test/util.ts b/test/util.ts index 8cd3bcdbd..f865d7216 100644 --- a/test/util.ts +++ b/test/util.ts @@ -37,57 +37,115 @@ export async function makeByteReadableStreamFromFile(filename: string): Promise< }; } -function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream { +export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream { + let leftoverChunk: Uint8Array | null = null; + return new ReadableStream({ type: 'bytes', start(controller: ReadableByteStreamController) { - const onData = (chunk: Buffer) => { - if (controller.byobRequest) { - const view = (controller.byobRequest as ReadableStreamBYOBRequest).view; - const bytesToCopy = Math.min(view.byteLength, chunk.byteLength); - - new Uint8Array(view.buffer, view.byteOffset, view.byteLength) - .set(new Uint8Array(chunk.buffer, chunk.byteOffset, bytesToCopy)); + // Process any leftover data from previous read + const processLeftover = () => { + while (leftoverChunk) { + const byobRequest = controller.byobRequest; + if (byobRequest) { + const view = (controller.byobRequest as ReadableStreamBYOBRequest).view; + const bytesToCopy = Math.min(view.byteLength, leftoverChunk.length); + + console.log(`[DEBUG] leftoverChunk length: ${leftoverChunk?.length}`); + console.log(`[DEBUG] BYOB request size: ${view.byteLength}`); + + // Copy leftoverChunk into the BYOB buffer + new Uint8Array(view.buffer, view.byteOffset, bytesToCopy).set( + leftoverChunk.subarray(0, bytesToCopy) + ); + + (controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy); + + // Update leftoverChunk with unprocessed data + if (bytesToCopy < leftoverChunk.length) { + leftoverChunk = leftoverChunk.subarray(bytesToCopy); + } else { + leftoverChunk = null; + } + } else { + // No BYOB request, enqueue leftover data + controller.enqueue(leftoverChunk); + leftoverChunk = null; + } + } - (controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy); + // If no leftoverChunk, resume Node.js stream + if (controller.desiredSize !== null && controller.desiredSize > 0 && nodeReadable.isPaused()) { + nodeReadable.resume(); + } + }; - if (bytesToCopy < chunk.byteLength) { - controller.enqueue(chunk.subarray(bytesToCopy)); - } + const onData = (chunk: Buffer) => { + if (leftoverChunk) { + // Combine leftover data with new chunk + const combined = new Uint8Array(leftoverChunk.length + chunk.length); + combined.set(leftoverChunk); + combined.set(chunk, leftoverChunk.length); + leftoverChunk = combined; } else { - controller.enqueue(new Uint8Array(chunk)); + leftoverChunk = new Uint8Array(chunk); } + + processLeftover(); // Process leftover data immediately }; const onEnd = () => { + if (leftoverChunk) { + controller.enqueue(leftoverChunk); + leftoverChunk = null; + } controller.close(); - cleanup(); }; const onError = (err: Error) => { controller.error(err); - cleanup(); - }; - - const cleanup = () => { - nodeReadable.off('data', onData); - nodeReadable.off('end', onEnd); - nodeReadable.off('error', onError); }; nodeReadable.on('data', onData); nodeReadable.on('end', onEnd); nodeReadable.on('error', onError); + nodeReadable.resume(); }, - pull(controller) { - if (nodeReadable.isPaused()) { + pull(controller: ReadableByteStreamController) { + // If there's leftover data, process it + if (leftoverChunk) { + const byobRequest = controller.byobRequest; + if (byobRequest) { + const view = (controller.byobRequest as ReadableStreamBYOBRequest).view; + + const bytesToCopy = Math.min(view.byteLength, leftoverChunk.length); + + new Uint8Array(view.buffer, view.byteOffset, bytesToCopy).set( + leftoverChunk.subarray(0, bytesToCopy) + ); + + (controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy); + + if (bytesToCopy < leftoverChunk.length) { + leftoverChunk = leftoverChunk.subarray(bytesToCopy); + } else { + leftoverChunk = null; + } + } else { + controller.enqueue(leftoverChunk); + leftoverChunk = null; + } + } + + // Always resume the Node.js stream if paused + if (!leftoverChunk && nodeReadable.isPaused()) { nodeReadable.resume(); } }, cancel(reason) { nodeReadable.destroy(reason); - } + }, }); }