From d433f67d67ad335c79ccac8450fdb7b8a505e6a0 Mon Sep 17 00:00:00 2001 From: velzie Date: Tue, 16 Jul 2024 16:54:39 -0400 Subject: [PATCH] =?UTF-8?q?threads=20=F0=9F=9A=80=F0=9F=9A=80=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rspack.config.js | 3 +- src/scramjet.config.ts | 1 + src/thread/thread.ts | 46 ++++++++++++++++++++ src/types.d.ts | 1 + src/worker/fetch.ts | 3 +- src/worker/index.ts | 8 ++-- src/worker/threadpool.ts | 92 ++++++++++++++++++++++++++++++++++++++++ static/ui.js | 51 +++++++++++++--------- 8 files changed, 179 insertions(+), 26 deletions(-) create mode 100644 src/thread/thread.ts create mode 100644 src/worker/threadpool.ts diff --git a/rspack.config.js b/rspack.config.js index 4d4f64b..56418e7 100644 --- a/rspack.config.js +++ b/rspack.config.js @@ -7,10 +7,11 @@ const __dirname = fileURLToPath(new URL(".", import.meta.url)); export default defineConfig({ // change to production when needed - mode: "production", + mode: "development", entry: { shared: join(__dirname, "src/shared/index.ts"), worker: join(__dirname, "src/worker/index.ts"), + thread: join(__dirname, "src/thread/thread.ts"), client: join(__dirname, "src/client/index.ts"), config: join(__dirname, "src/scramjet.config.ts"), codecs: join(__dirname, "src/codecs/index.ts"), diff --git a/src/scramjet.config.ts b/src/scramjet.config.ts index a5e2082..736622b 100644 --- a/src/scramjet.config.ts +++ b/src/scramjet.config.ts @@ -8,6 +8,7 @@ self.$scramjet.config = { config: "/scram/scramjet.config.js", shared: "/scram/scramjet.shared.js", worker: "/scram/scramjet.worker.js", + thread: "/scram/scramjet.thread.js", client: "/scram/scramjet.client.js", codecs: "/scram/scramjet.codecs.js", }; diff --git a/src/thread/thread.ts b/src/thread/thread.ts new file mode 100644 index 0000000..0010d5e --- /dev/null +++ b/src/thread/thread.ts @@ -0,0 +1,46 @@ +import { rewriteJs } from "../shared/rewriters/js"; + + + +// @ts-ignore +onconnect = (e) => { + const port = e.ports[0]; + + + console.log("thread: connected to port", port) + port.postMessage("ready"); + + let syncToken = 0; + port.onmessage = ({ data }) => { + console.log("thread: received message", data) + const [task, ...args] = data; + let token = syncToken++; + + try { + let res = tasks[task](...args); + console.log("thread: task", task, "completed with token", token) + port.postMessage({ + token, + result: res + }) + } catch (e) { + port.postMessage({ + token, + error: e.message + }) + } + + + port.postMessage("idle"); + } +} + + +const tasks = { + "rewriteJs": taskRewriteJs, +} + + +function taskRewriteJs(js: ArrayBuffer, origin: string): string { + return rewriteJs(js, new URL(origin)); +} diff --git a/src/types.d.ts b/src/types.d.ts index bea4255..8a16769 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -37,6 +37,7 @@ declare global { config: string; shared: string; worker: string; + thread: string; client: string; codecs: string; }; diff --git a/src/worker/fetch.ts b/src/worker/fetch.ts index d19d8c4..b943eab 100644 --- a/src/worker/fetch.ts +++ b/src/worker/fetch.ts @@ -115,8 +115,7 @@ export async function swfetch(this: ScramjetServiceWorker, { request }: FetchEve } break; case "script": - // responseBody = rewriteJs(await response.text(), url); - responseBody = rewriteJs(await response.text()); + responseBody = await this.threadpool.rewriteJs(await response.arrayBuffer(), url.toString()); break; case "style": responseBody = rewriteCss(await response.text(), url); diff --git a/src/worker/index.ts b/src/worker/index.ts index 1c471e2..daa1422 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,20 +1,24 @@ import { swfetch } from "./fetch"; +import { ScramjetThreadpool } from "./threadpool"; declare global { interface Window { ScramjetServiceWorker; - ScramjetThread; } } export class ScramjetServiceWorker { client: typeof self.$scramjet.shared.util.BareClient.prototype; config: typeof self.$scramjet.config; + threadpool: ScramjetThreadpool; constructor(config = self.$scramjet.config) { this.client = new self.$scramjet.shared.util.BareClient(); if (!config.prefix) config.prefix = "/scramjet/"; this.config = config; + + this.threadpool = new ScramjetThreadpool(); + } route({ request }: FetchEvent) { @@ -27,6 +31,4 @@ export class ScramjetServiceWorker { }; - - self.ScramjetServiceWorker = ScramjetServiceWorker; diff --git a/src/worker/threadpool.ts b/src/worker/threadpool.ts new file mode 100644 index 0000000..b71cb63 --- /dev/null +++ b/src/worker/threadpool.ts @@ -0,0 +1,92 @@ + +type Thread = { + handle: MessagePort; + ready: boolean; + busy: boolean; + syncToken: number; + promises: Map; +} + +export class ScramjetThreadpool { + threads: Thread[] = []; + constructor() { + self.addEventListener("message", ({ data }) => { + if (data.scramjet$type == "add") { + this.spawn(data.handle); + } + }); + } + + spawn(handle) { + const thread = { + handle, + ready: false, + busy: false, + syncToken: 0, + promises: new Map() + } + + this.threads.push(thread); + + thread.handle.onmessage = (e) => { + if (e.data === "ready") { + thread.ready = true; + return; + } + if (e.data === "idle") { + thread.busy = false; + return; + } + + const { token, result, error } = e.data; + const { resolve, reject } = thread.promises.get(token); + thread.promises.delete(token); + + if (error) { + reject(error); + } else { + resolve(result); + } + } + + thread.handle.start(); + } + + pick(): Thread | undefined { + const alive = this.threads.filter(t => t.ready); + const idle = alive.filter(t => !t.busy); + + // no threads + if (!alive.length) return; + + // there is a thread, but it's busy + if (!idle.length) return alive[Math.floor(Math.random() * alive.length)]; + + // there's an open thread + return idle[Math.floor(Math.random() * idle.length)]; + } + + run(task: string, args: any[], transferrable: any[]): Promise { + const thread = this.pick(); + if (!thread) throw new Error("No threads available"); + thread.busy = true; + + + let token = thread.syncToken++; + + // console.log("runthread: dispatching task", task, "to thread", thread, "of token", token) + return new Promise((resolve, reject) => { + thread.promises.set(token, { resolve, reject }); + + thread.handle.postMessage([ + task, + ...args + ], transferrable); + }); + } + + + async rewriteJs(js: ArrayBuffer, origin: string): Promise { + return await this.run("rewriteJs", [js, origin], [js]); + } +} diff --git a/static/ui.js b/static/ui.js index 3db5da0..07bce98 100644 --- a/static/ui.js +++ b/static/ui.js @@ -1,10 +1,21 @@ navigator.serviceWorker - .register("./sw.js", { - scope: $scramjet.config.prefix, - }) - .then((reg) => { - reg.update(); - }); + .register("./sw.js") + .then((reg) => { + reg.update(); + }); + +navigator.serviceWorker.ready.then((reg) => { + for (let i = 0; i < 20; i++) { + const thread = new SharedWorker($scramjet.config.thread, { name: "thread" + i }); + + reg.active.postMessage({ + scramjet$type: "add", + handle: thread.port + }, [thread.port]); + + } +}); + const connection = new BareMux.BareMuxConnection("/baremux/worker.js"); const flex = css` display: flex; @@ -13,21 +24,21 @@ const col = css` flex-direction: column; `; const store = $store( - { - url: "https://google.com", - wispurl: "wss://wisp.mercurywork.shop/", - bareurl: - (location.protocol === "https:" ? "https" : "http") + - "://" + - location.host + - "/bare/", - }, - { ident: "settings", backing: "localstorage", autosave: "auto" } + { + url: "https://google.com", + wispurl: "wss://wisp.mercurywork.shop/", + bareurl: + (location.protocol === "https:" ? "https" : "http") + + "://" + + location.host + + "/bare/", + }, + { ident: "settings", backing: "localstorage", autosave: "auto" } ); connection.setTransport("/baremod/index.mjs", [store.bareurl]); function App() { - this.urlencoded = ""; - this.css = ` + this.urlencoded = ""; + this.css = ` width: 100%; height: 100%; color: #e0def4; @@ -86,7 +97,7 @@ function App() { } `; - return html` + return html`

Percury Unblocker

surf the unblocked and mostly buggy web

@@ -110,5 +121,5 @@ function App() { } window.addEventListener("load", () => { - document.body.appendChild(h(App)); + document.body.appendChild(h(App)); });