websocket progress

This commit is contained in:
Spencer Pogorzelski 2023-08-13 22:40:50 -07:00
parent cf5d378147
commit e5d51e0868
4 changed files with 74 additions and 8 deletions

View file

@ -67,6 +67,7 @@ export class AdriftBareClient extends Client {
webSocketImpl: WebSocketImpl webSocketImpl: WebSocketImpl
): WebSocket { ): WebSocket {
const ws = new webSocketImpl("ws:null", protocols); const ws = new webSocketImpl("ws:null", protocols);
const closeEmitter = new EventTarget();
// this will error. that's okay // this will error. that's okay
let { send, close } = this.connection.wsconnect( let { send, close } = this.connection.wsconnect(
@ -77,7 +78,7 @@ export class AdriftBareClient extends Client {
}, },
() => { () => {
onReadyState(WebSocket.CLOSED); onReadyState(WebSocket.CLOSED);
ws.dispatchEvent(new Event("close")); closeEmitter.dispatchEvent(new Event("close"));
}, },
(data) => { (data) => {
ws.dispatchEvent( ws.dispatchEvent(
@ -101,6 +102,30 @@ export class AdriftBareClient extends Client {
close(code, reason); close(code, reason);
} }
); );
let onClose = ws.onclose;
(ws as any).__defineGetter__("onclose", () => onClose);
(ws as any).__defineSetter__("onclose", (newOnClose: any) => {
onClose = newOnClose;
});
closeEmitter.addEventListener("close", () => {
if (onClose) onClose.call(ws, new Event("close"));
});
const _addEventListener = ws.addEventListener.bind(ws);
(ws as any).addEventListener = (evt: string, cb: any, options: any) => {
if (evt == "close") {
closeEmitter.addEventListener("close", cb, options);
return;
}
_addEventListener(evt, cb, options);
};
const _removeEventListener = ws.removeEventListener.bind(ws);
(ws as any).removeEventListener = (evt: string, cb: any, options: any) => {
if (evt == "close") {
closeEmitter.removeEventListener("close", cb, options);
return;
}
_removeEventListener(evt, cb, options);
};
return ws; return ws;
} }

View file

@ -9,14 +9,13 @@ import {
Transport, Transport,
} from "protocol"; } from "protocol";
type OpenWSMeta = { onclose: () => void; onmessage: (data: any) => void };
export class Connection { export class Connection {
requestCallbacks: Record<number, Function> = {}; requestCallbacks: Record<number, Function> = {};
openRequestStreams: Record<number, ReadableStreamDefaultController<any>> = {}; openRequestStreams: Record<number, ReadableStreamDefaultController<any>> = {};
openingSockets: Record<number, () => void>; openingSockets: Record<number, { onopen: () => void; rest: OpenWSMeta }> = {};
openSockets: Record< openSockets: Record<number, OpenWSMeta> = {};
number,
{ onclose: () => void; onmessage: (data: any) => void }
>;
counter: number = 0; counter: number = 0;
@ -68,6 +67,16 @@ export class Connection {
this.openRequestStreams[requestID]?.close(); this.openRequestStreams[requestID]?.close();
delete this.openRequestStreams[requestID]; delete this.openRequestStreams[requestID];
break; break;
case S2CRequestTypes.WSOpen:
const { onopen, rest } = this.openingSockets[requestID];
delete this.openingSockets[requestID];
this.openSockets[requestID] = rest;
onopen();
break;
default:
break;
} }
} }
@ -126,7 +135,7 @@ export class Connection {
}); });
// this can't be async, just call onopen when opened // this can't be async, just call onopen when opened
this.openingSockets[seq] = onopen; this.openingSockets[seq] = { onopen, rest: { onmessage, onclose } };
return { return {
send: (data) => { send: (data) => {

View file

@ -157,6 +157,16 @@
} }
(window as any).bare = new BareClient(); (window as any).bare = new BareClient();
(window as any).myWsTest = () => {
const ws = ((window as any).ws = (
(window as any).bare as BareClient
).createWebSocket("wss://ws.postman-echo.com/raw", [], {}));
ws.onopen = () => console.log("onopen");
ws.addEventListener("open", () => console.log("open listener"));
ws.onclose = () => console.error(new Error("onclose"));
ws.addEventListener("close", () => console.log("close listener"));
ws.onmessage = console.log;
};
</script> </script>
{#if ready} {#if ready}

View file

@ -29,7 +29,7 @@ function bareErrorToResponse(e: BareError): {
export class AdriftServer { export class AdriftServer {
send: (msg: ArrayBuffer) => void; send: (msg: ArrayBuffer) => void;
sockets: Record<number, WebSocket>; sockets: Record<number, WebSocket> = {};
events: EventEmitter; events: EventEmitter;
constructor(send: (msg: ArrayBuffer) => void) { constructor(send: (msg: ArrayBuffer) => void) {
@ -240,6 +240,7 @@ export class AdriftServer {
const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor));
const ws = (this.sockets[seq] = new WebSocket(payload.url)); const ws = (this.sockets[seq] = new WebSocket(payload.url));
ws.binaryType = "arraybuffer"; ws.binaryType = "arraybuffer";
// TODO v important: onerror
ws.onopen = () => { ws.onopen = () => {
this.sendWSOpen(seq); this.sendWSOpen(seq);
}; };
@ -282,6 +283,27 @@ export class AdriftServer {
break; break;
} }
case C2SRequestTypes.WSSendText: {
const socket = this.sockets[seq];
if (!socket) return;
socket.send(new TextDecoder().decode(msg.slice(cursor)));
break;
}
case C2SRequestTypes.WSSendBinary: {
const socket = this.sockets[seq];
if (!socket) return;
socket.send(msg.slice(cursor));
break;
}
case C2SRequestTypes.WSClose: {
const socket = this.sockets[seq];
if (!socket) return;
socket.close();
break;
}
default: default:
// not implemented // not implemented
break; break;