diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index 046b196..60ab972 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -7,7 +7,6 @@ import { ReadyStateCallback, WebSocketImpl, } from "bare-client-custom"; -import { ReadableStream, TransformStream } from "node:stream/web"; import { MAX_CHUNK_SIZE } from "protocol"; import { Connection } from "./Connection"; @@ -23,6 +22,17 @@ function createBodyStream( ): ReadableStream | null { if (body === null) return null; + if (typeof body === "string") { + body = new TextEncoder().encode(body); + } + + if (ArrayBuffer.isView(body)) { + body = body.buffer.slice( + body.byteOffset, + body.byteOffset + body.byteLength + ); + } + const transformer = () => new TransformStream({ transform: async ( @@ -82,7 +92,6 @@ function createBodyStream( } if (body instanceof Blob) { - // @ts-expect-error return body.stream().pipeThrough(transformer()); } diff --git a/client/src/Connection.ts b/client/src/Connection.ts index e9b0a6c..b26cfdb 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -1,4 +1,3 @@ -import { ReadableStream } from "node:stream/web"; import { C2SRequestType, C2SRequestTypes, @@ -164,11 +163,11 @@ export class Connection { await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json])); if (body) { - for await (const chunk of body) { + for await (const chunk of body as unknown as NodeJS.ReadableStream) { await this.send( seq, C2SRequestTypes.HTTPRequestChunk, - new Uint8Array(chunk) + new Uint8Array(chunk as Uint8Array | ArrayBuffer) ); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d74f131..912de03 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -85,10 +85,6 @@ importers: version: 4.9.4 Ultraviolet: - dependencies: - bare-client-custom: - specifier: workspace:2.2.0-alpha - version: link:../bare-client-custom devDependencies: '@tomphttp/bare-client': specifier: ^2.2.0-alpha @@ -178,6 +174,10 @@ importers: protocol: specifier: workspace:* version: link:../protocol + devDependencies: + '@types/node': + specifier: ^20.4.10 + version: 20.4.10 corium: dependencies: @@ -3779,10 +3779,6 @@ packages: resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==} dev: true - /@types/crypto-js@4.1.1: - resolution: {integrity: sha512-BG7fQKZ689HIoc5h+6D2Dgq1fABRa0RbBWKBd9SP/MVRVXROflpm5fhwyATX5duFmbStzyzyycPB8qUYKDH3NA==} - dev: true - /@types/css-tree@2.0.0: resolution: {integrity: sha512-mY2sXRLBnUPMYw6mkOT+6dABeaNxAEKZz6scE9kQPNJx8fKe1fOsm8Honl7+xFYe6TKX8WNk2+7oMp2vBArJ9Q==} dev: true diff --git a/server/src/http.ts b/server/src/http.ts index ca40655..6a67e35 100644 --- a/server/src/http.ts +++ b/server/src/http.ts @@ -9,7 +9,7 @@ import { import { Agent as HTTPSAgent, request as httpsRequest } from "https"; import fuck from "ipaddr.js"; import { HTTPRequestPayload } from "protocol"; -import { Readable } from "stream"; +import { Writable } from "stream"; const { isValid, parse } = fuck; export interface BareErrorBody { @@ -139,6 +139,7 @@ function outgoingError(error: T): T | BareError { export async function bareFetch( request: HTTPRequestPayload, + pipeOutgoing: (s: Writable) => void, signal: AbortSignal, remote: URL, options: BareServerOptions @@ -172,8 +173,7 @@ export async function bareFetch( }); else throw new RangeError(`Unsupported protocol: '${remote.protocol}'`); - if (request.body) Readable.from([request.body]).pipe(outgoing); - else outgoing.end(); + pipeOutgoing(outgoing); return await new Promise((resolve, reject) => { outgoing.on("response", (response: IncomingMessage) => { diff --git a/server/src/server.ts b/server/src/server.ts index ae09848..7706ff3 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -12,7 +12,7 @@ import { WSClosePayload, WSErrorPayload, } from "protocol"; -import { Readable } from "stream"; +import { Readable, Writable } from "stream"; import { BareError, bareFetch, options } from "./http"; function bareErrorToResponse(e: BareError): { @@ -25,12 +25,14 @@ function bareErrorToResponse(e: BareError): { statusText: STATUS_CODES[e.status] || "", headers: {}, }, + // TODO: this is node specific. for web we might have to go through Blob here body: Readable.from(JSON.stringify(e.body)), }; } export class AdriftServer { send: (msg: ArrayBuffer) => void; + requestStreams: Record = {}; sockets: Record = {}; events: EventEmitter; @@ -74,7 +76,10 @@ export class AdriftServer { return payload; } - async handleHTTPRequest(payload: HTTPRequestPayload): Promise<{ + async handleHTTPRequest( + payload: HTTPRequestPayload, + pipeOutgoing: (s: Writable) => void + ): Promise<{ payload: HTTPResponsePayload; body: AsyncIterable; }> { @@ -89,6 +94,7 @@ export class AdriftServer { try { resp = await bareFetch( payload, + pipeOutgoing, abort.signal, new URL(payload.remote), options @@ -199,16 +205,20 @@ export class AdriftServer { if (!init) return; const { cursor, seq, op } = init; switch (op) { - case C2SRequestTypes.HTTPRequest: { + case C2SRequestTypes.HTTPRequestStart: { let resp: { payload: HTTPResponsePayload; body: AsyncIterable; }; const reqPayload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); if (!reqPayload) return; + try { - resp = await this.handleHTTPRequest(reqPayload); + resp = await this.handleHTTPRequest(reqPayload, (outgoingStream) => { + this.requestStreams[seq] = outgoingStream; + }); } catch (e) { + delete this.requestStreams[seq]; if (options.logErrors) console.error(e); let bareError; @@ -233,8 +243,10 @@ export class AdriftServer { resp = bareErrorToResponse(bareError); } + delete this.requestStreams[seq]; const { payload, body } = resp; this.sendHTTPResponseStart(seq, payload); + for await (const chunk of body) { let chunkPart = null; let chunkRest = chunk; @@ -248,6 +260,21 @@ export class AdriftServer { break; } + case C2SRequestTypes.HTTPRequestChunk: { + const stream = this.requestStreams[seq]; + if (!stream) return; + stream.write(new Uint8Array(msg.slice(cursor))); + break; + } + + case C2SRequestTypes.HTTPRequestEnd: { + const stream = this.requestStreams[seq]; + if (!stream) return; + stream.end(); + delete this.requestStreams[seq]; + break; + } + case C2SRequestTypes.WSOpen: { const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); const ws = (this.sockets[seq] = new WebSocket(payload.url));