mirror of
https://github.com/MercuryWorkshop/adrift.git
synced 2025-05-14 06:40:02 -04:00
optimize: perform GET requests in single msg
This commit is contained in:
parent
36e29adcc1
commit
4e31a2a4e7
4 changed files with 45 additions and 35 deletions
|
@ -33,6 +33,28 @@ function createBodyStream(
|
|||
);
|
||||
}
|
||||
|
||||
if (body instanceof ArrayBuffer) {
|
||||
if (body.byteLength == 0) {
|
||||
return null;
|
||||
}
|
||||
let remaining = body;
|
||||
return new ReadableStream({
|
||||
type: "bytes",
|
||||
pull: (controller) => {
|
||||
if (remaining.byteLength <= 0) {
|
||||
return controller.close();
|
||||
}
|
||||
const current = remaining.slice(0, MAX_CHUNK_SIZE);
|
||||
remaining = remaining.slice(MAX_CHUNK_SIZE);
|
||||
controller.enqueue(new Uint8Array(current));
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (body instanceof FormData) {
|
||||
throw new Error("formdata todo");
|
||||
}
|
||||
|
||||
const transformer = () =>
|
||||
new TransformStream({
|
||||
transform: async (
|
||||
|
@ -73,32 +95,10 @@ function createBodyStream(
|
|||
return body.pipeThrough(transformer());
|
||||
}
|
||||
|
||||
if (body instanceof ArrayBuffer) {
|
||||
if (body.byteLength == 0) {
|
||||
return null;
|
||||
}
|
||||
let remaining = body;
|
||||
return new ReadableStream({
|
||||
type: "bytes",
|
||||
pull: (controller) => {
|
||||
if (remaining.byteLength <= 0) {
|
||||
return controller.close();
|
||||
}
|
||||
const current = remaining.slice(0, MAX_CHUNK_SIZE);
|
||||
remaining = remaining.slice(MAX_CHUNK_SIZE);
|
||||
controller.enqueue(new Uint8Array(current));
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (body instanceof Blob) {
|
||||
return body.stream().pipeThrough(transformer());
|
||||
}
|
||||
|
||||
if (body instanceof FormData) {
|
||||
throw new Error("formdata todo");
|
||||
}
|
||||
|
||||
throw new Error("Unexpected body type: " + body);
|
||||
}
|
||||
|
||||
|
@ -139,10 +139,6 @@ export class AdriftBareClient extends Client {
|
|||
respBody = null;
|
||||
}
|
||||
|
||||
console.log("constructing Response", {
|
||||
status: payload.status,
|
||||
body: respBody?.byteLength,
|
||||
});
|
||||
return new Response(respBody, {
|
||||
status: payload.status,
|
||||
statusText: payload.statusText,
|
||||
|
|
|
@ -4,6 +4,7 @@ import {
|
|||
C2SWSOpenPayload,
|
||||
HTTPRequestPayload,
|
||||
HTTPResponsePayload,
|
||||
ProtoBareHeaders,
|
||||
S2CRequestType,
|
||||
S2CRequestTypes,
|
||||
Transport,
|
||||
|
@ -152,17 +153,22 @@ export class Connection {
|
|||
}
|
||||
|
||||
httprequest(
|
||||
data: HTTPRequestPayload,
|
||||
data: {
|
||||
method: string;
|
||||
requestHeaders: ProtoBareHeaders;
|
||||
remote: URL;
|
||||
},
|
||||
body: ReadableStream<ArrayBuffer | Uint8Array> | null
|
||||
): Promise<{ payload: HTTPResponsePayload; body: ArrayBuffer }> {
|
||||
let json = JSON.stringify(data);
|
||||
const payload: HTTPRequestPayload = { ...data, hasBody: Boolean(body) };
|
||||
let json = JSON.stringify(payload);
|
||||
|
||||
return new Promise(async (resolve) => {
|
||||
let seq = this.nextSeq();
|
||||
this.requestCallbacks[seq] = resolve;
|
||||
await this.send(seq, C2SRequestTypes.HTTPRequestStart, new Blob([json]));
|
||||
|
||||
if (body) {
|
||||
if (payload.hasBody) {
|
||||
for await (const chunk of body as unknown as NodeJS.ReadableStream) {
|
||||
await this.send(
|
||||
seq,
|
||||
|
@ -170,8 +176,8 @@ export class Connection {
|
|||
new Uint8Array(chunk as Uint8Array | ArrayBuffer)
|
||||
);
|
||||
}
|
||||
}
|
||||
await this.send(seq, C2SRequestTypes.HTTPRequestEnd);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ export type HTTPRequestPayload = {
|
|||
method: string;
|
||||
requestHeaders: ProtoBareHeaders;
|
||||
remote: URL;
|
||||
hasBody: boolean;
|
||||
};
|
||||
|
||||
export type HTTPResponsePayload = {
|
||||
|
|
|
@ -92,13 +92,20 @@ export class AdriftServer {
|
|||
|
||||
let resp: IncomingMessage;
|
||||
try {
|
||||
const outgoing = await (this.requestStreams[seq] = bareInitialFetch(
|
||||
const outgoingPromise = bareInitialFetch(
|
||||
payload,
|
||||
abort.signal,
|
||||
new URL(payload.remote),
|
||||
options
|
||||
));
|
||||
resp = await fetchResponse(outgoing);
|
||||
);
|
||||
if (payload.hasBody) {
|
||||
this.requestStreams[seq] = outgoingPromise;
|
||||
}
|
||||
const outgoingStream = await outgoingPromise;
|
||||
if (!payload.hasBody) {
|
||||
outgoingStream.end();
|
||||
}
|
||||
resp = await fetchResponse(await outgoingPromise);
|
||||
} catch (e) {
|
||||
if (e instanceof BareError) {
|
||||
return bareErrorToResponse(e);
|
||||
|
@ -243,10 +250,10 @@ export class AdriftServer {
|
|||
resp = bareErrorToResponse(bareError);
|
||||
}
|
||||
|
||||
const { payload, body } = resp;
|
||||
const { payload, body: responseBody } = resp;
|
||||
this.sendHTTPResponseStart(seq, payload);
|
||||
|
||||
for await (const chunk of body) {
|
||||
for await (const chunk of responseBody) {
|
||||
let chunkPart = null;
|
||||
let chunkRest = chunk;
|
||||
do {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue