diff --git a/client/package.json b/client/package.json index c4ea2f1..1346318 100644 --- a/client/package.json +++ b/client/package.json @@ -13,5 +13,8 @@ "bare-client-custom": "file:../bare-client-custom", "firebase": "^10.1.0", "protocol": "workspace:*" + }, + "devDependencies": { + "@types/node": "^20.4.10" } } diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index f7a6094..046b196 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -7,11 +7,92 @@ import { ReadyStateCallback, WebSocketImpl, } from "bare-client-custom"; +import { ReadableStream, TransformStream } from "node:stream/web"; +import { MAX_CHUNK_SIZE } from "protocol"; import { Connection } from "./Connection"; // https://fetch.spec.whatwg.org/#statuses const NULL_BODY_STATUSES = [101, 103, 204, 205, 304]; +/** + * given a completely unknown body type, returns a stream that yields Uint8Arrays + * below MAX_CHUNK_SIZE. + */ +function createBodyStream( + body: BodyInit | null +): ReadableStream | null { + if (body === null) return null; + + const transformer = () => + new TransformStream({ + transform: async ( + chunk: any, + controller: TransformStreamDefaultController + ) => { + // attempt to transform a couple types into an ArrayBuffer + if (typeof chunk === "string") { + chunk = new TextEncoder().encode(chunk); + } + if (chunk instanceof Blob) { + chunk = await chunk.arrayBuffer(); + } + if (ArrayBuffer.isView(chunk)) { + chunk = chunk.buffer.slice( + chunk.byteOffset, + chunk.byteOffset + chunk.byteLength + ); + } + + // if none of those worked, give up. + if (!(chunk instanceof ArrayBuffer)) { + console.error({ chunk }); + throw new Error("Invalid type read from body stream: " + chunk); + } + + let current = null; + let remaining = chunk; + do { + current = remaining.slice(0, MAX_CHUNK_SIZE); + remaining = remaining.slice(MAX_CHUNK_SIZE); + controller.enqueue(new Uint8Array(current)); + } while (remaining.byteLength > 0); + }, + }); + + if (body instanceof ReadableStream) { + return body.pipeThrough(transformer()); + } + + if (body instanceof ArrayBuffer) { + if (body.byteLength == 0) { + return null; + } + let remaining = body; + return new ReadableStream({ + type: "bytes", + pull: (controller) => { + if (remaining.byteLength <= 0) { + return controller.close(); + } + const current = remaining.slice(0, MAX_CHUNK_SIZE); + remaining = remaining.slice(MAX_CHUNK_SIZE); + controller.enqueue(new Uint8Array(current)); + }, + }); + } + + if (body instanceof Blob) { + // @ts-expect-error + return body.stream().pipeThrough(transformer()); + } + + if (body instanceof FormData) { + throw new Error("formdata todo"); + } + + throw new Error("Unexpected body type: " + body); +} + export class AdriftBareClient extends Client { constructor(private connection: Connection) { super(); @@ -25,20 +106,15 @@ export class AdriftBareClient extends Client { duplex: string | undefined, signal: AbortSignal | undefined ): Promise { - if ( - body !== null && - typeof body !== "undefined" && - typeof body !== "string" - ) { - console.log({ body }); - throw new Error("bare-client-custom passed an unexpected body type"); - } - let { payload, body: respRawBody } = await this.connection.httprequest({ - method, - requestHeaders, - body, - remote, - }); + const bodyStream = createBodyStream(body); + let { payload, body: respRawBody } = await this.connection.httprequest( + { + method, + requestHeaders, + remote, + }, + bodyStream + ); const headers = new Headers(); for (const [header, values] of Object.entries(payload.headers)) { for (const value of values) { diff --git a/client/src/Connection.ts b/client/src/Connection.ts index 9e87dec..e9b0a6c 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -1,3 +1,4 @@ +import { ReadableStream } from "node:stream/web"; import { C2SRequestType, C2SRequestTypes, @@ -130,8 +131,8 @@ export class Connection { async send( requestID: number, - data: ArrayBuffer | Blob, - type: C2SRequestType + type: C2SRequestType, + data?: ArrayBuffer | Blob ): Promise { let header = new ArrayBuffer(2 + 1); let view = new DataView(header); @@ -143,21 +144,35 @@ export class Connection { view.setUint8(cursor, type); cursor += 1; - let buf = await new Blob([header, data]).arrayBuffer(); + let buf = header; + if (data) { + buf = await new Blob([header, data]).arrayBuffer(); + } this.transport.send(buf); - console.log(buf); } httprequest( - data: HTTPRequestPayload + data: HTTPRequestPayload, + body: ReadableStream | null ): Promise<{ payload: HTTPResponsePayload; body: ArrayBuffer }> { let json = JSON.stringify(data); return new Promise(async (resolve) => { let seq = this.nextSeq(); this.requestCallbacks[seq] = resolve; - await this.send(seq, new Blob([json]), C2SRequestTypes.HTTPRequest); + await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json])); + + if (body) { + for await (const chunk of body) { + await this.send( + seq, + C2SRequestTypes.HTTPRequestChunk, + new Uint8Array(chunk) + ); + } + } + await this.send(seq, C2SRequestTypes.HTTPRequestEnd); }); } @@ -179,8 +194,8 @@ export class Connection { this.send( seq, - new TextEncoder().encode(payloadJSON), - C2SRequestTypes.WSOpen + C2SRequestTypes.WSOpen, + new TextEncoder().encode(payloadJSON) ).catch((e) => { console.error(e); closeWithError(); @@ -206,23 +221,23 @@ export class Connection { if (typeof data === "string") { this.send( seq, - new TextEncoder().encode(data), - C2SRequestTypes.WSSendText + C2SRequestTypes.WSSendText, + new TextEncoder().encode(data) ).catch(cleanup); return; } if (data instanceof ArrayBuffer) { - this.send(seq, data, C2SRequestTypes.WSSendBinary).catch(cleanup); + this.send(seq, C2SRequestTypes.WSSendBinary, data).catch(cleanup); return; } if (ArrayBuffer.isView(data)) { this.send( seq, + C2SRequestTypes.WSSendBinary, data.buffer.slice( data.byteOffset, data.byteOffset + data.byteLength - ), - C2SRequestTypes.WSSendBinary + ) ).catch(cleanup); return; } @@ -234,8 +249,8 @@ export class Connection { const payloadJSON = JSON.stringify(payload); this.send( seq, - new TextEncoder().encode(payloadJSON), - C2SRequestTypes.WSClose + C2SRequestTypes.WSClose, + new TextEncoder().encode(payloadJSON) ).catch((e) => { // At this point there is nothing left to clean up console.error(e); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9436508..9660cc7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -37,6 +37,9 @@ importers: chalk: specifier: ^5.3.0 version: 5.3.0 + crypto-js: + specifier: ^4.1.1 + version: 4.1.1 domhandler: specifier: ^5.0.3 version: 5.0.3 @@ -62,6 +65,9 @@ importers: specifier: ^2.6.0 version: 2.6.0 devDependencies: + '@types/crypto-js': + specifier: ^4.1.1 + version: 4.1.1 '@types/mime-db': specifier: ^1.43.1 version: 1.43.1 @@ -81,8 +87,8 @@ importers: Ultraviolet: devDependencies: '@tomphttp/bare-client': - specifier: file:../bare-client-custom/ - version: file:bare-client-custom + specifier: ^2.2.0-alpha + version: 2.2.0-alpha css-tree: specifier: ^2.3.1 version: 2.3.1 @@ -165,11 +171,15 @@ importers: protocol: specifier: workspace:* version: link:../protocol + devDependencies: + '@types/node': + specifier: ^20.4.10 + version: 20.4.10 corium: dependencies: '@rollup/browser': - specifier: ^3.28.0 + specifier: ^3.17.2 version: 3.28.0 '@swc/helpers': specifier: ^0.4.14 @@ -3562,7 +3572,6 @@ packages: /@tomphttp/bare-client@2.2.0-alpha: resolution: {integrity: sha512-xhcflOpwr92tkpp8SoDhB3nK3LHMBIjx+vgow37XobQew2k0/mXSxmaU7BsDFpOIa1CcLCEsR8gWn0v7Cy9+7Q==} - dev: false /@tomphttp/bare-server-node@2.0.1: resolution: {integrity: sha512-L42TC/AldYRFBRZSxhkI0FC5TL8EC/NAsepNC/cWYTTiHQJ7mGg/vdTqNz8ShTYHr6LTHYkuD3/81nhX55SYtA==} @@ -3607,6 +3616,10 @@ packages: '@types/node': 20.4.10 dev: true + /@types/crypto-js@4.1.1: + resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==} + dev: true + /@types/css-tree@2.0.0: resolution: {integrity: sha512-mY2sXRLBnUPMYw6mkOT+6dABeaNxAEKZz6scE9kQPNJx8fKe1fOsm8Honl7+xFYe6TKX8WNk2+7oMp2vBArJ9Q==} dev: true @@ -3760,6 +3773,7 @@ packages: /@types/uuid@9.0.2: resolution: {integrity: sha512-kNnC1GFBLuhImSnV7w4njQkUiJi0ZXUycu1rUaouPqiKlXkh77JKgdRnTAp1x5eBwcIwbtI+3otwzuIDEuDoxQ==} + dev: false /@types/webrtc@0.0.36: resolution: {integrity: sha512-tYFarc92EluXU7XyRmWbkQXSbZIOHTdDOudFPal9u/TNTQuouWpIHV/2o9bNAdqvTJFjLJh/zflCOLWbL30tEQ==} @@ -5185,6 +5199,10 @@ packages: shebang-command: 2.0.0 which: 2.0.2 + /crypto-js@4.1.1: + resolution: {integrity: sha512-o2JlM7ydqd3Qk9CA0L4NL6mTzU2sdx96a+oOfPu8Mkl/PK51vSyoi8/rQ8NknZtk44vq15lmhAj9CIAGwgeWKw==} + dev: false + /css-tree@2.3.1: resolution: {integrity: sha512-6Fv1DV/TYw//QF5IzQdqsNDjx/wc8TrMBZsqjL9eW01tWb7R7k/mq+/VXfJCl7SoD5emsJop9cOByJZfs8hYIw==} engines: {node: ^10 || ^12.20.0 || ^14.13.0 || >=15.0.0} @@ -10106,6 +10124,7 @@ packages: /uuid@9.0.0: resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==} hasBin: true + dev: false /v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} @@ -10556,15 +10575,14 @@ packages: file:bare-client-custom: resolution: {directory: bare-client-custom, type: directory} name: bare-client-custom - version: 2.2.0-alpha dependencies: '@types/uuid': 9.0.2 uuid: 9.0.0 + dev: false file:corium: resolution: {directory: corium, type: directory} name: corium - version: 1.0.0-alpha.2 dependencies: '@rollup/browser': 3.28.0 '@swc/helpers': 0.4.14 diff --git a/protocol/src/index.ts b/protocol/src/index.ts index c8d775a..21dd3b5 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -1,10 +1,12 @@ export type ObjectValues = T[keyof T]; export const C2SRequestTypes = { - HTTPRequest: 0, - WSOpen: 1, - WSClose: 2, - WSSendText: 3, - WSSendBinary: 4, + HTTPRequestStart: 0, + HTTPRequestChunk: 1, + HTTPRequestEnd: 2, + WSOpen: 3, + WSClose: 4, + WSSendText: 5, + WSSendBinary: 6, } as const; export type C2SRequestType = ObjectValues; @@ -13,9 +15,9 @@ export const S2CRequestTypes = { HTTPResponseChunk: 1, HTTPResponseEnd: 2, WSOpen: 3, - WSDataText: 4, - WSDataBinary: 5, - WSClose: 6, + WSClose: 4, + WSDataText: 5, + WSDataBinary: 6, WSError: 7, } as const; export type S2CRequestType = ObjectValues; @@ -25,7 +27,6 @@ export type ProtoBareHeaders = Record; export type HTTPRequestPayload = { method: string; requestHeaders: ProtoBareHeaders; - body: string | null; remote: URL; };