add a ping message to make sure worker port is alive

This commit is contained in:
Toshit Chawda 2024-07-08 12:59:37 -07:00
parent fde90f3ec1
commit 6e86375a69
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
2 changed files with 57 additions and 26 deletions

View file

@ -3,7 +3,7 @@ import { BareHeaders, TransferrableResponse } from "./baretypes";
type SWClient = { postMessage: typeof MessagePort.prototype.postMessage }; type SWClient = { postMessage: typeof MessagePort.prototype.postMessage };
export type WorkerMessage = { export type WorkerMessage = {
type: "fetch" | "websocket" | "set" | "get", type: "fetch" | "websocket" | "set" | "get" | "ping",
fetch?: { fetch?: {
remote: string, remote: string,
method: string, method: string,
@ -26,7 +26,7 @@ export type WorkerRequest = {
} }
export type WorkerResponse = { export type WorkerResponse = {
type: "fetch" | "websocket" | "set" | "get" | "error", type: "fetch" | "websocket" | "set" | "get" | "pong" | "error",
fetch?: TransferrableResponse, fetch?: TransferrableResponse,
name?: string, name?: string,
error?: Error, error?: Error,
@ -59,33 +59,40 @@ function tryGetPort(client: SWClient): Promise<MessagePort> {
}); });
} }
function createPort(path: string, channel: BroadcastChannel): MessagePort { function createPort(path: string, channel: BroadcastChannel, registerHandlers: boolean): MessagePort {
const worker = new SharedWorker(path, "bare-mux-worker"); const worker = new SharedWorker(path, "bare-mux-worker");
// uv removes navigator.serviceWorker so this errors if (registerHandlers) {
if (navigator.serviceWorker) { // uv removes navigator.serviceWorker so this errors
navigator.serviceWorker.addEventListener("message", event => { if (navigator.serviceWorker) {
if (event.data.type === "getPort" && event.data.port) { navigator.serviceWorker.addEventListener("message", event => {
console.debug("bare-mux: recieved request for port from sw"); if (event.data.type === "getPort" && event.data.port) {
const worker = new SharedWorker(path, "bare-mux-worker"); console.debug("bare-mux: recieved request for port from sw");
event.data.port.postMessage(worker.port, [worker.port]); const worker = new SharedWorker(path, "bare-mux-worker");
} event.data.port.postMessage(worker.port, [worker.port]);
}); }
} });
channel.onmessage = (event: MessageEvent) => {
if (event.data.type === "getPath") {
console.debug("bare-mux: recieved request for worker path from broadcast channel");
channel.postMessage(<BroadcastMessage>{ type: "path", path: path });
} }
}; channel.onmessage = (event: MessageEvent) => {
if (event.data.type === "getPath") {
console.debug("bare-mux: recieved request for worker path from broadcast channel");
channel.postMessage(<BroadcastMessage>{ type: "path", path: path });
}
};
}
return worker.port; return worker.port;
} }
export class WorkerConnection { export class WorkerConnection {
channel: BroadcastChannel; channel: BroadcastChannel;
port: MessagePort | Promise<MessagePort>; port: MessagePort | Promise<MessagePort>;
workerPath: string;
constructor(workerPath?: string) { constructor(workerPath?: string) {
this.channel = new BroadcastChannel("bare-mux"); this.channel = new BroadcastChannel("bare-mux");
this.createChannel(workerPath, true);
}
createChannel(workerPath?: string, inInit?: boolean) {
// @ts-expect-error // @ts-expect-error
if (self.clients) { if (self.clients) {
// running in a ServiceWorker // running in a ServiceWorker
@ -101,14 +108,14 @@ export class WorkerConnection {
// create the SharedWorker and help other bare-mux clients get the workerPath // create the SharedWorker and help other bare-mux clients get the workerPath
if (!workerPath.startsWith("/") && !workerPath.includes("://")) throw new Error("Invalid URL. Must be absolute or start at the root."); if (!workerPath.startsWith("/") && !workerPath.includes("://")) throw new Error("Invalid URL. Must be absolute or start at the root.");
this.port = createPort(workerPath, this.channel); this.port = createPort(workerPath, this.channel, inInit);
} else if (SharedWorker) { } else if (SharedWorker) {
// running in a window, was not passed a workerPath // running in a window, was not passed a workerPath
// ask other bare-mux clients for the workerPath // ask other bare-mux clients for the workerPath
this.port = new Promise(resolve => { this.port = new Promise(resolve => {
this.channel.onmessage = (event: MessageEvent) => { this.channel.onmessage = (event: MessageEvent) => {
if (event.data.type === "path") { if (event.data.type === "path") {
resolve(createPort(event.data.path, this.channel)); resolve(createPort(event.data.path, this.channel, inInit));
} }
} }
this.channel.postMessage(<BroadcastMessage>{ type: "getPath" }); this.channel.postMessage(<BroadcastMessage>{ type: "getPath" });
@ -121,12 +128,29 @@ export class WorkerConnection {
async sendMessage(message: WorkerMessage, transferable?: Transferable[]): Promise<WorkerResponse> { async sendMessage(message: WorkerMessage, transferable?: Transferable[]): Promise<WorkerResponse> {
if (this.port instanceof Promise) this.port = await this.port; if (this.port instanceof Promise) this.port = await this.port;
let channel = new MessageChannel();
let toTransfer: Transferable[] = [channel.port2, ...(transferable || [])];
this.port.postMessage(<WorkerRequest>{ message: message, port: channel.port2 }, toTransfer); const pingChannel = new MessageChannel();
const pingPromise: Promise<void> = new Promise((resolve, reject) => {
pingChannel.port1.onmessage = event => {
if (event.data.type === "pong") {
resolve();
}
};
setTimeout(reject, 1500);
});
this.port.postMessage(<WorkerRequest>{ message: { type: "ping" }, port: pingChannel.port2 }, [pingChannel.port2]);
try {
await pingPromise;
} catch {
console.warn("Failed to get a ping response from the worker within 1.5s. Assuming port is dead.");
this.createChannel();
return await this.sendMessage(message, transferable);
}
return await new Promise((resolve, reject) => { const channel = new MessageChannel();
const toTransfer: Transferable[] = [channel.port2, ...(transferable || [])];
const promise: Promise<WorkerResponse> = new Promise((resolve, reject) => {
channel.port1.onmessage = event => { channel.port1.onmessage = event => {
const message = event.data; const message = event.data;
if (message.type === "error") { if (message.type === "error") {
@ -136,5 +160,8 @@ export class WorkerConnection {
} }
} }
}); });
this.port.postMessage(<WorkerRequest>{ message: message, port: channel.port2 }, toTransfer);
return await promise;
} }
} }

View file

@ -18,8 +18,9 @@ function handleConnection(port: MessagePort) {
port.onmessage = async (event: MessageEvent) => { port.onmessage = async (event: MessageEvent) => {
const port = event.data.port; const port = event.data.port;
const message: WorkerMessage = event.data.message; const message: WorkerMessage = event.data.message;
if (message.type === "ping") {
if (message.type === "set") { port.postMessage(<WorkerResponse>{ type: "pong" });
} else if (message.type === "set") {
try { try {
const AsyncFunction = (async function() { }).constructor; const AsyncFunction = (async function() { }).constructor;
@ -32,6 +33,7 @@ function handleConnection(port: MessagePort) {
port.postMessage(<WorkerResponse>{ type: "set" }); port.postMessage(<WorkerResponse>{ type: "set" });
} catch (err) { } catch (err) {
console.error(err);
port.postMessage(<WorkerResponse>{ type: "error", error: err }); port.postMessage(<WorkerResponse>{ type: "error", error: err });
} }
} else if (message.type === "get") { } else if (message.type === "get") {
@ -55,6 +57,7 @@ function handleConnection(port: MessagePort) {
port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp }); port.postMessage(<WorkerResponse>{ type: "fetch", fetch: resp });
} }
} catch (err) { } catch (err) {
console.error(err);
port.postMessage(<WorkerResponse>{ type: "error", error: err }); port.postMessage(<WorkerResponse>{ type: "error", error: err });
} }
} else if (message.type === "websocket") { } else if (message.type === "websocket") {
@ -98,6 +101,7 @@ function handleConnection(port: MessagePort) {
port.postMessage(<WorkerResponse>{ type: "websocket" }); port.postMessage(<WorkerResponse>{ type: "websocket" });
} catch (err) { } catch (err) {
console.error(err);
port.postMessage(<WorkerResponse>{ type: "error", error: err }); port.postMessage(<WorkerResponse>{ type: "error", error: err });
} }
} }