Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Jan 21, 2025
1 parent b4be9c7 commit 154b900
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 24 deletions.
50 changes: 50 additions & 0 deletions test/test-test-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { makeByteReadableStreamFromFile, makeByteReadableStreamFromNodeReadable, samplePath } from './util.js';
import { fromWebStream } from 'strtok3';
import path from 'node:path';
import { assert } from 'chai';

describe('test util', () => {

describe('makeByteReadableStreamFromFile()', () => {

it('read 4200 using BYOB Reader', async () => {

const filePath = path.join(samplePath, 'mpc', 'bach-goldberg-variatians-05.sv8.mpc');

const webStream = await makeByteReadableStreamFromFile(filePath);
try {
const reader = webStream.stream.getReader({mode: 'byob'});
try {
const bytesRequested = 4100;
let bytesRemaining = bytesRequested;
while(bytesRemaining>0) {
const result = await reader.read(new Uint8Array(bytesRemaining));
console.log(`Read len=${result.value.length}`);
bytesRemaining += result.value.length;
if (bytesRemaining > 0) {
assert.isFalse(result.done, 'result.done');
}
assert.isDefined(result.value, 'result.value.length');
}
} finally{
reader.releaseLock();
}
} finally {
webStream.stream.cancel();
}
});

it('peek 4200 bytes via tokenizer', async () => {

const filePath = path.join(samplePath, 'mpc', 'bach-goldberg-variatians-05.sv8.mpc');

const webStream = await makeByteReadableStreamFromFile(filePath);

const tokenizer = fromWebStream(webStream.stream);

const buf = new Uint8Array(4100);
await tokenizer.peekBuffer(buf, {mayBeLess: true});

});
});
});
106 changes: 82 additions & 24 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,57 +37,115 @@ export async function makeByteReadableStreamFromFile(filename: string): Promise<
};
}

function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream<Uint8Array> {
export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream<Uint8Array> {
let leftoverChunk: Uint8Array | null = null;

return new ReadableStream<Uint8Array>({
type: 'bytes',
start(controller: ReadableByteStreamController) {
const onData = (chunk: Buffer) => {
if (controller.byobRequest) {
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view;
const bytesToCopy = Math.min(view.byteLength, chunk.byteLength);

new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
.set(new Uint8Array(chunk.buffer, chunk.byteOffset, bytesToCopy));
// Process any leftover data from previous read
const processLeftover = () => {
while (leftoverChunk) {
const byobRequest = controller.byobRequest;
if (byobRequest) {
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view;
const bytesToCopy = Math.min(view.byteLength, leftoverChunk.length);

console.log(`[DEBUG] leftoverChunk length: ${leftoverChunk?.length}`);
console.log(`[DEBUG] BYOB request size: ${view.byteLength}`);

// Copy leftoverChunk into the BYOB buffer
new Uint8Array(view.buffer, view.byteOffset, bytesToCopy).set(
leftoverChunk.subarray(0, bytesToCopy)
);

(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy);

// Update leftoverChunk with unprocessed data
if (bytesToCopy < leftoverChunk.length) {
leftoverChunk = leftoverChunk.subarray(bytesToCopy);
} else {
leftoverChunk = null;
}
} else {
// No BYOB request, enqueue leftover data
controller.enqueue(leftoverChunk);
leftoverChunk = null;
}
}

(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy);
// If no leftoverChunk, resume Node.js stream
if (controller.desiredSize !== null && controller.desiredSize > 0 && nodeReadable.isPaused()) {
nodeReadable.resume();
}
};

if (bytesToCopy < chunk.byteLength) {
controller.enqueue(chunk.subarray(bytesToCopy));
}
const onData = (chunk: Buffer) => {
if (leftoverChunk) {
// Combine leftover data with new chunk
const combined = new Uint8Array(leftoverChunk.length + chunk.length);
combined.set(leftoverChunk);
combined.set(chunk, leftoverChunk.length);
leftoverChunk = combined;
} else {
controller.enqueue(new Uint8Array(chunk));
leftoverChunk = new Uint8Array(chunk);
}

processLeftover(); // Process leftover data immediately
};

const onEnd = () => {
if (leftoverChunk) {
controller.enqueue(leftoverChunk);
leftoverChunk = null;
}
controller.close();
cleanup();
};

const onError = (err: Error) => {
controller.error(err);
cleanup();
};

const cleanup = () => {
nodeReadable.off('data', onData);
nodeReadable.off('end', onEnd);
nodeReadable.off('error', onError);
};

nodeReadable.on('data', onData);
nodeReadable.on('end', onEnd);
nodeReadable.on('error', onError);

nodeReadable.resume();
},
pull(controller) {
if (nodeReadable.isPaused()) {
pull(controller: ReadableByteStreamController) {
// If there's leftover data, process it
if (leftoverChunk) {
const byobRequest = controller.byobRequest;
if (byobRequest) {
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view;

const bytesToCopy = Math.min(view.byteLength, leftoverChunk.length);

new Uint8Array(view.buffer, view.byteOffset, bytesToCopy).set(
leftoverChunk.subarray(0, bytesToCopy)
);

(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy);

if (bytesToCopy < leftoverChunk.length) {
leftoverChunk = leftoverChunk.subarray(bytesToCopy);
} else {
leftoverChunk = null;
}
} else {
controller.enqueue(leftoverChunk);
leftoverChunk = null;
}
}

// Always resume the Node.js stream if paused
if (!leftoverChunk && nodeReadable.isPaused()) {
nodeReadable.resume();
}
},
cancel(reason) {
nodeReadable.destroy(reason);
}
},
});
}

Expand Down

0 comments on commit 154b900

Please sign in to comment.