client body chunking initial clientside impl

This commit is contained in:
Spencer Pogorzelski 2023-08-14 17:11:01 -07:00
parent 0ffb386048
commit e51b5d90d4
5 changed files with 158 additions and 45 deletions

View file

@ -7,11 +7,92 @@ import {
ReadyStateCallback,
WebSocketImpl,
} from "bare-client-custom";
import { ReadableStream, TransformStream } from "node:stream/web";
import { MAX_CHUNK_SIZE } from "protocol";
import { Connection } from "./Connection";
// https://fetch.spec.whatwg.org/#statuses
const NULL_BODY_STATUSES = [101, 103, 204, 205, 304];
/**
* given a completely unknown body type, returns a stream that yields Uint8Arrays
* below MAX_CHUNK_SIZE.
*/
function createBodyStream(
body: BodyInit | null
): ReadableStream<ArrayBuffer | Uint8Array> | null {
if (body === null) return null;
const transformer = () =>
new TransformStream({
transform: async (
chunk: any,
controller: TransformStreamDefaultController<Uint8Array>
) => {
// attempt to transform a couple types into an ArrayBuffer
if (typeof chunk === "string") {
chunk = new TextEncoder().encode(chunk);
}
if (chunk instanceof Blob) {
chunk = await chunk.arrayBuffer();
}
if (ArrayBuffer.isView(chunk)) {
chunk = chunk.buffer.slice(
chunk.byteOffset,
chunk.byteOffset + chunk.byteLength
);
}
// if none of those worked, give up.
if (!(chunk instanceof ArrayBuffer)) {
console.error({ chunk });
throw new Error("Invalid type read from body stream: " + chunk);
}
let current = null;
let remaining = chunk;
do {
current = remaining.slice(0, MAX_CHUNK_SIZE);
remaining = remaining.slice(MAX_CHUNK_SIZE);
controller.enqueue(new Uint8Array(current));
} while (remaining.byteLength > 0);
},
});
if (body instanceof ReadableStream) {
return body.pipeThrough(transformer());
}
if (body instanceof ArrayBuffer) {
if (body.byteLength == 0) {
return null;
}
let remaining = body;
return new ReadableStream({
type: "bytes",
pull: (controller) => {
if (remaining.byteLength <= 0) {
return controller.close();
}
const current = remaining.slice(0, MAX_CHUNK_SIZE);
remaining = remaining.slice(MAX_CHUNK_SIZE);
controller.enqueue(new Uint8Array(current));
},
});
}
if (body instanceof Blob) {
// @ts-expect-error
return body.stream().pipeThrough(transformer());
}
if (body instanceof FormData) {
throw new Error("formdata todo");
}
throw new Error("Unexpected body type: " + body);
}
export class AdriftBareClient extends Client {
constructor(private connection: Connection) {
super();
@ -25,20 +106,15 @@ export class AdriftBareClient extends Client {
duplex: string | undefined,
signal: AbortSignal | undefined
): Promise<BareResponse> {
if (
body !== null &&
typeof body !== "undefined" &&
typeof body !== "string"
) {
console.log({ body });
throw new Error("bare-client-custom passed an unexpected body type");
}
let { payload, body: respRawBody } = await this.connection.httprequest({
method,
requestHeaders,
body,
remote,
});
const bodyStream = createBodyStream(body);
let { payload, body: respRawBody } = await this.connection.httprequest(
{
method,
requestHeaders,
remote,
},
bodyStream
);
const headers = new Headers();
for (const [header, values] of Object.entries(payload.headers)) {
for (const value of <string[]>values) {