//var clog = clog; var clog = function() {}; var EventEmitter = require('events').EventEmitter; var util = require('util'); var argument_length = {}; argument_length.C = 1; argument_length.S = 2; argument_length.s = 2; argument_length.L = 4; argument_length.l = 4; argument_length.x = 1; function ReadFormatRequest(format, callback) { this.format = format; this.current_arg = 0; this.data = []; this.callback = callback; } /* function ReadBuflist(length, callback) { this.length = length this.callback = callback; } */ function ReadFixedRequest(length, callback) { this.length = length; this.callback = callback; //clog(length); this.data = new Buffer(length); this.received_bytes = 0; } ReadFixedRequest.prototype.execute = function(bufferlist) { // TODO: this is a brute force version // replace with Buffer.slice calls // bufferlist: // offset // readlist: // [ b1 b2 b3 b4 b5 ] var to_receive = this.length - this.received_bytes; //clog([bufferlist.offset, bufferlist.length, to_receive]); var buffs = bufferlist.readlist; var off = bufferlist.offset; if (buffs.length == 0) return false; var curbuff = buffs[0]; // first buffer is bigger than request if (curbuff.length - bufferlist.offset >= to_receive) { // copy(targetBuffer, targetStart=0, sourceStart=0, sourceEnd=buffer.length) curbuff.copy(this.data, this.received_bytes, off, off+to_receive); bufferlist.offset += to_receive; this.received_bytes += to_receive; bufferlist.length -= to_receive; if (bufferlist.offset == curbuff.length) { bufferlist.readlist.shift(); bufferlist.offset = 0; } //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); this.callback(this.data); return true; } // while (buffs.length > 0) // { // } if (0)//bufferlist.readlist.length == 1) { var to_receive = this.length - this.received_bytes; var buff = bufferlist.readlist[0]; if ( (buff.length-bufferlist.offset) >= to_receive){ clog(["using Buffer.copy", buff.length]); buff.copy(this.data, to_receive, bufferlist.offset, bufferlist.offset + to_receive); bufferlist.length -= to_receive; return false; } //var to_receive = this.length - this.received_bytes; //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); } //clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); //clog(["byte by byte copy", bufferlist.length]); var to_receive = this.length - this.received_bytes; clog([bufferlist.readlist.length, bufferlist.offset, bufferlist.length, to_receive]); for(var i=0 ; i < to_receive; ++i) { if (bufferlist.length == 0) return false; this.data[this.received_bytes++] = bufferlist.getbyte(); } this.callback(this.data); return true; } ReadFormatRequest.prototype.execute = function(bufferlist) { while (this.current_arg < this.format.length) { var arg = this.format[this.current_arg]; if (bufferlist.length < argument_length[arg]) return false; // need to wait for more data to prcess this argument // TODO: measure Buffer.readIntXXX performance and use them if faster // note: 4 and 2-byte values may cross chunk border & split. need to handle this correctly // maybe best approach is to wait all data required for format and then process fixed buffer // TODO: byte order!!! switch (arg) { case 'C': this.data.push(bufferlist.getbyte()); break; case 'S': case 's': var b1 = bufferlist.getbyte(); var b2 = bufferlist.getbyte(); if (bufferlist.serverBigEndian) this.data.push(b2*256+b1); else this.data.push(b1*256+b2); break; case 'l': case 'L': var b1 = bufferlist.getbyte(); var b2 = bufferlist.getbyte(); var b3 = bufferlist.getbyte(); var b4 = bufferlist.getbyte(); var res; if (bufferlist.serverBigEndian) res = (((b4*256+b3)*256 + b2)*256 + b1); else res = (((b1*256+b2)*256 + b3)*256 + b4); if (arg == 'l') { var neg = res & 0x80000000; if (!neg) { this.data.push(res); } else this.data.push((0xffffffff - res + 1) * - 1); } else this.data.push(res); break; case 'x': bufferlist.getbyte(); break; } this.current_arg++; } this.callback(this.data); return true; } function UnpackStream() { EventEmitter.call(this); this.readlist = []; this.length = 0; this.offset = 0; this.read_queue = []; this.write_queue = []; this.write_length = 0; } util.inherits(UnpackStream, EventEmitter); UnpackStream.prototype.write = function(buf) { this.readlist.push(buf); this.length += buf.length; this.resume(); } UnpackStream.prototype.pipe = function(stream) { // TODO: ondrain & pause this.on('data', function(data) { stream.write(data); }); } UnpackStream.prototype.unpack = function(format, callback) { this.read_queue.push(new ReadFormatRequest(format, callback)); this.resume(); } UnpackStream.prototype.unpackTo = function(destination, names_formats, callback) { var names = []; var format = ''; for (var i=0; i < names_formats.length; ++i) { var off = 0; while(off < names_formats[i].length && names_formats[i][off] == 'x') { format += 'x'; off++; } if (off < names_formats[i].length) { format += names_formats[i][off]; var name = names_formats[i].substr(off+2); names.push(name); } } this.unpack(format, function(data) { if (data.length != names.length) throw 'Number of arguments mismatch, ' + names.length + ' fields and ' + data.length + ' arguments'; for (var fld = 0; fld < data.length; ++fld) { destination[names[fld]] = data[fld]; } callback(destination); }); } UnpackStream.prototype.get = function(length, callback) { this.read_queue.push(new ReadFixedRequest(length, callback)); this.resume(); } UnpackStream.prototype.resume = function() { //clog('resume!!!!'); if (this.resumed) return; if (this.read_queue.length == 0) { //clog('at resume: no data, skip'); return; } this.resumed = true; //clog('++++++resumed = ' + this.resumed); // process all read requests until enough data in the buffer while(this.read_queue[0].execute(this)) { //clog('executing read request ...'); //clog(this.read_queue); this.read_queue.shift(); if (this.read_queue.length == 0) { //clog('problem?????'); this.resumed = false; //clog('------resumed = ' + this.resumed); return; } } this.resumed = false; //clog('------resumed = ' + this.resumed); } UnpackStream.prototype.getbyte = function() { var res = 0; var b = this.readlist[0]; if (this.offset + 1 < b.length) { res = b[this.offset]; this.offset++; this.length--; } else { // last byte in current buffer, shift read list res = b[this.offset]; this.readlist.shift(); this.length--; this.offset = 0; } return res; } // TODO: measure node 0.5+ buffer serialisers performance UnpackStream.prototype.pack = function(format, args) { var packetlength = 0; var arg = 0; for (var i = 0; i < format.length; ++i) { var f = format[i]; if (f == 'x') { packetlength++; } else if (f == 'a') { packetlength += args[arg].length; arg++; } else { // this is a fixed-length format, get length from argument_length table packetlength += argument_length[f]; arg++; } } var buf = new Buffer(packetlength); var offset = 0; var arg = 0; for (var i = 0; i < format.length; ++i) { switch(format[i]) { case 'x': buf[offset++] = 0; break; case 'C': var n = args[arg++]; buf[offset++] = n; break; case 's': // TODO: implement signed INT16!!! case 'S': var n = args[arg++]; if (this.clientBigEndian) { buf[offset++] = n & 0xff; buf[offset++] = (n >> 8) & 0xff; } else { buf[offset++] = (n >> 8) & 0xff; buf[offset++] = n & 0xff; } break; case 'l': // TODO: implement signed INT32!!! case 'L': var n = args[arg++]; if (this.clientBigEndian) { buf[offset++] = n & 0xff; buf[offset++] = (n >> 8) & 0xff; buf[offset++] = (n >> 16) & 0xff; buf[offset++] = (n >> 24) & 0xff; } else { buf[offset++] = (n >> 24) & 0xff; buf[offset++] = (n >> 16) & 0xff; buf[offset++] = (n >> 8) & 0xff; buf[offset++] = n & 0xff; } break; case 'a': // string or buffer var str = args[arg++]; if (Buffer.isBuffer(str)) { str.copy(buf, offset); offset += str.length; } else { // TODO: buffer.write could be faster for (var c = 0; c < str.length; ++c) buf[offset++] = str.charCodeAt(c); } break; case 'p': // padded string var str = args[arg++]; var len = xutil.padded_length(str.length); // TODO: buffer.write could be faster var c = 0; for (; c < str.length; ++c) buf[offset++] = str.charCodeAt(c); for (; c < len; ++c) buf[offset++] = 0; break; } } this.write_queue.push(buf); this.write_length += buf.length; return this; } UnpackStream.prototype.flush = function(stream) { // TODO: measure performance benefit of // creating and writing one big concatenated buffer // TODO: check write result // pause/resume streaming for (var i=0; i < this.write_queue.length; ++i) { //stream.write(this.write_queue[i]) this.emit('data', this.write_queue[i]); } this.write_queue = []; this.write_length = 0; } module.exports = UnpackStream;