refractor websocket again

This commit is contained in:
ading2210 2024-03-06 15:53:11 -05:00
parent c681e21777
commit c9236f90d5
9 changed files with 233 additions and 208 deletions

View file

@ -1,112 +0,0 @@
//class for custom websocket
//multiple classes attempt to replicate the websocket API
//so this prevents code duplication
class CustomWebSocket extends EventTarget {
constructor(url, protocols=[]) {
super();
this.url = url;
this.protocols = protocols;
this.binaryType = "blob";
//legacy event handlers
this.onopen = () => {};
this.onerror = () => {};
this.onmessage = () => {};
this.onclose = () => {};
this.CONNECTING = 0;
this.OPEN = 1;
this.CLOSING = 2;
this.CLOSED = 3;
this.status = this.CONNECTING;
}
custom_recv() {}
recv() {
let {success, data, is_text} = this.custom_recv();
if (!success) return;
let converted;
if (is_text) {
converted = new TextDecoder().decode(data);
}
else {
if (this.binaryType == "blob") {
converted = new Blob(data);
}
else if (this.binaryType == "arraybuffer") {
converted = data.buffer;
}
else {
throw "invalid binaryType string";
}
}
let msg_event = new MessageEvent("message", {data: converted});
this.onmessage(msg_event);
this.dispatchEvent(msg_event);
}
recv_loop() {
this.event_loop = setInterval(() => {
this.recv();
}, 0);
}
close_callback(error) {
if (this.status == this.CLOSED) return;
this.status = this.CLOSED;
if (error) {
let error_event = new Event("error");
this.dispatchEvent(error_event);
this.onerror(error_event);
}
else {
let close_event = new CloseEvent("close");
this.dispatchEvent(close_event);
this.onclose(close_event);
}
}
open_callback() {
this.status = this.OPEN;
let open_event = new Event("open");
this.onopen(open_event);
this.dispatchEvent(open_event);
}
send(data) {
let is_text = typeof data === "string";
if (this.status === this.CONNECTING) {
throw new DOMException("ws not ready yet");
}
if (this.status === this.CLOSED) {
return;
}
let data_array = any_to_array(data);
this.custom_send(data_array, is_text);
}
close() {
this.status = this.CLOSING;
this.custom_close();
}
get readyState() {
return this.status;
}
get bufferedAmount() {
return 0;
}
get protocol() {
return this.protocols;
}
get extensions() {
return "";
}
}

View file

@ -0,0 +1,18 @@
function logger(type, text) {
if (type === "log")
console.log(text);
else if (type === "warn")
console.warn(text);
else if (type === "error")
console.error(text);
}
function log_msg(text) {
logger("log", text);
}
function warn_msg(text) {
logger("warn", text);
}
function error_msg(text) {
logger("error", text);
}

View file

@ -141,51 +141,11 @@ function create_response(response_data, response_info) {
return response_obj;
}
async function parse_body(data) {
let data_array = null;
if (typeof data === "string") {
data_array = new TextEncoder().encode(data);
}
else if (data instanceof Blob) {
let array_buffer = await data.arrayBuffer();
data_array = new Uint8Array(array_buffer);
}
//any typedarray
else if (data instanceof ArrayBuffer) {
//dataview objects
if (ArrayBuffer.isView(data) && data instanceof DataView) {
data_array = new Uint8Array(data.buffer);
}
//regular typed arrays
else if (ArrayBuffer.isView(data)) {
data_array = Uint8Array.from(data);
}
//regular arraybuffers
else {
data_array = new Uint8Array(data);
}
}
else if (data instanceof ReadableStream) {
let chunks = [];
for await (let chunk of data) {
chunks.push(chunk);
}
data_array = merge_arrays(chunks);
}
else {
throw "invalid data type to be sent";
}
return data_array;
}
async function create_options(params) {
let body = null;
if (params.body) {
body = await parse_body(params.body);
body = await data_to_array(params.body);
params.body = true;
}
@ -279,7 +239,8 @@ api = {
fetch: libcurl_fetch,
set_websocket: set_websocket_url,
load_wasm: load_wasm,
WebSocket: CurlWebSocket,
WebSocket: FakeWebSocket,
CurlWebSocket: CurlWebSocket,
TLSSocket: TLSSocket,
get_cacert: get_cacert,
@ -295,6 +256,8 @@ api = {
set stdout(callback) {out = callback},
get stderr() {return err},
set stderr(callback) {err = callback},
get logger() {return logger},
set logger(func) {logger = func},
onload() {}
};

View file

@ -1,3 +1,5 @@
//currently broken
class TLSSocket extends CustomWebSocket {
constructor(hostname, port, debug) {
super();

View file

@ -35,36 +35,43 @@ function allocate_array(array) {
}
//convert any data to a uint8array
function any_to_array(data) {
let data_array;
async function data_to_array(data) {
let data_array = null;
if (typeof data === "string") {
data_array = new TextEncoder().encode(data);
}
else if (data instanceof Blob) {
data.arrayBuffer().then(array_buffer => {
data_array = new Uint8Array(array_buffer);
this.send(data_array);
});
return;
let array_buffer = await data.arrayBuffer();
data_array = new Uint8Array(array_buffer);
}
//any typedarray
else if (data instanceof ArrayBuffer) {
//dataview objects
if (ArrayBuffer.isView(data) && data instanceof DataView) {
data_array = new Uint8Array(data.buffer);
}
//regular typed arrays
else if (ArrayBuffer.isView(data)) {
data_array = Uint8Array.from(data);
}
//regular arraybuffers
else {
data_array = new Uint8Array(data);
}
}
//regular typed arrays
else if (ArrayBuffer.isView(data)) {
data_array = Uint8Array.from(data);
}
else {
throw "invalid data type";
else if (data instanceof ReadableStream) {
let chunks = [];
for await (let chunk of data) {
chunks.push(chunk);
}
data_array = merge_arrays(chunks);
}
else {
throw "invalid data type to be sent";
}
return data_array;
}
}

View file

@ -1,13 +1,21 @@
class CurlWebSocket extends CustomWebSocket {
constructor(url, protocols=[], debug=false) {
super(url, protocols);
class CurlWebSocket {
constructor(url, protocols=[], options={}) {
check_loaded(true);
if (!url.startsWith("wss://") && !url.startsWith("ws://")) {
throw new SyntaxError("invalid url");
}
this.url = url;
this.protocols = protocols;
this.debug = debug;
this.options = options;
this.onopen = () => {};
this.onerror = () => {};
this.onmessage = () => {};
this.onclose = () => {};
this.connected = false;
this.event_loop = null;
this.recv_buffer = [];
this.connect();
@ -17,87 +25,108 @@ class CurlWebSocket extends CustomWebSocket {
let data_callback = () => {};
let finish_callback = (error, response_info) => {
if (error === 0) {
this.status = this.OPEN;
this.open_callback();
this.recv_loop();
this.connected = true;
this.event_loop = setInterval(() => {
let data = this.recv();
if (data !== null) this.onmessage(data);
}, 0);
this.onopen();
}
else {
this.status = this.CLOSED;
this.cleanup(error);
}
}
let options = {};
let request_options = {
headers: this.options.headers || {}
};
if (this.protocols) {
options.headers = {
"Sec-Websocket-Protocol": this.protocols.join(", "),
};
request_options.headers["Sec-Websocket-Protocol"] = this.protocols.join(", ");
}
if (this.debug) {
options._libcurl_verbose = 1;
if (this.options.verbose) {
request_options._libcurl_verbose = 1;
}
this.http_handle = perform_request(this.url, options, data_callback, finish_callback, null);
this.http_handle = perform_request(this.url, request_options, data_callback, finish_callback, null);
}
custom_recv() {
recv() {
let buffer_size = 64*1024;
let result_ptr = _recv_from_websocket(this.http_handle, buffer_size);
let data_ptr = _get_result_buffer(result_ptr);
let result_code = _get_result_code(result_ptr);
let returned_data = null;
if (result_code == 0) { //CURLE_OK - data received
function free_result() {
_free(data_ptr);
_free(result_ptr);
}
console.log(result_code);
if (result_code === 0) { //CURLE_OK - data received
if (_get_result_closed(result_ptr)) {
_free(data_ptr);
_free(result_ptr);
free_result();
this.cleanup();
return;
return returned_data;
}
let data_size = _get_result_size(result_ptr);
let data_heap = Module.HEAPU8.subarray(data_ptr, data_ptr + data_size);
let data = new Uint8Array(data_heap);
console.log(data, data_size, buffer_size, _get_result_bytes_left(result_ptr));
this.recv_buffer.push(data);
if (data_size !== buffer_size && !_get_result_bytes_left(result_ptr)) { //message finished
let full_data = merge_arrays(this.recv_buffer);
let is_text = _get_result_is_text(result_ptr)
this.recv_buffer = [];
return {
success: true,
data: full_data,
is_text: is_text
if (is_text) {
returned_data = new TextDecoder().decode(full_data);
}
else {
returned_data = full_data;
}
}
}
if (result_code == 52) { //CURLE_GOT_NOTHING - socket closed
//CURLE_GOT_NOTHING, CURLE_RECV_ERROR, CURLE_SEND_ERROR - socket closed
else if (result_code === 52 || result_code === 55 || result_code === 56) {
this.cleanup();
}
_free(data_ptr);
_free(result_ptr);
return {
success: false,
data: null,
is_text: false
}
free_result();
return returned_data;
}
cleanup(error=false) {
if (this.http_handle) _cleanup_handle(this.http_handle);
clearInterval(this.event_loop);
this.close_callback(error);
this.connected = false;
if (error) {
this.onerror(error);
}
else {
this.onclose();
}
}
custom_send(data_array, is_text) {
let data_ptr = allocate_array(data_array);
let data_len = data_array.length;
send(data) {
let is_text = typeof data === "string";
if (!this.connected) {
throw new DOMException("websocket not connected");
}
if (is_text) {
data = new TextEncoder().encode(data);
}
let data_ptr = allocate_array(data);
let data_len = data.length;
_send_to_websocket(this.http_handle, data_ptr, data_len, is_text);
_free(data_ptr);
}
custom_close() {
close() {
this.cleanup();
this.status = this.CLOSED;
}
}

View file

@ -0,0 +1,113 @@
//class for websocket polyfill
class FakeWebSocket extends EventTarget {
constructor(url, protocols=[], options={}) {
super();
this.url = url;
this.protocols = protocols;
this.options = options;
this.binaryType = "blob";
//legacy event handlers
this.onopen = () => {};
this.onerror = () => {};
this.onmessage = () => {};
this.onclose = () => {};
this.CONNECTING = 0;
this.OPEN = 1;
this.CLOSING = 2;
this.CLOSED = 3;
this.status = this.CONNECTING;
this.socket = null;
this.connect();
}
connect() {
this.socket = new CurlWebSocket(this.url, this.protocols, this.options);
this.socket.onopen = () => {
this.status = this.OPEN;
let open_event = new Event("open");
this.onopen(open_event);
this.dispatchEvent(open_event);
}
this.socket.onclose = () => {
this.status = this.CLOSED;
let close_event = new CloseEvent("close");
this.dispatchEvent(close_event);
this.onclose(close_event);
};
this.socket.onerror = (error) => {
this.status = this.CLOSED;
console.error(`websocket ${this.url} encountered an error (${error})`);
let error_event = new Event("error");
this.dispatchEvent(error_event);
this.onerror(error_event);
}
this.socket.onmessage = (data) => {
let converted;
if (typeof data === "string") {
converted = data;
}
else { //binary frame received as uint8array
if (this.binaryType == "blob") {
converted = new Blob(data);
}
else if (this.binaryType == "arraybuffer") {
converted = data.buffer;
}
else {
throw "invalid binaryType string";
}
}
let msg_event = new MessageEvent("message", {data: converted});
this.onmessage(msg_event);
this.dispatchEvent(msg_event);
}
}
send(data) {
let is_text = typeof data === "string";
if (this.status === this.CONNECTING) {
throw new DOMException("websocket not ready yet");
}
if (this.status === this.CLOSED) {
return;
}
(async () => {
if (is_text) {
this.socket.send(data);
}
else {
let data_array = await data_to_array(data);
this.send(data_array);
}
})();
}
close() {
this.status = this.CLOSING;
this.socket.close();
}
get readyState() {
return this.status;
}
get bufferedAmount() {
return 0;
}
get protocol() {
return this.protocols[0] || "";
}
get extensions() {
return "";
}
}