diff --git a/client/src/AdriftClient.ts b/client/src/AdriftClient.ts index ccb418f..16d0dde 100644 --- a/client/src/AdriftClient.ts +++ b/client/src/AdriftClient.ts @@ -93,11 +93,13 @@ export class AdriftBareClient extends Client { data, }) ); + }, + (message: string) => { + ws.dispatchEvent(new ErrorEvent("error", { message })); } ); ws.send = (data: any) => { - console.log("Reached AdriftClient.ts send"); send(data); }; diff --git a/client/src/Connection.ts b/client/src/Connection.ts index 876d2b6..1219528 100644 --- a/client/src/Connection.ts +++ b/client/src/Connection.ts @@ -8,11 +8,13 @@ import { S2CRequestTypes, Transport, WSClosePayload, + WSErrorPayload, } from "protocol"; type OpenWSMeta = { onclose: (code: number, reason: string, wasClean: boolean) => void; onmessage: (data: any) => void; + onerror: (message: string) => void; }; export class Connection { @@ -42,10 +44,12 @@ export class Connection { console.log(requestID, requestType); + const msgText = () => new TextDecoder().decode(data.slice(cursor)); + const msgJSON = () => JSON.parse(msgText()); + switch (requestType) { case S2CRequestTypes.HTTPResponseStart: - const decoder = new TextDecoder(); - const payload = JSON.parse(decoder.decode(data.slice(cursor))); + const payload = msgJSON(); const stream = new ReadableStream({ start: (controller) => { this.openRequestStreams[requestID] = controller; @@ -82,32 +86,41 @@ export class Connection { case S2CRequestTypes.WSDataText: { const socketMeta = this.openSockets[requestID]; if (!socketMeta) return; - socketMeta.onmessage(new TextDecoder().decode(data.slice(cursor))); + setTimeout(() => socketMeta.onmessage(msgText())); break; } case S2CRequestTypes.WSDataBinary: { const socketMeta = this.openSockets[requestID]; if (!socketMeta) return; - socketMeta.onmessage(data.slice(cursor)); + setTimeout(() => socketMeta.onmessage(data.slice(cursor))); break; } case S2CRequestTypes.WSClose: { const socketMeta = this.openSockets[requestID]; if (!socketMeta) return; - const payload: WSClosePayload = JSON.parse( - new TextDecoder().decode(data.slice(cursor)) - ); - socketMeta.onclose( - payload.code || 1005, - payload.reason || "", - "wasClean" in payload ? Boolean(payload.wasClean) : false + const payload: WSClosePayload = msgJSON(); + setTimeout(() => + socketMeta.onclose( + payload.code || 1005, + payload.reason || "", + "wasClean" in payload ? Boolean(payload.wasClean) : false + ) ); delete this.openSockets[requestID]; break; } + case S2CRequestTypes.WSError: { + const socketMeta = this.openSockets[requestID]; + if (!socketMeta) return; + const payload: WSErrorPayload = msgJSON(); + setTimeout(() => socketMeta.onerror(payload.message)); + setTimeout(() => socketMeta.onclose(1006, "", false)); + break; + } + default: break; } @@ -150,7 +163,8 @@ export class Connection { url: URL, onopen: () => void, onclose: (code: number, reason: string, wasClean: boolean) => void, - onmessage: (data: any) => void + onmessage: (data: any) => void, + onerror: (message: string) => void ): { send: (data: any) => void; close: (code?: number, reason?: string) => void; @@ -171,7 +185,10 @@ export class Connection { }); // this can't be async, just call onopen when opened - this.openingSockets[seq] = { onopen, rest: { onmessage, onclose } }; + this.openingSockets[seq] = { + onopen, + rest: { onmessage, onclose, onerror }, + }; return { send: (data) => { diff --git a/protocol/src/index.ts b/protocol/src/index.ts index 2bd4ac8..805fc8d 100644 --- a/protocol/src/index.ts +++ b/protocol/src/index.ts @@ -16,6 +16,7 @@ export const S2CRequestTypes = { WSDataText: 4, WSDataBinary: 5, WSClose: 6, + WSError: 7, } as const; export type S2CRequestType = ObjectValues; @@ -44,4 +45,8 @@ export type WSClosePayload = { wasClean: boolean; }; +export type WSErrorPayload = { + message: string; +}; + export { Transport } from "./Transport"; diff --git a/server/src/server.ts b/server/src/server.ts index 68ddc5e..39b74e4 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -9,6 +9,7 @@ import { S2CRequestType, S2CRequestTypes, WSClosePayload, + WSErrorPayload, } from "protocol"; import { Readable } from "stream"; import { BareError, bareFetch, options } from "./http"; @@ -164,6 +165,10 @@ export class AdriftServer { this._sendJSONRes(seq, S2CRequestTypes.WSClose, payload); } + sendWSError(seq: number, payload: WSErrorPayload) { + this._sendJSONRes(seq, S2CRequestTypes.WSError, payload); + } + sendWSText(seq: number, textEncoded: ArrayBuffer) { const buf = new ArrayBuffer(2 + 1 + textEncoded.byteLength); const dataView = new DataView(buf); @@ -240,10 +245,9 @@ export class AdriftServer { const payload = AdriftServer.tryParseJSONPayload(msg.slice(cursor)); const ws = (this.sockets[seq] = new WebSocket(payload.url)); ws.binaryType = "arraybuffer"; - // TODO v important: onerror ws.onerror = (e) => { - console.log("ws onerror", e); - this.sendWSClose(seq, { code: 1006, reason: "", wasClean: false }); + // WSError implies close with code 1006, reason "" and wasClean false + this.sendWSError(seq, { message: e.message }); }; ws.onopen = () => { this.sendWSOpen(seq);