mirror of
https://github.com/MercuryWorkshop/bare-mux.git
synced 2025-05-14 14:50:03 -04:00
remote transport
This commit is contained in:
parent
9facd9d4f7
commit
3a43dfcc15
3 changed files with 137 additions and 68 deletions
|
@ -1,6 +1,7 @@
|
||||||
import { BareHeaders, maxRedirects } from './baretypes';
|
import { BareHeaders, BareTransport, maxRedirects } from './baretypes';
|
||||||
import { WorkerConnection, WorkerMessage } from './connection';
|
import { WorkerConnection, WorkerMessage } from './connection';
|
||||||
import { WebSocketFields } from './snapshot';
|
import { WebSocketFields } from './snapshot';
|
||||||
|
import { handleFetch, handleWebsocket, sendError } from './workerHandlers';
|
||||||
|
|
||||||
const validChars =
|
const validChars =
|
||||||
"!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~";
|
"!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~";
|
||||||
|
@ -113,6 +114,7 @@ export class BareMuxConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
async setManualTransport(functionBody: string, options: any[], transferables?: Transferable[]) {
|
async setManualTransport(functionBody: string, options: any[], transferables?: Transferable[]) {
|
||||||
|
if (functionBody === "bare-mux-remote") throw new Error("Use setRemoteTransport.");
|
||||||
await this.worker.sendMessage({
|
await this.worker.sendMessage({
|
||||||
type: "set",
|
type: "set",
|
||||||
client: {
|
client: {
|
||||||
|
@ -122,6 +124,39 @@ export class BareMuxConnection {
|
||||||
}, transferables);
|
}, transferables);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async setRemoteTransport(transport: BareTransport, name: string) {
|
||||||
|
const channel = new MessageChannel();
|
||||||
|
|
||||||
|
channel.port1.onmessage = async (event: MessageEvent) => {
|
||||||
|
const port = event.data.port;
|
||||||
|
const message: WorkerMessage = event.data.message;
|
||||||
|
|
||||||
|
if (message.type === "fetch") {
|
||||||
|
try {
|
||||||
|
if (!transport.ready) await transport.init();
|
||||||
|
await handleFetch(message, port, transport);
|
||||||
|
} catch (err) {
|
||||||
|
sendError(port, err, "fetch");
|
||||||
|
}
|
||||||
|
} else if (message.type === "websocket") {
|
||||||
|
try {
|
||||||
|
if (!transport.ready) await transport.init();
|
||||||
|
await handleWebsocket(message, port, transport);
|
||||||
|
} catch (err) {
|
||||||
|
sendError(port, err, "websocket");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.worker.sendMessage({
|
||||||
|
type: "set",
|
||||||
|
client: {
|
||||||
|
function: "bare-mux-remote",
|
||||||
|
args: [channel.port2, name]
|
||||||
|
},
|
||||||
|
}, [channel.port2]);
|
||||||
|
}
|
||||||
|
|
||||||
getInnerPort(): MessagePort | Promise<MessagePort> {
|
getInnerPort(): MessagePort | Promise<MessagePort> {
|
||||||
return this.worker.port;
|
return this.worker.port;
|
||||||
}
|
}
|
||||||
|
|
102
src/worker.ts
102
src/worker.ts
|
@ -1,7 +1,8 @@
|
||||||
import { BareTransport } from "./baretypes";
|
import { BareTransport } from "./baretypes";
|
||||||
import { BroadcastMessage, WorkerMessage, WorkerResponse, browserSupportsTransferringStreams } from "./connection"
|
import { BroadcastMessage, WorkerMessage, WorkerRequest, WorkerResponse } from "./connection"
|
||||||
|
import { handleFetch, handleWebsocket, sendError } from "./workerHandlers";
|
||||||
|
|
||||||
let currentTransport: BareTransport | null = null;
|
let currentTransport: BareTransport | MessagePort | null = null;
|
||||||
let currentTransportName: string = "";
|
let currentTransportName: string = "";
|
||||||
|
|
||||||
const channel = new BroadcastChannel("bare-mux");
|
const channel = new BroadcastChannel("bare-mux");
|
||||||
|
@ -14,6 +15,14 @@ function noClients(): Error {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function handleRemoteClient(message: WorkerMessage, port: MessagePort) {
|
||||||
|
const remote = currentTransport as MessagePort;
|
||||||
|
let transferables: Transferable[] = [port];
|
||||||
|
if (message.fetch?.body) transferables.push(message.fetch.body);
|
||||||
|
if (message.websocket?.channel) transferables.push(message.websocket.channel);
|
||||||
|
remote.postMessage(<WorkerRequest>{ message, port }, transferables);
|
||||||
|
}
|
||||||
|
|
||||||
function handleConnection(port: MessagePort) {
|
function handleConnection(port: MessagePort) {
|
||||||
port.onmessage = async (event: MessageEvent) => {
|
port.onmessage = async (event: MessageEvent) => {
|
||||||
const port = event.data.port;
|
const port = event.data.port;
|
||||||
|
@ -24,90 +33,49 @@ function handleConnection(port: MessagePort) {
|
||||||
try {
|
try {
|
||||||
const AsyncFunction = (async function() { }).constructor;
|
const AsyncFunction = (async function() { }).constructor;
|
||||||
|
|
||||||
// @ts-expect-error
|
if (message.client.function === "bare-mux-remote") {
|
||||||
const func = new AsyncFunction(message.client.function);
|
currentTransport = message.client.args[0] as MessagePort;
|
||||||
const [newTransport, name] = await func();
|
currentTransportName = `bare-mux-remote (${message.client.args[1]})`;
|
||||||
currentTransport = new newTransport(...message.client.args);
|
} else {
|
||||||
currentTransportName = name;
|
// @ts-expect-error
|
||||||
console.log("set transport to ", currentTransport, name);
|
const func = new AsyncFunction(message.client.function);
|
||||||
|
const [newTransport, name] = await func();
|
||||||
|
currentTransport = new newTransport(...message.client.args);
|
||||||
|
currentTransportName = name;
|
||||||
|
}
|
||||||
|
console.log("set transport to ", currentTransport, currentTransportName);
|
||||||
|
|
||||||
port.postMessage(<WorkerResponse>{ type: "set" });
|
port.postMessage(<WorkerResponse>{ type: "set" });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("error while processing 'set': ", err);
|
sendError(port, err, 'set');
|
||||||
port.postMessage(<WorkerResponse>{ type: "error", error: err });
|
|
||||||
}
|
}
|
||||||
} else if (message.type === "get") {
|
} else if (message.type === "get") {
|
||||||
port.postMessage(<WorkerResponse>{ type: "get", name: currentTransportName });
|
port.postMessage(<WorkerResponse>{ type: "get", name: currentTransportName });
|
||||||
} else if (message.type === "fetch") {
|
} else if (message.type === "fetch") {
|
||||||
try {
|
try {
|
||||||
if (!currentTransport) throw noClients();
|
if (!currentTransport) throw noClients();
|
||||||
|
if (currentTransport instanceof MessagePort) {
|
||||||
|
handleRemoteClient(message, port);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!currentTransport.ready) await currentTransport.init();
|
if (!currentTransport.ready) await currentTransport.init();
|
||||||
|
|
||||||
const resp = await currentTransport.request(
|
await handleFetch(message, port, currentTransport);
|
||||||
new URL(message.fetch.remote),
|
|
||||||
message.fetch.method,
|
|
||||||
message.fetch.body,
|
|
||||||
message.fetch.headers,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!browserSupportsTransferringStreams() && resp.body instanceof ReadableStream) {
|
|
||||||
const conversionResp = new Response(resp.body);
|
|
||||||
resp.body = await conversionResp.arrayBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (resp.body instanceof ReadableStream || resp.body instanceof ArrayBuffer) {
|
|
||||||
port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp }, [resp.body]);
|
|
||||||
} else {
|
|
||||||
port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp });
|
|
||||||
}
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("error while processing 'fetch': ", err);
|
sendError(port, err, 'fetch');
|
||||||
port.postMessage(<WorkerResponse>{ type: "error", error: err });
|
|
||||||
}
|
}
|
||||||
} else if (message.type === "websocket") {
|
} else if (message.type === "websocket") {
|
||||||
try {
|
try {
|
||||||
if (!currentTransport) throw noClients();
|
if (!currentTransport) throw noClients();
|
||||||
|
if (currentTransport instanceof MessagePort) {
|
||||||
|
handleRemoteClient(message, port);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!currentTransport.ready) await currentTransport.init();
|
if (!currentTransport.ready) await currentTransport.init();
|
||||||
|
|
||||||
const onopen = (protocol: string) => {
|
await handleWebsocket(message, port, currentTransport);
|
||||||
message.websocket.channel.postMessage({ type: "open", args: [protocol] });
|
|
||||||
};
|
|
||||||
const onclose = (code: number, reason: string) => {
|
|
||||||
message.websocket.channel.postMessage({ type: "close", args: [code, reason] });
|
|
||||||
};
|
|
||||||
const onerror = (error: string) => {
|
|
||||||
message.websocket.channel.postMessage({ type: "error", args: [error] });
|
|
||||||
};
|
|
||||||
const onmessage = (data: Blob | ArrayBuffer | string) => {
|
|
||||||
if (data instanceof ArrayBuffer) {
|
|
||||||
message.websocket.channel.postMessage({ type: "message", args: [data] }, [data]);
|
|
||||||
} else {
|
|
||||||
message.websocket.channel.postMessage({ type: "message", args: [data] });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const [data, close] = currentTransport.connect(
|
|
||||||
new URL(message.websocket.url),
|
|
||||||
message.websocket.origin,
|
|
||||||
message.websocket.protocols,
|
|
||||||
message.websocket.requestHeaders,
|
|
||||||
onopen,
|
|
||||||
onmessage,
|
|
||||||
onclose,
|
|
||||||
onerror,
|
|
||||||
);
|
|
||||||
message.websocket.channel.onmessage = (event: MessageEvent) => {
|
|
||||||
if (event.data.type === "data") {
|
|
||||||
data(event.data.data);
|
|
||||||
} else if (event.data.type === "close") {
|
|
||||||
close(event.data.closeCode, event.data.closeReason);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
port.postMessage(<WorkerResponse>{ type: "websocket" });
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("error while processing 'websocket': ", err);
|
sendError(port, err, 'websocket');
|
||||||
port.postMessage(<WorkerResponse>{ type: "error", error: err });
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
66
src/workerHandlers.ts
Normal file
66
src/workerHandlers.ts
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
import { BareTransport } from "./baretypes";
|
||||||
|
import { browserSupportsTransferringStreams, WorkerMessage, WorkerResponse } from "./connection";
|
||||||
|
|
||||||
|
export function sendError(port: MessagePort, err: Error, name: string) {
|
||||||
|
console.error(`error while processing '${name}': `, err);
|
||||||
|
port.postMessage(<WorkerResponse>{ type: "error", error: err });
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function handleFetch(message: WorkerMessage, port: MessagePort, transport: BareTransport) {
|
||||||
|
const resp = await transport.request(
|
||||||
|
new URL(message.fetch.remote),
|
||||||
|
message.fetch.method,
|
||||||
|
message.fetch.body,
|
||||||
|
message.fetch.headers,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!browserSupportsTransferringStreams() && resp.body instanceof ReadableStream) {
|
||||||
|
const conversionResp = new Response(resp.body);
|
||||||
|
resp.body = await conversionResp.arrayBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resp.body instanceof ReadableStream || resp.body instanceof ArrayBuffer) {
|
||||||
|
port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp }, [resp.body]);
|
||||||
|
} else {
|
||||||
|
port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function handleWebsocket(message: WorkerMessage, port: MessagePort, transport: BareTransport) {
|
||||||
|
const onopen = (protocol: string) => {
|
||||||
|
message.websocket.channel.postMessage({ type: "open", args: [protocol] });
|
||||||
|
};
|
||||||
|
const onclose = (code: number, reason: string) => {
|
||||||
|
message.websocket.channel.postMessage({ type: "close", args: [code, reason] });
|
||||||
|
};
|
||||||
|
const onerror = (error: string) => {
|
||||||
|
message.websocket.channel.postMessage({ type: "error", args: [error] });
|
||||||
|
};
|
||||||
|
const onmessage = (data: Blob | ArrayBuffer | string) => {
|
||||||
|
if (data instanceof ArrayBuffer) {
|
||||||
|
message.websocket.channel.postMessage({ type: "message", args: [data] }, [data]);
|
||||||
|
} else {
|
||||||
|
message.websocket.channel.postMessage({ type: "message", args: [data] });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const [data, close] = transport.connect(
|
||||||
|
new URL(message.websocket.url),
|
||||||
|
message.websocket.origin,
|
||||||
|
message.websocket.protocols,
|
||||||
|
message.websocket.requestHeaders,
|
||||||
|
onopen,
|
||||||
|
onmessage,
|
||||||
|
onclose,
|
||||||
|
onerror,
|
||||||
|
);
|
||||||
|
message.websocket.channel.onmessage = (event: MessageEvent) => {
|
||||||
|
if (event.data.type === "data") {
|
||||||
|
data(event.data.data);
|
||||||
|
} else if (event.data.type === "close") {
|
||||||
|
close(event.data.closeCode, event.data.closeReason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
port.postMessage(<WorkerResponse>{ type: "websocket" });
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue