From f64cc78f9f9802a36b1b63eacc8e8056c6adb11d Mon Sep 17 00:00:00 2001 From: Spencer Pogorzelski <34356756+Scoder12@users.noreply.github.com> Date: Sun, 13 Aug 2023 19:06:52 -0700 Subject: [PATCH] WIP on websocket impl --- client/src/AdriftClient.ts | 1 - client/src/Connection.ts | 6 ++-- pnpm-lock.yaml | 37 +++++++++++++++++------- protocol/src/index.ts | 10 +++++++ server/package.json | 6 ++-- server/src/server.ts | 59 ++++++++++++++++++++++++++++++-------- 6 files changed, 91 insertions(+), 28 deletions(-) diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index a1e1bd1..143c782 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -5,7 +5,6 @@ import { GetRequestHeadersCallback, MetaCallback, ReadyStateCallback, - WebSocketFields, WebSocketImpl, } from "bare-client-custom"; import { Connection } from "./Connection"; diff --git a/client/src/Connection.ts b/client/src/Connection.ts index 0d352c2..9301c77 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -1,6 +1,7 @@ import { C2SRequestType, C2SRequestTypes, + C2SWSOpenPayload, HTTPRequestPayload, HTTPResponsePayload, S2CRequestType, @@ -112,11 +113,12 @@ export class Connection { send: (data: any) => void; close: (code?: number, reason?: string) => void; } { - const payload = JSON.stringify({ url }); + const payload: C2SWSOpenPayload = { url: url.toString() }; + const payloadJSON = JSON.stringify(payload); let seq = this.nextSeq(); this.send( seq, - new TextEncoder().encode(payload), + new TextEncoder().encode(payloadJSON), C2SRequestTypes.WSOpen ).catch((e) => { console.error(e); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bdb5d1c..53fd696 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -32,8 +32,8 @@ importers: specifier: ^1.0.0 version: 1.0.0 '@tomphttp/bare-client': - specifier: file:../bare-client-custom - version: file:bare-client-custom + specifier: ^2.2.0-alpha + version: 2.2.0-alpha '@tomphttp/bare-server-node': specifier: ^2.0.1 version: 2.0.1 @@ -94,9 +94,6 @@ importers: url: specifier: ^0.11.0 version: 0.11.0 - uuid: - specifier: ^9.0.0 - version: 9.0.0 webpack: specifier: ^5.82.1 version: 5.82.1(esbuild@0.19.1)(uglify-js@3.17.4)(webpack-cli@5.1.1) @@ -138,8 +135,8 @@ importers: Ultraviolet: devDependencies: '@tomphttp/bare-client': - specifier: file:../bare-client-custom/ - version: file:bare-client-custom + specifier: ^2.2.0-alpha + version: 2.2.0-alpha css-tree: specifier: ^2.3.1 version: 2.3.1 @@ -226,7 +223,7 @@ importers: corium: dependencies: '@rollup/browser': - specifier: ^3.28.0 + specifier: ^3.17.2 version: 3.28.0 '@swc/helpers': specifier: ^0.4.14 @@ -390,6 +387,9 @@ importers: ipaddr.js: specifier: ^2.1.0 version: 2.1.0 + isomorphic-ws: + specifier: ^5.0.0 + version: 5.0.0(ws@8.13.0) protocol: specifier: workspace:* version: link:../protocol @@ -402,6 +402,9 @@ importers: wrtc: specifier: ^0.4.7 version: 0.4.7 + ws: + specifier: ^8.13.0 + version: 8.13.0 devDependencies: '@types/express': specifier: ^4.17.17 @@ -3423,6 +3426,9 @@ packages: resolution: {integrity: sha512-WyIVnSAqzfrLejmOhh/l/LtDOeK+SHnBGi/z+QyliVP1T1JxoNE5eecwxlV+osM9J6FTAYVGNHr8/5bubaIj6Q==} dev: false + /@tomphttp/bare-client@2.2.0-alpha: + resolution: {integrity: sha512-xhcflOpwr92tkpp8SoDhB3nK3LHMBIjx+vgow37XobQew2k0/mXSxmaU7BsDFpOIa1CcLCEsR8gWn0v7Cy9+7Q==} + /@tomphttp/bare-server-node@2.0.1: resolution: {integrity: sha512-L42TC/AldYRFBRZSxhkI0FC5TL8EC/NAsepNC/cWYTTiHQJ7mGg/vdTqNz8ShTYHr6LTHYkuD3/81nhX55SYtA==} engines: {node: '>=18.0.0'} @@ -3620,6 +3626,7 @@ packages: /@types/uuid@9.0.2: resolution: {integrity: sha512-kNnC1GFBLuhImSnV7w4njQkUiJi0ZXUycu1rUaouPqiKlXkh77JKgdRnTAp1x5eBwcIwbtI+3otwzuIDEuDoxQ==} + dev: false /@types/webrtc@0.0.36: resolution: {integrity: sha512-tYFarc92EluXU7XyRmWbkQXSbZIOHTdDOudFPal9u/TNTQuouWpIHV/2o9bNAdqvTJFjLJh/zflCOLWbL30tEQ==} @@ -7269,6 +7276,14 @@ packages: resolution: {integrity: sha512-WhB9zCku7EGTj/HQQRz5aUQEUeoQZH2bWcltRErOpymJ4boYE6wL9Tbr23krRPSZ+C5zqNSrSw+Cc7sZZ4b7vg==} engines: {node: '>=0.10.0'} + /isomorphic-ws@5.0.0(ws@8.13.0): + resolution: {integrity: sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==} + peerDependencies: + ws: '*' + dependencies: + ws: 8.13.0 + dev: false + /jackspeak@2.2.3: resolution: {integrity: sha512-pF0kfjmg8DJLxDrizHoCZGUFz4P4czQ3HyfW4BU0ffebYkzAVlBywp5zaxW/TM+r0sGbmrQdi8EQQVTJFxnGsQ==} engines: {node: '>=14'} @@ -10275,6 +10290,7 @@ packages: /uuid@9.0.0: resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==} hasBin: true + dev: false /v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} @@ -10823,15 +10839,14 @@ packages: file:bare-client-custom: resolution: {directory: bare-client-custom, type: directory} name: bare-client-custom - version: 2.2.0-alpha dependencies: '@types/uuid': 9.0.2 uuid: 9.0.0 + dev: false file:corium: resolution: {directory: corium, type: directory} name: corium - version: 1.0.0-alpha.2 dependencies: '@rollup/browser': 3.28.0 '@swc/helpers': 0.4.14 diff --git a/protocol/src/index.ts b/protocol/src/index.ts index 3249163..15fb0c6 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -34,4 +34,14 @@ export type HTTPResponsePayload = { headers: ProtoBareHeaders; }; +export type C2SWSOpenPayload = { + url: string; +}; + +export type S2CWSClosePayload = { + code: number; + reason: string; + wasClean: boolean; +}; + export { Transport } from "./Transport"; diff --git a/server/package.json b/server/package.json index 1cdf5e2..453e890 100644 --- a/server/package.json +++ b/server/package.json @@ -15,12 +15,14 @@ "express": "^4.18.2", "express-ws": "^5.0.2", "firebase": "^10.1.0", + "firebase-config": "workspace:*", "ipaddr.js": "^2.1.0", + "isomorphic-ws": "^5.0.0", + "protocol": "workspace:*", "ts-node": "^10.9.1", "typescript": "^5.1.6", "wrtc": "^0.4.7", - "firebase-config": "workspace:*", - "protocol": "workspace:*" + "ws": "^8.13.0" }, "devDependencies": { "@types/express": "^4.17.17", diff --git a/server/src/server.ts b/server/src/server.ts index 0fe5e68..fc3a54a 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,13 +1,14 @@ -import { IncomingMessage, STATUS_CODES } from "http"; - import EventEmitter from "events"; - +import { IncomingMessage, STATUS_CODES } from "http"; +import { WebSocket } from "isomorphic-ws"; import { C2SRequestTypes, HTTPRequestPayload, HTTPResponsePayload, ProtoBareHeaders, + S2CRequestType, S2CRequestTypes, + S2CWSClosePayload, } from "protocol"; import { Readable } from "stream"; import { BareError, bareFetch, options } from "./http"; @@ -28,6 +29,7 @@ function bareErrorToResponse(e: BareError): { export class AdriftServer { send: (msg: ArrayBuffer) => void; + sockets: Record; events: EventEmitter; constructor(send: (msg: ArrayBuffer) => void) { @@ -56,9 +58,7 @@ export class AdriftServer { } } - static parseHttpReqPayload( - payloadRaw: ArrayBuffer - ): HTTPRequestPayload | undefined { + static tryParseJSONPayload(payloadRaw: ArrayBuffer): any | undefined { let payload; try { payload = JSON.parse(new TextDecoder().decode(payloadRaw)); @@ -113,19 +113,23 @@ export class AdriftServer { }; } - sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { + _sendJSONRes(seq: number, op: S2CRequestType, payload: any) { const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload)); const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length); const dataView = new DataView(buf); let cursor = 0; dataView.setUint16(cursor, seq); cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseStart); + dataView.setUint8(cursor, op); cursor += 1; new Uint8Array(buf).set(payloadBuffer, cursor); this.send(buf); } + sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { + this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload); + } + sendHTTPResponseChunk(seq: number, chunk: Uint8Array) { const buf = new ArrayBuffer(2 + 1 + chunk.byteLength); const dataView = new DataView(buf); @@ -138,27 +142,39 @@ export class AdriftServer { this.send(buf); } - sendHTTPResponseEnd(seq: number) { + _sendSimpleRes(seq: number, op: S2CRequestType) { const buf = new ArrayBuffer(2 + 1); const dataView = new DataView(buf); let cursor = 0; dataView.setUint16(cursor, seq); cursor += 2; - dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseEnd); + dataView.setUint8(cursor, op); this.send(buf); } + sendHTTPResponseEnd(seq: number) { + this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd); + } + + sendWSOpen(seq: number) { + this._sendSimpleRes(seq, S2CRequestTypes.WSOpen); + } + + sendWSClose(seq: number, payload: S2CWSClosePayload) { + this._sendJSONRes(seq, S2CRequestTypes.WSClose, payload); + } + async onMsg(msg: ArrayBuffer) { const init = AdriftServer.parseMsgInit(msg); if (!init) return; const { cursor, seq, op } = init; switch (op) { - case C2SRequestTypes.HTTPRequest: + case C2SRequestTypes.HTTPRequest: { let resp: { payload: HTTPResponsePayload; body: AsyncIterable; }; - const reqPayload = AdriftServer.parseHttpReqPayload(msg.slice(cursor)); + const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); if (!reqPayload) return; try { resp = await this.handleHTTPRequest(reqPayload); @@ -194,6 +210,25 @@ export class AdriftServer { } this.sendHTTPResponseEnd(seq); break; + } + + case C2SRequestTypes.WSOpen: { + const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); + const ws = (this.sockets[seq] = new WebSocket(payload.url)); + ws.onopen = () => { + this.sendWSOpen(seq); + }; + ws.onclose = (e) => { + this.sendWSClose(seq, { + code: e.code, + reason: e.reason, + wasClean: e.wasClean, + }); + }; + ws.onmessage = (e) => {}; + break; + } + default: // not implemented break;