serverside WS chunking impl

This commit is contained in:
Spencer Pogorzelski 2023-08-19 12:55:10 -07:00
parent d131afc85e
commit f67aa9e60f
4 changed files with 103 additions and 81 deletions

View file

@ -190,17 +190,19 @@ export class AdriftBareClient extends Client {
onReadyState(WebSocket.CLOSED); onReadyState(WebSocket.CLOSED);
ws.dispatchEvent(new CloseEvent("close", { code, reason, wasClean })); ws.dispatchEvent(new CloseEvent("close", { code, reason, wasClean }));
}, },
(data) => { async (stream, isBinary) => {
console.log({ data, binaryType: ws.binaryType }); let data: ArrayBuffer | string = await new Response(
if (data instanceof ArrayBuffer) { stream
(data as any).__proto__ = arrayBufferImpl.prototype; ).arrayBuffer();
if (!isBinary) {
try {
data = new TextDecoder().decode();
} catch (e) {
console.error(e);
return;
} }
}
ws.dispatchEvent( ws.dispatchEvent(new MessageEvent("message", { data }));
new MessageEvent("message", {
data,
})
);
}, },
(message: string) => { (message: string) => {
console.log({ message }); console.log({ message });

View file

@ -19,7 +19,7 @@ import {
type OpenWSMeta = { type OpenWSMeta = {
onopen: () => void; onopen: () => void;
onclose: (code: number, reason: string, wasClean: boolean) => void; onclose: (code: number, reason: string, wasClean: boolean) => void;
onmessage: (data: any) => void; onmessage: (data: ReadableStream, isBinary: boolean) => void;
onerror: (message: string) => void; onerror: (message: string) => void;
}; };
@ -42,6 +42,7 @@ export class Connection {
openRequestStreams: Record<number, ReadableStreamDefaultController<any>> = {}; openRequestStreams: Record<number, ReadableStreamDefaultController<any>> = {};
openingSockets: Record<number, OpenWSMeta> = {}; openingSockets: Record<number, OpenWSMeta> = {};
openSockets: Record<number, OpenWSMeta> = {}; openSockets: Record<number, OpenWSMeta> = {};
wsMsgStreams: Record<number, ReadableStreamDefaultController<any>> = {};
counter: number = 0; counter: number = 0;
@ -132,18 +133,45 @@ export class Connection {
setTimeout(() => socketMeta.onopen()); setTimeout(() => socketMeta.onopen());
break; break;
case S2CRequestTypes.WSDataText: { case S2CRequestTypes.WSBinaryStart:
case S2CRequestTypes.WSTextStart: {
const socketMeta = this.openSockets[requestID]; const socketMeta = this.openSockets[requestID];
if (!socketMeta) return; if (!socketMeta) return;
setTimeout(() => socketMeta.onmessage(msgText())); const stream = new ReadableStream({
start: (controller) => {
this.wsMsgStreams[requestID] = controller;
},
pull: (constroller) => {
// not needed
},
cancel: () => {
// TODO
},
});
setTimeout(() =>
socketMeta.onmessage(
stream,
requestType === S2CRequestTypes.WSBinaryStart
? true
: requestType === S2CRequestTypes.WSTextStart
? false
: (() => {
throw new Error("unreachable");
})()
)
);
break; break;
} }
case S2CRequestTypes.WSDataBinary: { case S2CRequestTypes.WSDataChunk: {
const socketMeta = this.openSockets[requestID]; const stream = this.wsMsgStreams[requestID];
if (!socketMeta) return; stream?.enqueue(new Uint8Array(data.slice(cursor)));
let slice = data.slice(cursor); break;
setTimeout(() => socketMeta.onmessage(slice)); }
case S2CRequestTypes.WSDataEnd: {
const stream = this.wsMsgStreams[requestID];
stream?.close();
break; break;
} }
@ -244,7 +272,7 @@ export class Connection {
protocols: string | string[], protocols: string | string[],
onopen: () => void, onopen: () => void,
onclose: (code: number, reason: string, wasClean: boolean) => void, onclose: (code: number, reason: string, wasClean: boolean) => void,
onmessage: (data: any) => void, onmessage: (data: ReadableStream, isBinary: boolean) => void,
onerror: (message: string) => void, onerror: (message: string) => void,
arrayBufferImpl: ArrayBufferConstructor arrayBufferImpl: ArrayBufferConstructor
): { ): {

View file

@ -16,9 +16,11 @@ export const S2CRequestTypes = {
HTTPResponseEnd: 2, HTTPResponseEnd: 2,
WSOpen: 3, WSOpen: 3,
WSClose: 4, WSClose: 4,
WSDataText: 5, WSTextStart: 5,
WSDataBinary: 6, WSBinaryStart: 6,
WSError: 7, WSDataChunk: 7,
WSDataEnd: 8,
WSError: 9,
} as const; } as const;
export type S2CRequestType = ObjectValues<typeof S2CRequestTypes>; export type S2CRequestType = ObjectValues<typeof S2CRequestTypes>;
@ -60,6 +62,6 @@ export const S2C_HELLO_OK = ":3";
export const C2S_HELLO = "haiii "; export const C2S_HELLO = "haiii ";
export const S2C_HELLO_ERR = ":< "; export const S2C_HELLO_ERR = ":< ";
export const PROTOCOL_VERSION = "1.0"; export const PROTOCOL_VERSION = "2.0";
export { Transport } from "./Transport"; export { Transport } from "./Transport";

View file

@ -156,35 +156,6 @@ export class AdriftServer {
}; };
} }
_sendJSONRes(seq: number, op: S2CRequestType, payload: any) {
const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload));
const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, op);
cursor += 1;
new Uint8Array(buf).set(payloadBuffer, cursor);
this.send(buf);
}
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload);
}
sendHTTPResponseChunk(seq: number, chunk: Uint8Array) {
const buf = new ArrayBuffer(2 + 1 + chunk.byteLength);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseChunk);
cursor += 1;
new Uint8Array(buf).set(chunk, cursor);
this.send(buf);
}
_sendSimpleRes(seq: number, op: S2CRequestType) { _sendSimpleRes(seq: number, op: S2CRequestType) {
const buf = new ArrayBuffer(2 + 1); const buf = new ArrayBuffer(2 + 1);
const dataView = new DataView(buf); const dataView = new DataView(buf);
@ -195,6 +166,35 @@ export class AdriftServer {
this.send(buf); this.send(buf);
} }
_sendBufRes(seq: number, op: S2CRequestType, payload: Uint8Array) {
const payloadArr = new Uint8Array(payload);
const buf = new ArrayBuffer(2 + 1 + payloadArr.length);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, op);
cursor += 1;
new Uint8Array(buf).set(payloadArr, cursor);
this.send(buf);
}
_sendJSONRes(seq: number, op: S2CRequestType, payload: any) {
this._sendBufRes(
seq,
op,
new TextEncoder().encode(JSON.stringify(payload))
);
}
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload);
}
sendHTTPResponseChunk(seq: number, chunk: Uint8Array) {
this._sendBufRes(seq, S2CRequestTypes.HTTPResponseChunk, chunk);
}
sendHTTPResponseEnd(seq: number) { sendHTTPResponseEnd(seq: number) {
this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd); this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd);
} }
@ -211,28 +211,18 @@ export class AdriftServer {
this._sendJSONRes(seq, S2CRequestTypes.WSError, payload); this._sendJSONRes(seq, S2CRequestTypes.WSError, payload);
} }
sendWSText(seq: number, textEncoded: ArrayBuffer) { streamWSData(seq: number, isBinary: boolean, textEncoded: ArrayBuffer) {
const buf = new ArrayBuffer(2 + 1 + textEncoded.byteLength); this._sendSimpleRes(
const dataView = new DataView(buf); seq,
let cursor = 0; isBinary ? S2CRequestTypes.WSBinaryStart : S2CRequestTypes.WSTextStart
dataView.setUint16(cursor, seq); );
cursor += 2; let remaining = textEncoded;
dataView.setUint8(cursor, S2CRequestTypes.WSDataText); do {
cursor += 1; const chunk = remaining.slice(0, MAX_CHUNK_SIZE);
new Uint8Array(buf).set(new Uint8Array(textEncoded), cursor); remaining = remaining.slice(MAX_CHUNK_SIZE);
this.send(buf); this._sendBufRes(seq, S2CRequestTypes.WSDataChunk, new Uint8Array(chunk));
} } while (remaining.byteLength > 0);
this._sendSimpleRes(seq, S2CRequestTypes.WSDataEnd);
sendWSBinary(seq: number, msg: ArrayBuffer) {
const buf = new ArrayBuffer(2 + 1 + msg.byteLength);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, S2CRequestTypes.WSDataBinary);
cursor += 1;
new Uint8Array(buf).set(new Uint8Array(msg), cursor);
this.send(buf);
} }
async onMsg(msg: ArrayBuffer) { async onMsg(msg: ArrayBuffer) {
@ -348,20 +338,20 @@ export class AdriftServer {
// web ws: first arg is an event, event.data is string if text or // web ws: first arg is an event, event.data is string if text or
// arraybuffer|blob depending on binaryType. // arraybuffer|blob depending on binaryType.
if (dataOrEvent instanceof ArrayBuffer) { if (dataOrEvent instanceof ArrayBuffer) {
if (isBinary) { this.streamWSData(seq, Boolean(isBinary), dataOrEvent);
this.sendWSBinary(seq, dataOrEvent);
return;
}
this.sendWSText(seq, dataOrEvent);
return; return;
} }
// unless we set binaryType incorrectly, we should be on the web here. // unless we set binaryType incorrectly, we should be on the web here.
if (typeof dataOrEvent.data === "string") { if (typeof dataOrEvent.data === "string") {
this.sendWSText(seq, new TextEncoder().encode(dataOrEvent.data)); this.streamWSData(
seq,
false,
new TextEncoder().encode(dataOrEvent.data)
);
return; return;
} }
if (dataOrEvent.data instanceof ArrayBuffer) { if (dataOrEvent.data instanceof ArrayBuffer) {
this.sendWSBinary(seq, dataOrEvent.data); this.streamWSData(seq, true, dataOrEvent.data);
return; return;
} }
console.error({ dataOrEvent, isBinary }); console.error({ dataOrEvent, isBinary });