This commit is contained in:
CoolElectronics 2023-08-13 19:59:21 -04:00
commit f067d1e09c
2 changed files with 45 additions and 29 deletions

View file

@ -13,22 +13,24 @@ import { Readable } from "stream";
import { BareError, bareFetch, options } from "./http"; import { BareError, bareFetch, options } from "./http";
export class Client { export class Client {
send: (msg: Buffer) => void; send: (msg: ArrayBuffer) => void;
events: EventEmitter; events: EventEmitter;
constructor(send: (msg: Buffer) => void) { constructor(send: (msg: ArrayBuffer) => void) {
this.send = send; this.send = send;
this.events = new EventEmitter(); this.events = new EventEmitter();
} }
static parseMsgInit( static parseMsgInit(
msg: Buffer msg: ArrayBuffer
): { cursor: number; seq: number; op: number } | undefined { ): { cursor: number; seq: number; op: number } | undefined {
try { try {
console.log(msg);
const dataView = new DataView(msg);
let cursor = 0; let cursor = 0;
const seq = msg.readUint16BE(cursor); const seq = dataView.getUint16(cursor);
cursor += 2; cursor += 2;
const op = msg.readUint8(cursor); const op = dataView.getUint8(cursor);
cursor += 1; cursor += 1;
return { cursor, seq, op }; return { cursor, seq, op };
} catch (e) { } catch (e) {
@ -41,11 +43,11 @@ export class Client {
} }
static parseHttpReqPayload( static parseHttpReqPayload(
payloadRaw: Buffer payloadRaw: ArrayBuffer
): HTTPRequestPayload | undefined { ): HTTPRequestPayload | undefined {
let payload; let payload;
try { try {
payload = JSON.parse(payloadRaw.toString()); payload = JSON.parse(new TextDecoder().decode(payloadRaw));
} catch (e) { } catch (e) {
if (e instanceof SyntaxError) { if (e instanceof SyntaxError) {
return; return;
@ -58,7 +60,7 @@ export class Client {
static bareErrorToResponse(e: BareError): { static bareErrorToResponse(e: BareError): {
payload: HTTPResponsePayload; payload: HTTPResponsePayload;
body: AsyncIterable<Buffer>; body: AsyncIterable<ArrayBuffer>;
} { } {
return { return {
payload: { payload: {
@ -72,7 +74,7 @@ export class Client {
async handleHTTPRequest(payload: HTTPRequestPayload): Promise<{ async handleHTTPRequest(payload: HTTPRequestPayload): Promise<{
payload: HTTPResponsePayload; payload: HTTPResponsePayload;
body: AsyncIterable<Buffer>; body: AsyncIterable<ArrayBuffer>;
}> { }> {
const abort = new AbortController(); const abort = new AbortController();
const onClose = () => { const onClose = () => {
@ -112,34 +114,41 @@ export class Client {
} }
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) { sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
const payloadBuffer = Buffer.from(JSON.stringify(payload)); const payloadBuffer = new TextEncoder().encode(JSON.stringify(payload));
const buf = Buffer.alloc(2 + 1 + payloadBuffer.length); const buf = new ArrayBuffer(2 + 1 + payloadBuffer.length);
const dataView = new DataView(buf);
let cursor = 0; let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor); dataView.setUint16(cursor, seq);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseStart, cursor); cursor += 2;
payloadBuffer.copy(buf, cursor); dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseStart);
cursor += 1;
new Uint8Array(buf).set(payloadBuffer, cursor);
this.send(buf); this.send(buf);
} }
sendHTTPResponseChunk(seq: number, chunk: Buffer) { sendHTTPResponseChunk(seq: number, chunk: Uint8Array) {
if (!chunk.copy) return; const buf = new ArrayBuffer(2 + 1 + chunk.byteLength);
const buf = Buffer.alloc(2 + 1 + chunk.length); const dataView = new DataView(buf);
let cursor = 0; let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor); dataView.setUint16(cursor, seq);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseChunk, cursor); cursor += 2;
chunk.copy(buf, cursor); dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseChunk);
cursor += 1;
new Uint8Array(buf).set(chunk, cursor);
this.send(buf); this.send(buf);
} }
sendHTTPResponseEnd(seq: number) { sendHTTPResponseEnd(seq: number) {
const buf = Buffer.alloc(2 + 1); const buf = new ArrayBuffer(2 + 1);
const dataView = new DataView(buf);
let cursor = 0; let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor); dataView.setUint16(cursor, seq);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseEnd, cursor); cursor += 2;
dataView.setUint8(cursor, S2CRequestTypes.HTTPResponseEnd);
this.send(buf); this.send(buf);
} }
async onMsg(msg: Buffer) { async onMsg(msg: ArrayBuffer) {
const init = Client.parseMsgInit(msg); const init = Client.parseMsgInit(msg);
if (!init) return; if (!init) return;
const { cursor, seq, op } = init; const { cursor, seq, op } = init;
@ -147,9 +156,9 @@ export class Client {
case C2SRequestTypes.HTTPRequest: case C2SRequestTypes.HTTPRequest:
let resp: { let resp: {
payload: HTTPResponsePayload; payload: HTTPResponsePayload;
body: AsyncIterable<Buffer>; body: AsyncIterable<ArrayBuffer>;
}; };
const reqPayload = Client.parseHttpReqPayload(msg.subarray(cursor)); const reqPayload = Client.parseHttpReqPayload(msg.slice(cursor));
if (!reqPayload) return; if (!reqPayload) return;
try { try {
resp = await this.handleHTTPRequest(reqPayload); resp = await this.handleHTTPRequest(reqPayload);
@ -181,7 +190,7 @@ export class Client {
const { payload, body } = resp; const { payload, body } = resp;
this.sendHTTPResponseStart(seq, payload); this.sendHTTPResponseStart(seq, payload);
for await (const chunk of body) { for await (const chunk of body) {
this.sendHTTPResponseChunk(seq, chunk); this.sendHTTPResponseChunk(seq, new Uint8Array(chunk));
} }
this.sendHTTPResponseEnd(seq); this.sendHTTPResponseEnd(seq);
break; break;

View file

@ -81,6 +81,10 @@ app.use((_req, res, next) => {
next(); next();
}); });
function bufferToArrayBuffer(buf: Buffer): ArrayBuffer {
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
}
async function answerRtc(data: any, onrespond: (answer: any) => void) { async function answerRtc(data: any, onrespond: (answer: any) => void) {
if (data && data.offer && data.localCandidates) { if (data && data.offer && data.localCandidates) {
const { offer, localCandidates } = data; const { offer, localCandidates } = data;
@ -106,7 +110,10 @@ async function answerRtc(data: any, onrespond: (answer: any) => void) {
}; };
dataChannel.onmessage = (event) => { dataChannel.onmessage = (event) => {
console.log("messaged"); console.log("messaged");
client.onMsg(Buffer.from(event.data)); if (event.data instanceof Buffer) {
client.onMsg(bufferToArrayBuffer(event.data));
}
throw new Error("Unexpected datachannel message type");
}; };
} }
} }
@ -128,7 +135,7 @@ app.ws("/dev-ws", (ws, _req) => {
} }
if (msg instanceof Buffer) { if (msg instanceof Buffer) {
client.onMsg(msg); client.onMsg(bufferToArrayBuffer(msg));
return; return;
} }
throw new Error("Unexpected message type"); throw new Error("Unexpected message type");