diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index 6ef705d..723189b 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -33,6 +33,28 @@ function createBodyStream( ); } + if (body instanceof ArrayBuffer) { + if (body.byteLength == 0) { + return null; + } + let remaining = body; + return new ReadableStream({ + type: "bytes", + pull: (controller) => { + if (remaining.byteLength <= 0) { + return controller.close(); + } + const current = remaining.slice(0, MAX_CHUNK_SIZE); + remaining = remaining.slice(MAX_CHUNK_SIZE); + controller.enqueue(new Uint8Array(current)); + }, + }); + } + + if (body instanceof FormData) { + throw new Error("formdata todo"); + } + const transformer = () => new TransformStream({ transform: async ( @@ -73,32 +95,10 @@ function createBodyStream( return body.pipeThrough(transformer()); } - if (body instanceof ArrayBuffer) { - if (body.byteLength == 0) { - return null; - } - let remaining = body; - return new ReadableStream({ - type: "bytes", - pull: (controller) => { - if (remaining.byteLength <= 0) { - return controller.close(); - } - const current = remaining.slice(0, MAX_CHUNK_SIZE); - remaining = remaining.slice(MAX_CHUNK_SIZE); - controller.enqueue(new Uint8Array(current)); - }, - }); - } - if (body instanceof Blob) { return body.stream().pipeThrough(transformer()); } - if (body instanceof FormData) { - throw new Error("formdata todo"); - } - throw new Error("Unexpected body type: " + body); } @@ -139,10 +139,6 @@ export class AdriftBareClient extends Client { respBody = null; } - console.log("constructing Response", { - status: payload.status, - body: respBody?.byteLength, - }); return new Response(respBody, { status: payload.status, statusText: payload.statusText, diff --git a/client/src/Connection.ts b/client/src/Connection.ts index b26cfdb..996be0d 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -4,6 +4,7 @@ import { C2SWSOpenPayload, HTTPRequestPayload, HTTPResponsePayload, + ProtoBareHeaders, S2CRequestType, S2CRequestTypes, Transport, @@ -152,17 +153,22 @@ export class Connection { } httprequest( - data: HTTPRequestPayload, + data: { + method: string; + requestHeaders: ProtoBareHeaders; + remote: URL; + }, body: ReadableStream | null ): Promise<{ payload: HTTPResponsePayload; body: ArrayBuffer }> { - let json = JSON.stringify(data); + const payload: HTTPRequestPayload = { ...data, hasBody: Boolean(body) }; + let json = JSON.stringify(payload); return new Promise(async (resolve) => { let seq = this.nextSeq(); this.requestCallbacks[seq] = resolve; await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json])); - if (body) { + if (payload.hasBody) { for await (const chunk of body as unknown as NodeJS.ReadableStream) { await this.send( seq, @@ -170,8 +176,8 @@ export class Connection { new Uint8Array(chunk as Uint8Array | ArrayBuffer) ); } + await this.send(seq, C2SRequestTypes.HTTPRequestEnd); } - await this.send(seq, C2SRequestTypes.HTTPRequestEnd); }); } diff --git a/protocol/src/index.ts b/protocol/src/index.ts index 21dd3b5..1d27843 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -28,6 +28,7 @@ export type HTTPRequestPayload = { method: string; requestHeaders: ProtoBareHeaders; remote: URL; + hasBody: boolean; }; export type HTTPResponsePayload = { diff --git a/server/src/server.ts b/server/src/server.ts index de8206f..1426c58 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -92,13 +92,20 @@ export class AdriftServer { let resp: IncomingMessage; try { - const outgoing = await (this.requestStreams[seq] = bareInitialFetch( + const outgoingPromise = bareInitialFetch( payload, abort.signal, new URL(payload.remote), options - )); - resp = await fetchResponse(outgoing); + ); + if (payload.hasBody) { + this.requestStreams[seq] = outgoingPromise; + } + const outgoingStream = await outgoingPromise; + if (!payload.hasBody) { + outgoingStream.end(); + } + resp = await fetchResponse(await outgoingPromise); } catch (e) { if (e instanceof BareError) { return bareErrorToResponse(e); @@ -243,10 +250,10 @@ export class AdriftServer { resp = bareErrorToResponse(bareError); } - const { payload, body } = resp; + const { payload, body: responseBody } = resp; this.sendHTTPResponseStart(seq, payload); - for await (const chunk of body) { + for await (const chunk of responseBody) { let chunkPart = null; let chunkRest = chunk; do {