diff --git a/src/connection.ts b/src/connection.ts index 2bcdfc8..a092b31 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3,7 +3,7 @@ import { BareHeaders, TransferrableResponse } from "./baretypes"; type SWClient = { postMessage: typeof MessagePort.prototype.postMessage }; export type WorkerMessage = { - type: "fetch" | "websocket" | "set" | "get", + type: "fetch" | "websocket" | "set" | "get" | "ping", fetch?: { remote: string, method: string, @@ -26,7 +26,7 @@ export type WorkerRequest = { } export type WorkerResponse = { - type: "fetch" | "websocket" | "set" | "get" | "error", + type: "fetch" | "websocket" | "set" | "get" | "pong" | "error", fetch?: TransferrableResponse, name?: string, error?: Error, @@ -59,33 +59,40 @@ function tryGetPort(client: SWClient): Promise { }); } -function createPort(path: string, channel: BroadcastChannel): MessagePort { +function createPort(path: string, channel: BroadcastChannel, registerHandlers: boolean): MessagePort { const worker = new SharedWorker(path, "bare-mux-worker"); - // uv removes navigator.serviceWorker so this errors - if (navigator.serviceWorker) { - navigator.serviceWorker.addEventListener("message", event => { - if (event.data.type === "getPort" && event.data.port) { - console.debug("bare-mux: recieved request for port from sw"); - const worker = new SharedWorker(path, "bare-mux-worker"); - event.data.port.postMessage(worker.port, [worker.port]); - } - }); - } - channel.onmessage = (event: MessageEvent) => { - if (event.data.type === "getPath") { - console.debug("bare-mux: recieved request for worker path from broadcast channel"); - channel.postMessage({ type: "path", path: path }); + if (registerHandlers) { + // uv removes navigator.serviceWorker so this errors + if (navigator.serviceWorker) { + navigator.serviceWorker.addEventListener("message", event => { + if (event.data.type === "getPort" && event.data.port) { + console.debug("bare-mux: recieved request for port from sw"); + const worker = new SharedWorker(path, "bare-mux-worker"); + event.data.port.postMessage(worker.port, [worker.port]); + } + }); } - }; + channel.onmessage = (event: MessageEvent) => { + if (event.data.type === "getPath") { + console.debug("bare-mux: recieved request for worker path from broadcast channel"); + channel.postMessage({ type: "path", path: path }); + } + }; + } return worker.port; } export class WorkerConnection { channel: BroadcastChannel; port: MessagePort | Promise; + workerPath: string; constructor(workerPath?: string) { this.channel = new BroadcastChannel("bare-mux"); + this.createChannel(workerPath, true); + } + + createChannel(workerPath?: string, inInit?: boolean) { // @ts-expect-error if (self.clients) { // running in a ServiceWorker @@ -101,14 +108,14 @@ export class WorkerConnection { // create the SharedWorker and help other bare-mux clients get the workerPath if (!workerPath.startsWith("/") && !workerPath.includes("://")) throw new Error("Invalid URL. Must be absolute or start at the root."); - this.port = createPort(workerPath, this.channel); + this.port = createPort(workerPath, this.channel, inInit); } else if (SharedWorker) { // running in a window, was not passed a workerPath // ask other bare-mux clients for the workerPath this.port = new Promise(resolve => { this.channel.onmessage = (event: MessageEvent) => { if (event.data.type === "path") { - resolve(createPort(event.data.path, this.channel)); + resolve(createPort(event.data.path, this.channel, inInit)); } } this.channel.postMessage({ type: "getPath" }); @@ -121,12 +128,29 @@ export class WorkerConnection { async sendMessage(message: WorkerMessage, transferable?: Transferable[]): Promise { if (this.port instanceof Promise) this.port = await this.port; - let channel = new MessageChannel(); - let toTransfer: Transferable[] = [channel.port2, ...(transferable || [])]; - this.port.postMessage({ message: message, port: channel.port2 }, toTransfer); + const pingChannel = new MessageChannel(); + const pingPromise: Promise = new Promise((resolve, reject) => { + pingChannel.port1.onmessage = event => { + if (event.data.type === "pong") { + resolve(); + } + }; + setTimeout(reject, 1500); + }); + this.port.postMessage({ message: { type: "ping" }, port: pingChannel.port2 }, [pingChannel.port2]); + try { + await pingPromise; + } catch { + console.warn("Failed to get a ping response from the worker within 1.5s. Assuming port is dead."); + this.createChannel(); + return await this.sendMessage(message, transferable); + } - return await new Promise((resolve, reject) => { + const channel = new MessageChannel(); + const toTransfer: Transferable[] = [channel.port2, ...(transferable || [])]; + + const promise: Promise = new Promise((resolve, reject) => { channel.port1.onmessage = event => { const message = event.data; if (message.type === "error") { @@ -136,5 +160,8 @@ export class WorkerConnection { } } }); + this.port.postMessage({ message: message, port: channel.port2 }, toTransfer); + + return await promise; } } diff --git a/src/worker.ts b/src/worker.ts index 816a896..c9fa6f1 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -18,8 +18,9 @@ function handleConnection(port: MessagePort) { port.onmessage = async (event: MessageEvent) => { const port = event.data.port; const message: WorkerMessage = event.data.message; - - if (message.type === "set") { + if (message.type === "ping") { + port.postMessage({ type: "pong" }); + } else if (message.type === "set") { try { const AsyncFunction = (async function() { }).constructor; @@ -32,6 +33,7 @@ function handleConnection(port: MessagePort) { port.postMessage({ type: "set" }); } catch (err) { + console.error(err); port.postMessage({ type: "error", error: err }); } } else if (message.type === "get") { @@ -55,6 +57,7 @@ function handleConnection(port: MessagePort) { port.postMessage({ type: "fetch", fetch: resp }); } } catch (err) { + console.error(err); port.postMessage({ type: "error", error: err }); } } else if (message.type === "websocket") { @@ -98,6 +101,7 @@ function handleConnection(port: MessagePort) { port.postMessage({ type: "websocket" }); } catch (err) { + console.error(err); port.postMessage({ type: "error", error: err }); } }