This commit is contained in:
Spencer Pogorzelski 2023-08-12 17:04:02 -07:00
parent db82f47aca
commit 753bccf51b
6 changed files with 2264 additions and 179 deletions

View file

@ -10,6 +10,7 @@ import {
export class Connection {
callbacks: Record<number, Function> = {};
openStreams: Record<number, ReadableStreamDefaultController<any>> = {};
counter: number = 0;
@ -29,20 +30,32 @@ export class Connection {
console.log(requestID, requestType);
switch (requestType) {
case S2CRequestTypes.HTTPResponse: {
const payloadLen = view.getUint32(cursor);
cursor += 4;
case S2CRequestTypes.HTTPResponseStart:
const decoder = new TextDecoder();
const payloadRaw = decoder.decode(
data.slice(cursor, cursor + payloadLen)
);
console.log({ payloadLen, payloadRaw });
const payload = JSON.parse(payloadRaw);
cursor += payloadLen;
this.callbacks[requestID]({ payload, body: data.slice(cursor) });
const payload = JSON.parse(decoder.decode(data.slice(cursor)));
const stream = new ReadableStream({
start: (controller) => {
this.openStreams[requestID] = controller;
},
pull: (controller) => {
// not needed
},
cancel: () => {
// TODO
},
});
this.callbacks[requestID]({ payload, body: stream });
break;
case S2CRequestTypes.HTTPResponseChunk:
this.openStreams[requestID]?.enqueue(
new Uint8Array(data.slice(cursor))
);
break;
case S2CRequestTypes.HTTPResponseEnd:
this.openStreams[requestID]?.close();
break;
}
}
}

View file

@ -146,6 +146,8 @@
);
}
}
(window as any).bare = new BareClient();
</script>
{#if ready}

2338
pnpm-lock.yaml generated

File diff suppressed because it is too large Load diff

View file

@ -4,3 +4,4 @@ packages:
- frontend
- client
- firebase-config
- corium

View file

@ -9,10 +9,13 @@ export const C2SRequestTypes = {
export type C2SRequestType = ObjectValues<typeof C2SRequestTypes>;
export const S2CRequestTypes = {
HTTPResponse: 0,
WSOpen: 1,
WSDataText: 2,
WSDataBinary: 3,
HTTPResponseStart: 0,
HTTPResponseChunk: 1,
HTTPResponseEnd: 2,
WSOpen: 3,
WSDataText: 4,
WSDataBinary: 5,
WSClose: 6,
} as const;
export type S2CRequestType = ObjectValues<typeof S2CRequestTypes>;

View file

@ -9,6 +9,7 @@ import {
ProtoBareHeaders,
S2CRequestTypes,
} from "protocol";
import { Readable } from "stream";
import { BareError, bareFetch, options } from "./http";
export class Client {
@ -57,7 +58,7 @@ export class Client {
static bareErrorToResponse(e: BareError): {
payload: HTTPResponsePayload;
body: Buffer;
body: AsyncIterable<Buffer>;
} {
return {
payload: {
@ -65,13 +66,13 @@ export class Client {
statusText: STATUS_CODES[e.status] || "",
headers: {},
},
body: Buffer.from(JSON.stringify(e.body)),
body: Readable.from(JSON.stringify(e.body)),
};
}
async handleHTTPRequest(payload: HTTPRequestPayload): Promise<{
payload: HTTPResponsePayload;
body: Buffer;
body: AsyncIterable<Buffer>;
}> {
const abort = new AbortController();
const onClose = () => {
@ -97,13 +98,6 @@ export class Client {
}
this.events.off("close", onClose);
const buffers: any[] = [];
// node.js readable streams implement the async iterator protocol
for await (const data of resp) {
buffers.push(data);
}
const body = Buffer.concat(buffers);
return {
payload: {
@ -113,19 +107,34 @@ export class Client {
Object.entries(resp.headersDistinct).filter(([_k, v]) => Boolean(v))
) as ProtoBareHeaders,
},
body,
body: resp,
};
}
sendHTTPResponse(seq: number, payload: HTTPResponsePayload, body: Buffer) {
sendHTTPResponseStart(seq: number, payload: HTTPResponsePayload) {
const payloadBuffer = Buffer.from(JSON.stringify(payload));
const buf = Buffer.alloc(2 + 1 + 4 + payloadBuffer.length + body.length);
const buf = Buffer.alloc(2 + 1 + payloadBuffer.length);
let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponse, cursor);
cursor = buf.writeUInt32BE(payloadBuffer.length, cursor);
cursor += payloadBuffer.copy(buf, cursor);
body.copy(buf, cursor);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseStart, cursor);
payloadBuffer.copy(buf, cursor);
this.send(buf);
}
sendHTTPResponseChunk(seq: number, chunk: Buffer) {
const buf = Buffer.alloc(2 + 1 + chunk.length);
let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseChunk, cursor);
chunk.copy(buf, cursor);
this.send(buf);
}
sendHTTPResponseEnd(seq: number) {
const buf = Buffer.alloc(2 + 1);
let cursor = 0;
cursor = buf.writeUInt16BE(seq, cursor);
cursor = buf.writeUInt8(S2CRequestTypes.HTTPResponseEnd, cursor);
this.send(buf);
}
@ -135,7 +144,10 @@ export class Client {
const { cursor, seq, op } = init;
switch (op) {
case C2SRequestTypes.HTTPRequest:
let resp;
let resp: {
payload: HTTPResponsePayload;
body: AsyncIterable<Buffer>;
};
const reqPayload = Client.parseHttpReqPayload(msg.subarray(cursor));
if (!reqPayload) return;
try {
@ -166,7 +178,11 @@ export class Client {
}
const { payload, body } = resp;
this.sendHTTPResponse(seq, payload, body);
this.sendHTTPResponseStart(seq, payload);
for await (const chunk of body) {
this.sendHTTPResponseChunk(seq, chunk);
}
this.sendHTTPResponseEnd(seq);
break;
default:
// not implemented