From 85ec94e36cf5fbab9318fe006f91788cd7d8e9a6 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Wed, 19 Jan 2022 22:26:23 +0800 Subject: [PATCH] ws async works --- src/websock.ts | 135 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 38 deletions(-) diff --git a/src/websock.ts b/src/websock.ts index 052adf9b2..acd9c3b58 100644 --- a/src/websock.ts +++ b/src/websock.ts @@ -7,55 +7,115 @@ type Keys = "message" | "open" | "close" | "error"; export default class Websock { _websocket: WebSocket; _eventHandlers: { [key in Keys]: Function }; + _buf: Uint8Array[]; + _status: any; - send_message(msg: message.Message) { - this._websocket.send(message.Message.encode(msg).finish()); + constructor(uri: string) { + this._eventHandlers = { + message: (_: any) => {}, + open: () => {}, + close: () => {}, + error: () => {}, + }; + this._status = ""; + this._buf = []; + this._websocket = new WebSocket(uri); + this._websocket.onmessage = this._recv_message.bind(this); + this._websocket.binaryType = "arraybuffer"; } - send_rendezvous(msg: rendezvous.RendezvousMessage) { - this._websocket.send(rendezvous.RendezvousMessage.encode(msg).finish()); + sendMessage(data: any) { + this._websocket.send( + message.Message.encode(message.Message.fromJSON(data)).finish() + ); + } + + sendRendezvous(data: any) { + this._websocket.send( + rendezvous.RendezvousMessage.encode( + rendezvous.RendezvousMessage.fromJSON(data) + ).finish() + ); + } + + parseMessage(data: Uint8Array) { + return message.Message.decode(data); + } + + parseRendezvous(data: Uint8Array) { + return rendezvous.RendezvousMessage.decode(data); } // Event Handlers off(evt: Keys) { - this._eventHandlers[evt] = () => { }; + this._eventHandlers[evt] = () => {}; } on(evt: Keys, handler: Function) { this._eventHandlers[evt] = handler; } - constructor(uri: string, protocols: string) { - this._eventHandlers = { - message: (_: any) => { }, - open: () => { }, - close: () => { }, - error: () => { }, - }; + async open(timeout: number = 12000): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + if (this._status != "open") { + reject(this._status || "timeout"); + } + }, timeout); + this._websocket.onopen = () => { + this._status = "open"; + console.debug(">> WebSock.onopen"); + if (this._websocket?.protocol) { + console.info( + "Server choose sub-protocol: " + this._websocket.protocol + ); + } - this._websocket = new WebSocket(uri, protocols); + this._eventHandlers.open(); + console.debug("<< WebSock.onopen"); + resolve(this); + }; + this._websocket.onclose = (e) => { + this._status = e; + console.debug(">> WebSock.onclose"); + this._eventHandlers.close(e); + console.debug("<< WebSock.onclose"); + reject(e); + }; + this._websocket.onerror = (e) => { + this._status = e; + console.debug(">> WebSock.onerror: " + e); + this._eventHandlers.error(e); + console.debug("<< WebSock.onerror: " + e); + reject(e); + }; + }); + } - this._websocket.onmessage = this._recv_message.bind(this); - this._websocket.binaryType = "arraybuffer"; - this._websocket.onopen = () => { - console.debug(">> WebSock.onopen"); - if (this._websocket.protocol) { - console.info("Server choose sub-protocol: " + this._websocket.protocol); + async next(timeout = 12000): Promise { + let func = ( + resolve: (value: Uint8Array) => void, + reject: (reason: any) => void, + tm0: number + ) => { + if (this._buf.length) { + resolve(this._buf[0]); + this._buf.splice(0, 1); + } else { + if (this._status != 'open') { + reject(this._status); + return; + } + if (new Date().getTime() > tm0 + timeout) { + reject("timeout"); + } else { + setTimeout(() => func(resolve, reject, tm0), 1); + } } - - this._eventHandlers.open(); - console.debug("<< WebSock.onopen"); - }; - this._websocket.onclose = (e) => { - console.debug(">> WebSock.onclose"); - this._eventHandlers.close(e); - console.debug("<< WebSock.onclose"); - }; - this._websocket.onerror = (e) => { - console.debug(">> WebSock.onerror: " + e); - this._eventHandlers.error(e); - console.debug("<< WebSock.onerror: " + e); }; + return new Promise((resolve, reject) => { + func(resolve, reject, new Date().getTime()); + }); } close() { @@ -68,13 +128,14 @@ export default class Websock { this._websocket.close(); } - this._websocket.onmessage = () => { }; + this._websocket.onmessage = () => {}; } } _recv_message(e: any) { if (e.data instanceof window.ArrayBuffer) { let bytes = new Uint8Array(e.data); + this._buf.push(bytes); } this._eventHandlers.message(e.data); } @@ -86,14 +147,12 @@ export default class Websock { } } -/* -let ws = new Websock('ws://207.148.17.15:21118'); +let ws = new Websock("ws://207.148.17.15:21118"); await ws.open(); console.log("ws connected"); // let punchHole = rendezvous.PunchHoleRequest.fromJSON({ id: '' }); // ws.send_rendezvous(rendezvous.RendezvousMessage.fromJSON({ punchHole })); let testNatRequest = rendezvous.TestNatRequest.fromJSON({ serial: 0 }); -ws.send_rendezvous(rendezvous.RendezvousMessage.fromJSON({ testNatRequest })); -let msg = await ws.next(); +ws.sendRendezvous({ testNatRequest }); +let msg = ws.parseRendezvous(await ws.next()); console.log(msg); -*/ \ No newline at end of file