WIP on websocket impl

This commit is contained in:
Spencer Pogorzelski 2023-08-13 19:06:52 -07:00
parent 07b0ba3a07
commit f64cc78f9f
6 changed files with 91 additions and 28 deletions

View file

@ -15,12 +15,14 @@
"express": "^4.18.2",
"express-ws": "^5.0.2",
"firebase": "^10.1.0",
"firebase-config": "workspace:*",
"ipaddr.js": "^2.1.0",
"isomorphic-ws": "^5.0.0",
"protocol": "workspace:*",
"ts-node": "^10.9.1",
"typescript": "^5.1.6",
"wrtc": "^0.4.7",
"firebase-config": "workspace:*",
"protocol": "workspace:*"
"ws": "^8.13.0"
},
"devDependencies": {
"@types/express": "^4.17.17",

View file

@ -1,13 +1,14 @@
import { IncomingMessage, STATUS_CODES } from "http";
import EventEmitter from "events";
import { IncomingMessage, STATUS_CODES } from "http";
import { WebSocket } from "isomorphic-ws";
import {
C2SRequestTypes,
HTTPRequestPayload,
HTTPResponsePayload,
ProtoBareHeaders,
S2CRequestType,
S2CRequestTypes,
S2CWSClosePayload,
} from "protocol";
import { Readable } from "stream";
import { BareError, bareFetch, options } from "./http";
@ -28,6 +29,7 @@ function bareErrorToResponse(e: BareError): {
export class AdriftServer {
send: (msg: ArrayBuffer) => void;
sockets: Record<number, WebSocket>;
events: EventEmitter;
constructor(send: (msg: ArrayBuffer) => void) {
@ -56,9 +58,7 @@ export class AdriftServer {
}
}
static parseHttpReqPayload(
payloadRaw: ArrayBuffer
): HTTPRequestPayload | undefined {
static tryParseJSONPayload(payloadRaw: ArrayBuffer): any | undefined {
let payload;
try {
payload = JSON.parse(new TextDecoder().decode(payloadRaw));
@ -113,19 +113,23 @@ export class AdriftServer {
};
}
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
_sendJSONRes(seq: number, op: S2CRequestType, payload: any) {
const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload));
const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseStart);
dataView.setUint8(cursor, op);
cursor += 1;
new Uint8Array(buf).set(payloadBuffer, cursor);
this.send(buf);
}
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
this._sendJSONRes(seq, S2CRequestTypes.HTTPResponseStart, payload);
}
sendHTTPResponseChunk(seq: number, chunk: Uint8Array) {
const buf = new ArrayBuffer(2 + 1 + chunk.byteLength);
const dataView = new DataView(buf);
@ -138,27 +142,39 @@ export class AdriftServer {
this.send(buf);
}
sendHTTPResponseEnd(seq: number) {
_sendSimpleRes(seq: number, op: S2CRequestType) {
const buf = new ArrayBuffer(2 + 1);
const dataView = new DataView(buf);
let cursor = 0;
dataView.setUint16(cursor, seq);
cursor += 2;
dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseEnd);
dataView.setUint8(cursor, op);
this.send(buf);
}
sendHTTPResponseEnd(seq: number) {
this._sendSimpleRes(seq, S2CRequestTypes.HTTPResponseEnd);
}
sendWSOpen(seq: number) {
this._sendSimpleRes(seq, S2CRequestTypes.WSOpen);
}
sendWSClose(seq: number, payload: S2CWSClosePayload) {
this._sendJSONRes(seq, S2CRequestTypes.WSClose, payload);
}
async onMsg(msg: ArrayBuffer) {
const init = AdriftServer.parseMsgInit(msg);
if (!init) return;
const { cursor, seq, op } = init;
switch (op) {
case C2SRequestTypes.HTTPRequest:
case C2SRequestTypes.HTTPRequest: {
let resp: {
payload: HTTPResponsePayload;
body: AsyncIterable<ArrayBuffer>;
};
const reqPayload = AdriftServer.parseHttpReqPayload(msg.slice(cursor));
const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor));
if (!reqPayload) return;
try {
resp = await this.handleHTTPRequest(reqPayload);
@ -194,6 +210,25 @@ export class AdriftServer {
}
this.sendHTTPResponseEnd(seq);
break;
}
case C2SRequestTypes.WSOpen: {
const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor));
const ws = (this.sockets[seq] = new WebSocket(payload.url));
ws.onopen = () => {
this.sendWSOpen(seq);
};
ws.onclose = (e) => {
this.sendWSClose(seq, {
code: e.code,
reason: e.reason,
wasClean: e.wasClean,
});
};
ws.onmessage = (e) => {};
break;
}
default:
// not implemented
break;