From 07b0ba3a07ef6c883e6f753e0df3def5346940d6 Mon Sep 17 00:00:00 2001 From: Spencer Pogorzelski <34356756+Scoder12@users.noreply.github.com> Date: Sun, 13 Aug 2023 18:46:07 -0700 Subject: [PATCH 1/2] client side impl begin --- client/src/AdriftClient.ts | 13 +++-- client/src/Connection.ts | 104 +++++++++++++++++++++++++++++++------ 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index 1b89b79..a1e1bd1 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -79,7 +79,7 @@ export class AdriftBareClient extends Client { const ws = new webSocketImpl("ws:null", protocols); // this will error. that's okay - let send = this.connection.wsconnect( + let { send, close } = this.connection.wsconnect( remote, () => { onReadyState(WebSocketFields.OPEN); @@ -88,8 +88,6 @@ export class AdriftBareClient extends Client { () => { onReadyState(WebSocketFields.CLOSED); ws.dispatchEvent(new Event("close")); - - // what do i do for WebSocketFields.closing? }, (data) => { ws.dispatchEvent( @@ -100,11 +98,16 @@ export class AdriftBareClient extends Client { } ); - (ws as any).__defineGetter__("send", () => (data: any) => { send(data); }); - (ws as any).__defineSetter__("send", () => { }); + // uv wraps it and we don't want that + // i can probably fix later but this is fine for now -CE + (ws as any).__defineSetter__("send", () => {}); + + (ws as any).__defineGetter__("close", (code?: number, reason?: string) => { + close(code, reason); + }); return ws; } diff --git a/client/src/Connection.ts b/client/src/Connection.ts index d268df7..0d352c2 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -9,8 +9,13 @@ import { } from "protocol"; export class Connection { - callbacks: Record = {}; - openStreams: Record> = {}; + requestCallbacks: Record = {}; + openRequestStreams: Record> = {}; + openingSockets: Record void>; + openSockets: Record< + number, + { onclose: () => void; onmessage: (data: any) => void } + >; counter: number = 0; @@ -18,6 +23,10 @@ export class Connection { transport.ondata = this.ondata.bind(this); } + nextSeq() { + return ++this.counter; + } + ondata(data: ArrayBuffer) { let cursor = 0; const view = new DataView(data); @@ -35,7 +44,7 @@ export class Connection { const payload = JSON.parse(decoder.decode(data.slice(cursor))); const stream = new ReadableStream({ start: (controller) => { - this.openStreams[requestID] = controller; + this.openRequestStreams[requestID] = controller; }, pull: (controller) => { // not needed @@ -44,17 +53,19 @@ export class Connection { // TODO }, }); - this.callbacks[requestID]({ payload, body: stream }); + this.requestCallbacks[requestID]({ payload, body: stream }); + delete this.requestCallbacks[requestID]; break; case S2CRequestTypes.HTTPResponseChunk: - this.openStreams[requestID]?.enqueue( + this.openRequestStreams[requestID]?.enqueue( new Uint8Array(data.slice(cursor)) ); break; case S2CRequestTypes.HTTPResponseEnd: - this.openStreams[requestID]?.close(); + this.openRequestStreams[requestID]?.close(); + delete this.openRequestStreams[requestID]; break; } } @@ -86,23 +97,82 @@ export class Connection { let json = JSON.stringify(data); return new Promise(async (resolve) => { - let id = ++this.counter; - this.callbacks[id] = resolve; - await this.send(id, new Blob([json]), C2SRequestTypes.HTTPRequest); + let seq = this.nextSeq(); + this.requestCallbacks[seq] = resolve; + await this.send(seq, new Blob([json]), C2SRequestTypes.HTTPRequest); }); } - // idk the type of data, figure it out ig - wsconnect(url: URL, onopen: () => void, onclose: () => void, onmessage: (data: any) => void): (data: any) => void { - // do the connection shit here + wsconnect( + url: URL, + onopen: () => void, + onclose: () => void, + onmessage: (data: any) => void + ): { + send: (data: any) => void; + close: (code?: number, reason?: string) => void; + } { + const payload = JSON.stringify({ url }); + let seq = this.nextSeq(); + this.send( + seq, + new TextEncoder().encode(payload), + C2SRequestTypes.WSOpen + ).catch((e) => { + console.error(e); + onclose(); + }); - onopen(); // this can't be async, just call onopen when opened + this.openingSockets[seq] = onopen; - return (data) => { - - // send "data" to the server + return { + send: (data) => { + if (!this.openSockets[seq]) { + throw new Error("send on closed socket"); + } + const cleanup = (e: any) => { + console.error(e); + delete this.openSockets[seq]; + }; + if (typeof data === "string") { + this.send( + seq, + new TextEncoder().encode(data), + C2SRequestTypes.WSSendText + ).catch(cleanup); + return; + } + if (data instanceof ArrayBuffer) { + this.send(seq, data, C2SRequestTypes.WSSendBinary).catch(cleanup); + return; + } + if (ArrayBuffer.isView(data)) { + this.send( + seq, + data.buffer.slice( + data.byteOffset, + data.byteOffset + data.byteLength + ), + C2SRequestTypes.WSSendBinary + ).catch(cleanup); + return; + } + console.error({ data }); + throw new Error("Unexpected type passed to send"); + }, + close: (code?: number, reason?: string) => { + const payload = JSON.stringify({ code, reason }); + this.send( + seq, + new TextEncoder().encode(payload), + C2SRequestTypes.WSClose + ).catch((e) => { + // At this point there is nothing left to clean up + console.error(e); + }); + delete this.openSockets[seq]; + }, }; - } } From f64cc78f9f9802a36b1b63eacc8e8056c6adb11d Mon Sep 17 00:00:00 2001 From: Spencer Pogorzelski <34356756+Scoder12@users.noreply.github.com> Date: Sun, 13 Aug 2023 19:06:52 -0700 Subject: [PATCH 2/2] WIP on websocket impl --- client/src/AdriftClient.ts | 1 - client/src/Connection.ts | 6 ++-- pnpm-lock.yaml | 37 +++++++++++++++++------- protocol/src/index.ts | 10 +++++++ server/package.json | 6 ++-- server/src/server.ts | 59 ++++++++++++++++++++++++++++++-------- 6 files changed, 91 insertions(+), 28 deletions(-) diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index a1e1bd1..143c782 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -5,7 +5,6 @@ import { GetRequestHeadersCallback, MetaCallback, ReadyStateCallback, - WebSocketFields, WebSocketImpl, } from "bare-client-custom"; import { Connection } from "./Connection"; diff --git a/client/src/Connection.ts b/client/src/Connection.ts index 0d352c2..9301c77 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -1,6 +1,7 @@ import { C2SRequestType, C2SRequestTypes, + C2SWSOpenPayload, HTTPRequestPayload, HTTPResponsePayload, S2CRequestType, @@ -112,11 +113,12 @@ export class Connection { send: (data: any) => void; close: (code?: number, reason?: string) => void; } { - const payload = JSON.stringify({ url }); + const payload: C2SWSOpenPayload = { url: url.toString() }; + const payloadJSON = JSON.stringify(payload); let seq = this.nextSeq(); this.send( seq, - new TextEncoder().encode(payload), + new TextEncoder().encode(payloadJSON), C2SRequestTypes.WSOpen ).catch((e) => { console.error(e); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bdb5d1c..53fd696 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -32,8 +32,8 @@ importers: specifier: ^1.0.0 version: 1.0.0 '@tomphttp/bare-client': - specifier: file:../bare-client-custom - version: file:bare-client-custom + specifier: ^2.2.0-alpha + version: 2.2.0-alpha '@tomphttp/bare-server-node': specifier: ^2.0.1 version: 2.0.1 @@ -94,9 +94,6 @@ importers: url: specifier: ^0.11.0 version: 0.11.0 - uuid: - specifier: ^9.0.0 - version: 9.0.0 webpack: specifier: ^5.82.1 version: 5.82.1(esbuild@0.19.1)(uglify-js@3.17.4)(webpack-cli@5.1.1) @@ -138,8 +135,8 @@ importers: Ultraviolet: devDependencies: '@tomphttp/bare-client': - specifier: file:../bare-client-custom/ - version: file:bare-client-custom + specifier: ^2.2.0-alpha + version: 2.2.0-alpha css-tree: specifier: ^2.3.1 version: 2.3.1 @@ -226,7 +223,7 @@ importers: corium: dependencies: '@rollup/browser': - specifier: ^3.28.0 + specifier: ^3.17.2 version: 3.28.0 '@swc/helpers': specifier: ^0.4.14 @@ -390,6 +387,9 @@ importers: ipaddr.js: specifier: ^2.1.0 version: 2.1.0 + isomorphic-ws: + specifier: ^5.0.0 + version: 5.0.0(ws@8.13.0) protocol: specifier: workspace:* version: link:../protocol @@ -402,6 +402,9 @@ importers: wrtc: specifier: ^0.4.7 version: 0.4.7 + ws: + specifier: ^8.13.0 + version: 8.13.0 devDependencies: '@types/express': specifier: ^4.17.17 @@ -3423,6 +3426,9 @@ packages: resolution: {integrity: sha512-WyIVnSAqzfrLejmOhh/l/LtDOeK+SHnBGi/z+QyliVP1T1JxoNE5eecwxlV+osM9J6FTAYVGNHr8/5bubaIj6Q==} dev: false + /@tomphttp/bare-client@2.2.0-alpha: + resolution: {integrity: sha512-xhcflOpwr92tkpp8SoDhB3nK3LHMBIjx+vgow37XobQew2k0/mXSxmaU7BsDFpOIa1CcLCEsR8gWn0v7Cy9+7Q==} + /@tomphttp/bare-server-node@2.0.1: resolution: {integrity: sha512-L42TC/AldYRFBRZSxhkI0FC5TL8EC/NAsepNC/cWYTTiHQJ7mGg/vdTqNz8ShTYHr6LTHYkuD3/81nhX55SYtA==} engines: {node: '>=18.0.0'} @@ -3620,6 +3626,7 @@ packages: /@types/uuid@9.0.2: resolution: {integrity: sha512-kNnC1GFBLuhImSnV7w4njQkUiJi0ZXUycu1rUaouPqiKlXkh77JKgdRnTAp1x5eBwcIwbtI+3otwzuIDEuDoxQ==} + dev: false /@types/webrtc@0.0.36: resolution: {integrity: sha512-tYFarc92EluXU7XyRmWbkQXSbZIOHTdDOudFPal9u/TNTQuouWpIHV/2o9bNAdqvTJFjLJh/zflCOLWbL30tEQ==} @@ -7269,6 +7276,14 @@ packages: resolution: {integrity: sha512-WhB9zCku7EGTj/HQQRz5aUQEUeoQZH2bWcltRErOpymJ4boYE6wL9Tbr23krRPSZ+C5zqNSrSw+Cc7sZZ4b7vg==} engines: {node: '>=0.10.0'} + /isomorphic-ws@5.0.0(ws@8.13.0): + resolution: {integrity: sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==} + peerDependencies: + ws: '*' + dependencies: + ws: 8.13.0 + dev: false + /jackspeak@2.2.3: resolution: {integrity: sha512-pF0kfjmg8DJLxDrizHoCZGUFz4P4czQ3HyfW4BU0ffebYkzAVlBywp5zaxW/TM+r0sGbmrQdi8EQQVTJFxnGsQ==} engines: {node: '>=14'} @@ -10275,6 +10290,7 @@ packages: /uuid@9.0.0: resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==} hasBin: true + dev: false /v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} @@ -10823,15 +10839,14 @@ packages: file:bare-client-custom: resolution: {directory: bare-client-custom, type: directory} name: bare-client-custom - version: 2.2.0-alpha dependencies: '@types/uuid': 9.0.2 uuid: 9.0.0 + dev: false file:corium: resolution: {directory: corium, type: directory} name: corium - version: 1.0.0-alpha.2 dependencies: '@rollup/browser': 3.28.0 '@swc/helpers': 0.4.14 diff --git a/protocol/src/index.ts b/protocol/src/index.ts index 3249163..15fb0c6 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -34,4 +34,14 @@ export type HTTPResponsePayload = { headers: ProtoBareHeaders; }; +export type C2SWSOpenPayload = { + url: string; +}; + +export type S2CWSClosePayload = { + code: number; + reason: string; + wasClean: boolean; +}; + export { Transport } from "./Transport"; diff --git a/server/package.json b/server/package.json index 1cdf5e2..453e890 100644 --- a/server/package.json +++ b/server/package.json @@ -15,12 +15,14 @@ "express": "^4.18.2", "express-ws": "^5.0.2", "firebase": "^10.1.0", + "firebase-config": "workspace:*", "ipaddr.js": "^2.1.0", + "isomorphic-ws": "^5.0.0", + "protocol": "workspace:*", "ts-node": "^10.9.1", "typescript": "^5.1.6", "wrtc": "^0.4.7", - "firebase-config": "workspace:*", - "protocol": "workspace:*" + "ws": "^8.13.0" }, "devDependencies": { "@types/express": "^4.17.17", diff --git a/server/src/server.ts b/server/src/server.ts index 0fe5e68..fc3a54a 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,13 +1,14 @@ -import { IncomingMessage, STATUS_CODES } from "http"; - import EventEmitter from "events"; - +import { IncomingMessage, STATUS_CODES } from "http"; +import { WebSocket } from "isomorphic-ws"; import { C2SRequestTypes, HTTPRequestPayload, HTTPResponsePayload, ProtoBareHeaders, + S2CRequestType, S2CRequestTypes, + S2CWSClosePayload, } from "protocol"; import { Readable } from "stream"; import { BareError, bareFetch, options } from "./http"; @@ -28,6 +29,7 @@ function bareErrorToResponse(e: BareError): { export class AdriftServer { send: (msg: ArrayBuffer) => void; + sockets: Record; events: EventEmitter; constructor(send: (msg: ArrayBuffer) => void) { @@ -56,9 +58,7 @@ export class AdriftServer { } } - static parseHttpReqPayload( - payloadRaw: ArrayBuffer - ): HTTPRequestPayload | undefined { + static tryParseJSONPayload(payloadRaw: ArrayBuffer): any | undefined { let payload; try { payload = JSON.parse(new TextDecoder().decode(payloadRaw)); @@ -113,19 +113,23 @@ export class AdriftServer { }; } - sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { + _sendJSONRes(seq: number, op: S2CRequestType, payload: any) { const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload)); const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length); const dataView = new DataView(buf); let cursor = 0; dataView.setUint16(cursor, seq); cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseStart); + dataView.setUint8(cursor, op); cursor += 1; new Uint8Array(buf).set(payloadBuffer, cursor); this.send(buf); } + sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { + this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload); + } + sendHTTPResponseChunk(seq: number, chunk: Uint8Array) { const buf = new ArrayBuffer(2 + 1 + chunk.byteLength); const dataView = new DataView(buf); @@ -138,27 +142,39 @@ export class AdriftServer { this.send(buf); } - sendHTTPResponseEnd(seq: number) { + _sendSimpleRes(seq: number, op: S2CRequestType) { const buf = new ArrayBuffer(2 + 1); const dataView = new DataView(buf); let cursor = 0; dataView.setUint16(cursor, seq); cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseEnd); + dataView.setUint8(cursor, op); this.send(buf); } + sendHTTPResponseEnd(seq: number) { + this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd); + } + + sendWSOpen(seq: number) { + this._sendSimpleRes(seq, S2CRequestTypes.WSOpen); + } + + sendWSClose(seq: number, payload: S2CWSClosePayload) { + this._sendJSONRes(seq, S2CRequestTypes.WSClose, payload); + } + async onMsg(msg: ArrayBuffer) { const init = AdriftServer.parseMsgInit(msg); if (!init) return; const { cursor, seq, op } = init; switch (op) { - case C2SRequestTypes.HTTPRequest: + case C2SRequestTypes.HTTPRequest: { let resp: { payload: HTTPResponsePayload; body: AsyncIterable; }; - const reqPayload = AdriftServer.parseHttpReqPayload(msg.slice(cursor)); + const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); if (!reqPayload) return; try { resp = await this.handleHTTPRequest(reqPayload); @@ -194,6 +210,25 @@ export class AdriftServer { } this.sendHTTPResponseEnd(seq); break; + } + + case C2SRequestTypes.WSOpen: { + const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); + const ws = (this.sockets[seq] = new WebSocket(payload.url)); + ws.onopen = () => { + this.sendWSOpen(seq); + }; + ws.onclose = (e) => { + this.sendWSClose(seq, { + code: e.code, + reason: e.reason, + wasClean: e.wasClean, + }); + }; + ws.onmessage = (e) => {}; + break; + } + default: // not implemented break;