From 173227685e37f892864df29aa02e22797f11dece Mon Sep 17 00:00:00 2001 From: sbosse Date: Mon, 21 Jul 2025 23:02:39 +0200 Subject: [PATCH] Mon 21 Jul 22:43:21 CEST 2025 --- .../core/examples/vncviewer/unpackstream.js | 418 ++++++++++++++++++ 1 file changed, 418 insertions(+) create mode 100644 js/x11/core/examples/vncviewer/unpackstream.js diff --git a/js/x11/core/examples/vncviewer/unpackstream.js b/js/x11/core/examples/vncviewer/unpackstream.js new file mode 100644 index 0000000..807b27b --- /dev/null +++ b/js/x11/core/examples/vncviewer/unpackstream.js @@ -0,0 +1,418 @@ +//var clog = clog; +var clog = function() {}; + +var EventEmitter = require('events').EventEmitter; +var util = require('util'); + +var argument_length = {}; +argument_length.C = 1; +argument_length.S = 2; +argument_length.s = 2; +argument_length.L = 4; +argument_length.l = 4; +argument_length.x = 1; + +function ReadFormatRequest(format, callback) +{ + this.format = format; + this.current_arg = 0; + this.data = []; + this.callback = callback; +} + +/* +function ReadBuflist(length, callback) +{ + this.length = length + this.callback = callback; + +} +*/ + +function ReadFixedRequest(length, callback) +{ + this.length = length; + this.callback = callback; + //clog(length); + this.data = new Buffer(length); + this.received_bytes = 0; +} + +ReadFixedRequest.prototype.execute = function(bufferlist) +{ + + // TODO: this is a brute force version + // replace with Buffer.slice calls + + + // bufferlist: + // offset + // readlist: + // [ b1 b2 b3 b4 b5 ] + + + var to_receive = this.length - this.received_bytes; + //clog([bufferlist.offset, bufferlist.length, to_receive]); + + var buffs = bufferlist.readlist; + var off = bufferlist.offset; + if (buffs.length == 0) + return false; + + var curbuff = buffs[0]; + + // first buffer is bigger than request + if (curbuff.length - bufferlist.offset >= to_receive) + { + // copy(targetBuffer, targetStart=0, sourceStart=0, sourceEnd=buffer.length) + curbuff.copy(this.data, this.received_bytes, off, off+to_receive); + bufferlist.offset += to_receive; + this.received_bytes += to_receive; + bufferlist.length -= to_receive; + + if (bufferlist.offset == curbuff.length) + { + bufferlist.readlist.shift(); + bufferlist.offset = 0; + } + + //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); + this.callback(this.data); + return true; + } + + + + // while (buffs.length > 0) + // { + // } + + + + + + + + + if (0)//bufferlist.readlist.length == 1) + { + var to_receive = this.length - this.received_bytes; + var buff = bufferlist.readlist[0]; + if ( (buff.length-bufferlist.offset) >= to_receive){ + clog(["using Buffer.copy", buff.length]); + buff.copy(this.data, to_receive, bufferlist.offset, bufferlist.offset + to_receive); + bufferlist.length -= to_receive; + return false; + } + //var to_receive = this.length - this.received_bytes; + //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); + + } + //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); + //clog(["byte by byte copy", bufferlist.length]); + var to_receive = this.length - this.received_bytes; + clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); + for(var i=0 ; i < to_receive; ++i) + { + if (bufferlist.length == 0) + return false; + this.data[this.received_bytes++] = bufferlist.getbyte(); + } + this.callback(this.data); + return true; +} + +ReadFormatRequest.prototype.execute = function(bufferlist) +{ + while (this.current_arg < this.format.length) + { + var arg = this.format[this.current_arg]; + if (bufferlist.length < argument_length[arg]) + return false; // need to wait for more data to prcess this argument + + // TODO: measure Buffer.readIntXXX performance and use them if faster + // note: 4 and 2-byte values may cross chunk border & split. need to handle this correctly + // maybe best approach is to wait all data required for format and then process fixed buffer + // TODO: byte order!!! + switch (arg) { + case 'C': + this.data.push(bufferlist.getbyte()); + break; + case 'S': + case 's': + var b1 = bufferlist.getbyte(); + var b2 = bufferlist.getbyte(); + if (bufferlist.serverBigEndian) + this.data.push(b2*256+b1); + else + this.data.push(b1*256+b2); + break; + case 'l': + case 'L': + var b1 = bufferlist.getbyte(); + var b2 = bufferlist.getbyte(); + var b3 = bufferlist.getbyte(); + var b4 = bufferlist.getbyte(); + var res; + if (bufferlist.serverBigEndian) + res = (((b4*256+b3)*256 + b2)*256 + b1); + else + res = (((b1*256+b2)*256 + b3)*256 + b4); + + if (arg == 'l') { + var neg = res & 0x80000000; + if (!neg) { + this.data.push(res); + } else + this.data.push((0xffffffff - res + 1) * - 1); + } else + this.data.push(res); + + break; + case 'x': + bufferlist.getbyte(); + break; + } + this.current_arg++; + } + this.callback(this.data); + return true; +} + +function UnpackStream() +{ + EventEmitter.call(this); + + this.readlist = []; + this.length = 0; + this.offset = 0; + this.read_queue = []; + this.write_queue = []; + this.write_length = 0; +} +util.inherits(UnpackStream, EventEmitter); + +UnpackStream.prototype.write = function(buf) +{ + this.readlist.push(buf); + this.length += buf.length; + this.resume(); +} + +UnpackStream.prototype.pipe = function(stream) +{ + // TODO: ondrain & pause + this.on('data', function(data) + { + stream.write(data); + }); +} + +UnpackStream.prototype.unpack = function(format, callback) +{ + this.read_queue.push(new ReadFormatRequest(format, callback)); + this.resume(); +} + +UnpackStream.prototype.unpackTo = function(destination, names_formats, callback) +{ + var names = []; + var format = ''; + + for (var i=0; i < names_formats.length; ++i) + { + var off = 0; + while(off < names_formats[i].length && names_formats[i][off] == 'x') + { + format += 'x'; + off++; + } + + if (off < names_formats[i].length) + { + format += names_formats[i][off]; + var name = names_formats[i].substr(off+2); + names.push(name); + } + } + + this.unpack(format, function(data) { + if (data.length != names.length) + throw 'Number of arguments mismatch, ' + names.length + ' fields and ' + data.length + ' arguments'; + for (var fld = 0; fld < data.length; ++fld) + { + destination[names[fld]] = data[fld]; + } + callback(destination); + }); +} + +UnpackStream.prototype.get = function(length, callback) +{ + this.read_queue.push(new ReadFixedRequest(length, callback)); + this.resume(); +} + +UnpackStream.prototype.resume = function() +{ + //clog('resume!!!!'); + + if (this.resumed) + return; + if (this.read_queue.length == 0) + { + //clog('at resume: no data, skip'); + return; + } + + this.resumed = true; + //clog('++++++resumed = ' + this.resumed); + // process all read requests until enough data in the buffer + while(this.read_queue[0].execute(this)) + { + //clog('executing read request ...'); + //clog(this.read_queue); + this.read_queue.shift(); + if (this.read_queue.length == 0) + { + //clog('problem?????'); + this.resumed = false; + //clog('------resumed = ' + this.resumed); + return; + } + } + this.resumed = false; + //clog('------resumed = ' + this.resumed); +} + +UnpackStream.prototype.getbyte = function() +{ + var res = 0; + var b = this.readlist[0]; + if (this.offset + 1 < b.length) + { + res = b[this.offset]; + this.offset++; + this.length--; + + } else { + + // last byte in current buffer, shift read list + res = b[this.offset]; + this.readlist.shift(); + this.length--; + this.offset = 0; + } + return res; +} + +// TODO: measure node 0.5+ buffer serialisers performance +UnpackStream.prototype.pack = function(format, args) +{ + var packetlength = 0; + + var arg = 0; + for (var i = 0; i < format.length; ++i) + { + var f = format[i]; + if (f == 'x') + { + packetlength++; + } else if (f == 'a') { + packetlength += args[arg].length; + arg++; + } else { + // this is a fixed-length format, get length from argument_length table + packetlength += argument_length[f]; + arg++; + } + } + + var buf = new Buffer(packetlength); + var offset = 0; + var arg = 0; + for (var i = 0; i < format.length; ++i) + { + switch(format[i]) + { + case 'x': + buf[offset++] = 0; + break; + case 'C': + var n = args[arg++]; + buf[offset++] = n; + break; + case 's': // TODO: implement signed INT16!!! + case 'S': + var n = args[arg++]; + if (this.clientBigEndian) + { + buf[offset++] = n & 0xff; + buf[offset++] = (n >> 8) & 0xff; + } else { + buf[offset++] = (n >> 8) & 0xff; + buf[offset++] = n & 0xff; + } + break; + case 'l': // TODO: implement signed INT32!!! + case 'L': + var n = args[arg++]; + if (this.clientBigEndian) + { + buf[offset++] = n & 0xff; + buf[offset++] = (n >> 8) & 0xff; + buf[offset++] = (n >> 16) & 0xff; + buf[offset++] = (n >> 24) & 0xff; + } else { + buf[offset++] = (n >> 24) & 0xff; + buf[offset++] = (n >> 16) & 0xff; + buf[offset++] = (n >> 8) & 0xff; + buf[offset++] = n & 0xff; + } + break; + case 'a': // string or buffer + var str = args[arg++]; + if (Buffer.isBuffer(str)) + { + str.copy(buf, offset); + offset += str.length; + } else { + // TODO: buffer.write could be faster + for (var c = 0; c < str.length; ++c) + buf[offset++] = str.charCodeAt(c); + } + break; + case 'p': // padded string + var str = args[arg++]; + var len = xutil.padded_length(str.length); + // TODO: buffer.write could be faster + var c = 0; + for (; c < str.length; ++c) + buf[offset++] = str.charCodeAt(c); + for (; c < len; ++c) + buf[offset++] = 0; + break; + } + } + this.write_queue.push(buf); + this.write_length += buf.length; + return this; +} + +UnpackStream.prototype.flush = function(stream) +{ + // TODO: measure performance benefit of + // creating and writing one big concatenated buffer + + // TODO: check write result + // pause/resume streaming + for (var i=0; i < this.write_queue.length; ++i) + { + //stream.write(this.write_queue[i]) + this.emit('data', this.write_queue[i]); + } + this.write_queue = []; + this.write_length = 0; +} + +module.exports = UnpackStream;