diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index 1de1ea8..472e099 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -190,17 +190,19 @@ export class AdriftBareClient extends Client { onReadyState(WebSocket.CLOSED); ws.dispatchEvent(new CloseEvent("close", { code, reason, wasClean })); }, - (data) => { - console.log({ data, binaryType: ws.binaryType }); - if (data instanceof ArrayBuffer) { - (data as any).__proto__ = arrayBufferImpl.prototype; + async (stream, isBinary) => { + let data: ArrayBuffer | string = await new Response( + stream + ).arrayBuffer(); + if (!isBinary) { + try { + data = new TextDecoder().decode(); + } catch (e) { + console.error(e); + return; + } } - - ws.dispatchEvent( - new MessageEvent("message", { - data, - }) - ); + ws.dispatchEvent(new MessageEvent("message", { data })); }, (message: string) => { console.log({ message }); diff --git a/client/src/Connection.ts b/client/src/Connection.ts index 3e3bceb..4f3cca6 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -19,7 +19,7 @@ import { type OpenWSMeta = { onopen: () => void; onclose: (code: number, reason: string, wasClean: boolean) => void; - onmessage: (data: any) => void; + onmessage: (data: ReadableStream, isBinary: boolean) => void; onerror: (message: string) => void; }; @@ -42,6 +42,7 @@ export class Connection { openRequestStreams: Record> = {}; openingSockets: Record = {}; openSockets: Record = {}; + wsMsgStreams: Record> = {}; counter: number = 0; @@ -132,18 +133,45 @@ export class Connection { setTimeout(() => socketMeta.onopen()); break; - case S2CRequestTypes.WSDataText: { + case S2CRequestTypes.WSBinaryStart: + case S2CRequestTypes.WSTextStart: { const socketMeta = this.openSockets[requestID]; if (!socketMeta) return; - setTimeout(() => socketMeta.onmessage(msgText())); + const stream = new ReadableStream({ + start: (controller) => { + this.wsMsgStreams[requestID] = controller; + }, + pull: (constroller) => { + // not needed + }, + cancel: () => { + // TODO + }, + }); + setTimeout(() => + socketMeta.onmessage( + stream, + requestType === S2CRequestTypes.WSBinaryStart + ? true + : requestType === S2CRequestTypes.WSTextStart + ? false + : (() => { + throw new Error("unreachable"); + })() + ) + ); break; } - case S2CRequestTypes.WSDataBinary: { - const socketMeta = this.openSockets[requestID]; - if (!socketMeta) return; - let slice = data.slice(cursor); - setTimeout(() => socketMeta.onmessage(slice)); + case S2CRequestTypes.WSDataChunk: { + const stream = this.wsMsgStreams[requestID]; + stream?.enqueue(new Uint8Array(data.slice(cursor))); + break; + } + + case S2CRequestTypes.WSDataEnd: { + const stream = this.wsMsgStreams[requestID]; + stream?.close(); break; } @@ -244,7 +272,7 @@ export class Connection { protocols: string | string[], onopen: () => void, onclose: (code: number, reason: string, wasClean: boolean) => void, - onmessage: (data: any) => void, + onmessage: (data: ReadableStream, isBinary: boolean) => void, onerror: (message: string) => void, arrayBufferImpl: ArrayBufferConstructor ): { diff --git a/protocol/src/index.ts b/protocol/src/index.ts index 0247292..282842a 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -16,9 +16,11 @@ export const S2CRequestTypes = { HTTPResponseEnd: 2, WSOpen: 3, WSClose: 4, - WSDataText: 5, - WSDataBinary: 6, - WSError: 7, + WSTextStart: 5, + WSBinaryStart: 6, + WSDataChunk: 7, + WSDataEnd: 8, + WSError: 9, } as const; export type S2CRequestType = ObjectValues; @@ -60,6 +62,6 @@ export const S2C_HELLO_OK = ":3"; export const C2S_HELLO = "haiii "; export const S2C_HELLO_ERR = ":< "; -export const PROTOCOL_VERSION = "1.0"; +export const PROTOCOL_VERSION = "2.0"; export { Transport } from "./Transport"; diff --git a/server/src/server.ts b/server/src/server.ts index a2c15eb..a8d58ac 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -156,35 +156,6 @@ export class AdriftServer { }; } - _sendJSONRes(seq: number, op: S2CRequestType, payload: any) { - const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload)); - const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length); - const dataView = new DataView(buf); - let cursor = 0; - dataView.setUint16(cursor, seq); - cursor += 2; - dataView.setUint8(cursor, op); - cursor += 1; - new Uint8Array(buf).set(payloadBuffer, cursor); - this.send(buf); - } - - sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { - this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload); - } - - sendHTTPResponseChunk(seq: number, chunk: Uint8Array) { - const buf = new ArrayBuffer(2 + 1 + chunk.byteLength); - const dataView = new DataView(buf); - let cursor = 0; - dataView.setUint16(cursor, seq); - cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseChunk); - cursor += 1; - new Uint8Array(buf).set(chunk, cursor); - this.send(buf); - } - _sendSimpleRes(seq: number, op: S2CRequestType) { const buf = new ArrayBuffer(2 + 1); const dataView = new DataView(buf); @@ -195,6 +166,35 @@ export class AdriftServer { this.send(buf); } + _sendBufRes(seq: number, op: S2CRequestType, payload: Uint8Array) { + const payloadArr = new Uint8Array(payload); + const buf = new ArrayBuffer(2 + 1 + payloadArr.length); + const dataView = new DataView(buf); + let cursor = 0; + dataView.setUint16(cursor, seq); + cursor += 2; + dataView.setUint8(cursor, op); + cursor += 1; + new Uint8Array(buf).set(payloadArr, cursor); + this.send(buf); + } + + _sendJSONRes(seq: number, op: S2CRequestType, payload: any) { + this._sendBufRes( + seq, + op, + new TextEncoder().encode(JSON.stringify(payload)) + ); + } + + sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { + this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload); + } + + sendHTTPResponseChunk(seq: number, chunk: Uint8Array) { + this._sendBufRes(seq, S2CRequestTypes.HTTPResponseChunk, chunk); + } + sendHTTPResponseEnd(seq: number) { this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd); } @@ -211,28 +211,18 @@ export class AdriftServer { this._sendJSONRes(seq, S2CRequestTypes.WSError, payload); } - sendWSText(seq: number, textEncoded: ArrayBuffer) { - const buf = new ArrayBuffer(2 + 1 + textEncoded.byteLength); - const dataView = new DataView(buf); - let cursor = 0; - dataView.setUint16(cursor, seq); - cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.WSDataText); - cursor += 1; - new Uint8Array(buf).set(new Uint8Array(textEncoded), cursor); - this.send(buf); - } - - sendWSBinary(seq: number, msg: ArrayBuffer) { - const buf = new ArrayBuffer(2 + 1 + msg.byteLength); - const dataView = new DataView(buf); - let cursor = 0; - dataView.setUint16(cursor, seq); - cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.WSDataBinary); - cursor += 1; - new Uint8Array(buf).set(new Uint8Array(msg), cursor); - this.send(buf); + streamWSData(seq: number, isBinary: boolean, textEncoded: ArrayBuffer) { + this._sendSimpleRes( + seq, + isBinary ? S2CRequestTypes.WSBinaryStart : S2CRequestTypes.WSTextStart + ); + let remaining = textEncoded; + do { + const chunk = remaining.slice(0, MAX_CHUNK_SIZE); + remaining = remaining.slice(MAX_CHUNK_SIZE); + this._sendBufRes(seq, S2CRequestTypes.WSDataChunk, new Uint8Array(chunk)); + } while (remaining.byteLength > 0); + this._sendSimpleRes(seq, S2CRequestTypes.WSDataEnd); } async onMsg(msg: ArrayBuffer) { @@ -348,20 +338,20 @@ export class AdriftServer { // web ws: first arg is an event, event.data is string if text or // arraybuffer|blob depending on binaryType. if (dataOrEvent instanceof ArrayBuffer) { - if (isBinary) { - this.sendWSBinary(seq, dataOrEvent); - return; - } - this.sendWSText(seq, dataOrEvent); + this.streamWSData(seq, Boolean(isBinary), dataOrEvent); return; } // unless we set binaryType incorrectly, we should be on the web here. if (typeof dataOrEvent.data === "string") { - this.sendWSText(seq, new TextEncoder().encode(dataOrEvent.data)); + this.streamWSData( + seq, + false, + new TextEncoder().encode(dataOrEvent.data) + ); return; } if (dataOrEvent.data instanceof ArrayBuffer) { - this.sendWSBinary(seq, dataOrEvent.data); + this.streamWSData(seq, true, dataOrEvent.data); return; } console.error({ dataOrEvent, isBinary });