serverside client chunking impl (CURSED)

This commit is contained in:
Spencer Pogorzelski 2023-08-14 18:30:33 -07:00
parent a839d0fe51
commit b73a561844
5 changed files with 51 additions and 20 deletions

View file

@ -7,7 +7,6 @@ import {
ReadyStateCallback, ReadyStateCallback,
WebSocketImpl, WebSocketImpl,
} from "bare-client-custom"; } from "bare-client-custom";
import { ReadableStream, TransformStream } from "node:stream/web";
import { MAX_CHUNK_SIZE } from "protocol"; import { MAX_CHUNK_SIZE } from "protocol";
import { Connection } from "./Connection"; import { Connection } from "./Connection";
@ -23,6 +22,17 @@ function createBodyStream(
): ReadableStream<ArrayBuffer | Uint8Array> | null { ): ReadableStream<ArrayBuffer | Uint8Array> | null {
if (body === null) return null; if (body === null) return null;
if (typeof body === "string") {
body = new TextEncoder().encode(body);
}
if (ArrayBuffer.isView(body)) {
body = body.buffer.slice(
body.byteOffset,
body.byteOffset + body.byteLength
);
}
const transformer = () => const transformer = () =>
new TransformStream({ new TransformStream({
transform: async ( transform: async (
@ -82,7 +92,6 @@ function createBodyStream(
} }
if (body instanceof Blob) { if (body instanceof Blob) {
// @ts-expect-error
return body.stream().pipeThrough(transformer()); return body.stream().pipeThrough(transformer());
} }

View file

@ -1,4 +1,3 @@
import { ReadableStream } from "node:stream/web";
import { import {
C2SRequestType, C2SRequestType,
C2SRequestTypes, C2SRequestTypes,
@ -164,11 +163,11 @@ export class Connection {
await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json])); await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json]));
if (body) { if (body) {
for await (const chunk of body) { for await (const chunk of body as unknown as NodeJS.ReadableStream) {
await this.send( await this.send(
seq, seq,
C2SRequestTypes.HTTPRequestChunk, C2SRequestTypes.HTTPRequestChunk,
new Uint8Array(chunk) new Uint8Array(chunk as Uint8Array | ArrayBuffer)
); );
} }
} }

12
pnpm-lock.yaml generated
View file

@ -85,10 +85,6 @@ importers:
version: 4.9.4 version: 4.9.4
Ultraviolet: Ultraviolet:
dependencies:
bare-client-custom:
specifier: workspace:2.2.0-alpha
version: link:../bare-client-custom
devDependencies: devDependencies:
'@tomphttp/bare-client': '@tomphttp/bare-client':
specifier: ^2.2.0-alpha specifier: ^2.2.0-alpha
@ -178,6 +174,10 @@ importers:
protocol: protocol:
specifier: workspace:* specifier: workspace:*
version: link:../protocol version: link:../protocol
devDependencies:
'@types/node':
specifier: ^20.4.10
version: 20.4.10
corium: corium:
dependencies: dependencies:
@ -3779,10 +3779,6 @@ packages:
resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==} resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==}
dev: true dev: true
/@types/crypto-js@4.1.1:
resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==}
dev: true
/@types/css-tree@2.0.0: /@types/css-tree@2.0.0:
resolution: {integrity: sha512-mY2sXRLBnUPMYw6mkOT+6dABeaNxAEKZz6scE9kQPNJx8fKe1fOsm8Honl7+xFYe6TKX8WNk2+7oMp2vBArJ9Q==} resolution: {integrity: sha512-mY2sXRLBnUPMYw6mkOT+6dABeaNxAEKZz6scE9kQPNJx8fKe1fOsm8Honl7+xFYe6TKX8WNk2+7oMp2vBArJ9Q==}
dev: true dev: true

View file

@ -9,7 +9,7 @@ import {
import { Agent as HTTPSAgent, request as httpsRequest } from "https"; import { Agent as HTTPSAgent, request as httpsRequest } from "https";
import fuck from "ipaddr.js"; import fuck from "ipaddr.js";
import { HTTPRequestPayload } from "protocol"; import { HTTPRequestPayload } from "protocol";
import { Readable } from "stream"; import { Writable } from "stream";
const { isValid, parse } = fuck; const { isValid, parse } = fuck;
export interface BareErrorBody { export interface BareErrorBody {
@ -139,6 +139,7 @@ function outgoingError<T>(error: T): T | BareError {
export async function bareFetch( export async function bareFetch(
request: HTTPRequestPayload, request: HTTPRequestPayload,
pipeOutgoing: (s: Writable) => void,
signal: AbortSignal, signal: AbortSignal,
remote: URL, remote: URL,
options: BareServerOptions options: BareServerOptions
@ -172,8 +173,7 @@ export async function bareFetch(
}); });
else throw new RangeError(`Unsupported protocol: '${remote.protocol}'`); else throw new RangeError(`Unsupported protocol: '${remote.protocol}'`);
if (request.body) Readable.from([request.body]).pipe(outgoing); pipeOutgoing(outgoing);
else outgoing.end();
return await new Promise((resolve, reject) => { return await new Promise((resolve, reject) => {
outgoing.on("response", (response: IncomingMessage) => { outgoing.on("response", (response: IncomingMessage) => {

View file

@ -12,7 +12,7 @@ import {
WSClosePayload, WSClosePayload,
WSErrorPayload, WSErrorPayload,
} from "protocol"; } from "protocol";
import { Readable } from "stream"; import { Readable, Writable } from "stream";
import { BareError, bareFetch, options } from "./http"; import { BareError, bareFetch, options } from "./http";
function bareErrorToResponse(e: BareError): { function bareErrorToResponse(e: BareError): {
@ -25,12 +25,14 @@ function bareErrorToResponse(e: BareError): {
statusText: STATUS_CODES[e.status] || "", statusText: STATUS_CODES[e.status] || "",
headers: {}, headers: {},
}, },
// TODO: this is node specific. for web we might have to go through Blob here
body: Readable.from(JSON.stringify(e.body)), body: Readable.from(JSON.stringify(e.body)),
}; };
} }
export class AdriftServer { export class AdriftServer {
send: (msg: ArrayBuffer) => void; send: (msg: ArrayBuffer) => void;
requestStreams: Record<number, Writable> = {};
sockets: Record<number, WebSocket> = {}; sockets: Record<number, WebSocket> = {};
events: EventEmitter; events: EventEmitter;
@ -74,7 +76,10 @@ export class AdriftServer {
return payload; return payload;
} }
async handleHTTPRequest(payload: HTTPRequestPayload): Promise<{ async handleHTTPRequest(
payload: HTTPRequestPayload,
pipeOutgoing: (s: Writable) => void
): Promise<{
payload: HTTPResponsePayload; payload: HTTPResponsePayload;
body: AsyncIterable<ArrayBuffer>; body: AsyncIterable<ArrayBuffer>;
}> { }> {
@ -89,6 +94,7 @@ export class AdriftServer {
try { try {
resp = await bareFetch( resp = await bareFetch(
payload, payload,
pipeOutgoing,
abort.signal, abort.signal,
new URL(payload.remote), new URL(payload.remote),
options options
@ -199,16 +205,20 @@ export class AdriftServer {
if (!init) return; if (!init) return;
const { cursor, seq, op } = init; const { cursor, seq, op } = init;
switch (op) { switch (op) {
case C2SRequestTypes.HTTPRequest: { case C2SRequestTypes.HTTPRequestStart: {
let resp: { let resp: {
payload: HTTPResponsePayload; payload: HTTPResponsePayload;
body: AsyncIterable<ArrayBuffer>; body: AsyncIterable<ArrayBuffer>;
}; };
const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor));
if (!reqPayload) return; if (!reqPayload) return;
try { try {
resp = await this.handleHTTPRequest(reqPayload); resp = await this.handleHTTPRequest(reqPayload, (outgoingStream) => {
this.requestStreams[seq] = outgoingStream;
});
} catch (e) { } catch (e) {
delete this.requestStreams[seq];
if (options.logErrors) console.error(e); if (options.logErrors) console.error(e);
let bareError; let bareError;
@ -233,8 +243,10 @@ export class AdriftServer {
resp = bareErrorToResponse(bareError); resp = bareErrorToResponse(bareError);
} }
delete this.requestStreams[seq];
const { payload, body } = resp; const { payload, body } = resp;
this.sendHTTPResponseStart(seq, payload); this.sendHTTPResponseStart(seq, payload);
for await (const chunk of body) { for await (const chunk of body) {
let chunkPart = null; let chunkPart = null;
let chunkRest = chunk; let chunkRest = chunk;
@ -248,6 +260,21 @@ export class AdriftServer {
break; break;
} }
case C2SRequestTypes.HTTPRequestChunk: {
const stream = this.requestStreams[seq];
if (!stream) return;
stream.write(new Uint8Array(msg.slice(cursor)));
break;
}
case C2SRequestTypes.HTTPRequestEnd: {
const stream = this.requestStreams[seq];
if (!stream) return;
stream.end();
delete this.requestStreams[seq];
break;
}
case C2SRequestTypes.WSOpen: { case C2SRequestTypes.WSOpen: {
const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor));
const ws = (this.sockets[seq] = new WebSocket(payload.url)); const ws = (this.sockets[seq] = new WebSocket(payload.url));