From 255783b17fa1d3b22ce0a772a0e64cad797cc993 Mon Sep 17 00:00:00 2001 From: sbosse Date: Mon, 21 Jul 2025 22:47:08 +0200 Subject: [PATCH] Mon 21 Jul 22:43:21 CEST 2025 --- js/dos/connNETsrv.js | 794 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 794 insertions(+) create mode 100644 js/dos/connNETsrv.js diff --git a/js/dos/connNETsrv.js b/js/dos/connNETsrv.js new file mode 100644 index 0000000..ff87ae9 --- /dev/null +++ b/js/dos/connNETsrv.js @@ -0,0 +1,794 @@ +/** + ** ============================== + ** O O O OOOO + ** O O O O O O + ** O O O O O O + ** OOOO OOOO O OOO OOOO + ** O O O O O O O + ** O O O O O O O + ** OOOO OOOO O O OOOO + ** ============================== + ** Dr. Stefan Bosse http://www.bsslab.de + ** + ** COPYRIGHT: THIS SOFTWARE, EXECUTABLE AND SOURCE CODE IS OWNED + ** BY THE AUTHOR(S). + ** THIS SOURCE CODE MAY NOT BE COPIED, EXTRACTED, + ** MODIFIED, OR OTHERWISE USED IN A CONTEXT + ** OUTSIDE OF THE SOFTWARE SYSTEM. + ** + ** $AUTHORS: Stefan Bosse + ** $INITIAL: (C) 2006-2017 bLAB + ** $CREATED: 26-06-16 by sbosse. + ** $VERSION: 1.3.3 + ** + ** $INFO: + ** + ** ============================================== + ** DOS: Broker Connection Module + ** Server, TCP connection + ** Data transfer: JSON + ** ============================================== + * + * Default: Mode=AUTO + * + * + * Uni- or bidirectional TCP client-only (Browser app.) Interface with + * non-blocking / blocking communication to a broker server. + * + * Unidrirectional mode: Two non-persistent TCP connections (non-blocking connection) + * Bidirectional mode: One persistent TCP connection (blocking connection) + * Keepalive: reuse socket connction (default) + * + * + ** $ENDOFINFO + */ +"use strict"; +var log = 0; + +var Io = Require('com/io'); +var Net = Require('dos/network'); +var Buf = Require('dos/buf'); +var Rpc = Require('dos/rpc'); +var util = Require('util'); +var net = Require('net'); +var xmldoc = Require('dos/ext/xmldoc'); +var Sch = Require('dos/scheduler'); +var Comp = Require('com/compat'); +var Perv = Comp.pervasives; +var String = Comp.string; +var Array = Comp.array; +var Obj = Comp.object; +var Filename = Comp.filename; +var div = Perv.div; +var Conn = Require('dos/connutils'); + +var isNode = Comp.isNodeJS(); + +var Mode = Conn.Mode; + + +/************************** + ** TCPIP Broker SERVER Interface + **************************/ +/** + * + * @param {port} hostport + * @param {string} srv_url + * @param {string} srv_ipport + * @param {rpcrouter} router + * @constructor + */ +// tyepof options = {hostport,srv_url,srv_ipport,router,verbose,env,mode} +// typeof mode = 'unidir'|'bidir' +var Server = function(options) { + var self=this; + this.env=options.env||{}; + this.srv_url=options.srv_url; // URL + this.srv_ipport=options.srv_ipport; // URL:port + this.hostport=options.hostport; // Public communication Net.Port == Host port + this.status = Net.Status.STD_OK; + this.router=options.router; + this.app=undefined; + this.server=undefined; + // Pending collect requests of clients + this.client_queue=[]; + // Message queue for client collect requests + this.rpcio_queue=[]; + // Client request and reply channel cache (ip+ipport hash) + this.hosts={}; + // Client port-host mapping cache (hostport/sendport hash) + // There can be multiple ports associated with one hosts[ipport] cache entry + this.ports={}; + + this.lock=false; + this.mode=Mode.AUTO; // automatic ONECHAN | TWOCHAN based on client reqeuest + this.keepalive=(options.keepalive==undefined?true:options.keepalive); + + this.timeout=200; // Queue timeout + this.conn_port = Net.uniqport(); + this.rpccon=Rpc.RpcConn( + self.conn_port, + /* + ** send: Connection Forward and Deliver Operation + ** ONECHAN client connectivity: collect + ** TWOCHAN client connectivity: forward + */ + function(rpcio,callback) { + // if (self.forward(msg)==0) queue it; + // We must store and handle these messages for client collection requests!! + // console.log(rpcio) + // console.log(Net.Print.port(rpcio.sendport)) + var msg=self.format(rpcio),res=0, + broadcast=rpcio.operation==Rpc.Operation.LOOKUP||rpcio.operation==Rpc.Operation.WHEREIS; + // Try first direct forwarding ... + if (broadcast) res=self.broadcast(msg); + else res=self.forward(msg); + if (!res || broadcast) { + // Now queue message for collecting clients + rpcio.timeout=Sch.GetTime()+(broadcast?self.timeout:self.timeout*5); + self.rpcio_queue.push(rpcio); + // Trigger schedule + if (callback && rpcio.callback==undefined) rpcio.callback=callback; + else if (rpcio.callback!=undefined) Io.out('[BHPS] Warning: RPCIO has a callback: '+util.inspect(rpcio)); + self.schedule(); + } else { + if (callback) callback(Net.Status.STD_OK,rpcio); + } + }, + // alive: We're handling multiple client connections - we're alive + function() { + return true; + } + ); + this.rpccon.multiport=true; + this.verbose=options.verbose||0; + this.warned=0; + this.last=undefined; + this.stats=this.rpccon.stats; + + // Transfer of multi-part messages? + this.multipart=true; +}; + +Server.prototype.add_host = function (msg,port) { + var ipid=msg.ip+msg.ipport; + // client in TWOCHAN mode, save connection record + if (this.hosts[ipid]) { this.ports[port]=this.hosts[ipid]; this.hosts[ipid].timeout=5000; } + else + this.hosts[ipid]= + this.ports[port]= + {ip:msg.ip,ipport:msg.ipport,nokeepalive:msg.nokeepalive,timeout:5000}; + +} + +/** Broadcast a message to all clients + * + */ +Server.prototype.broadcast = function (msg) { + var self=this, + res=0; + Io.log(((log+this.verbose) < 2) || ('[BTPS] broadcast message')); + + if (msg.status==undefined) msg.status='EOK'; + + for(var i in this.hosts) { + if (this.hosts[i]!=undefined) { + res++; + self.send(self.reply(this.hosts[i],msg)); + } + } + + if (res) this.stats.op_broadcast++; + return res; +} + +/** Client-App collet request (send pending and queued TRANSREQ/TRANSREP/LOOKUP messages) + * Used only in ONECHAN mode!! + * + */ + +Server.prototype.collect = function (msg) { // HTTP Callback handler + var res, + self=this, + timeout=(msg.timeout != undefined) ? (Perv.int_of_string(msg.timeout)) : 0; + if (timeout) msg.timeout=timeout; + + this.stats.op_forward++; + /* + ** 1. Forward TRANS message requests if there are pending requests for the host port. + ** 2. Forward TRANS message replies if there are pending replies for the host port. + ** 3. Forward LOOKUP messages (broadcast) + ** 4. Forward IAMHERE messages + ** If there are actually no messages, add this request to the request queue (if timeout <>0). + */ + /* + ** Forward all pending transaction requests and replies... + */ + Io.log(((log+self.verbose) < 2) || ('[BTPS] collect for client host ' + Net.Print.port(msg.hostport)+ + ' #rpcio='+ Array.length(this.rpcio_queue))); + res=this.schedule_client(msg,true); + + if (res && msg.timeout) + this.client_queue.push(msg); +}; + +/** Create a forward/reply message from a rpcio object + * + */ +Server.prototype.format = function (rpcio) { + var msg,obj,buf; + switch (rpcio.operation) { + case Rpc.Operation.TRANSREQ: + obj = rpcio.to_json('simple'); + msg={rpc:'trans',hostport:rpcio.hostport,sendport:rpcio.sendport, + tid:rpcio.tid,hop:rpcio.hop,header:obj.header,data:obj.data}; + break; + case Rpc.Operation.TRANSREP: + obj = rpcio.to_json('simple'); + msg={rpc:'reply',hostport:rpcio.hostport,sendport:rpcio.sendport, + tid:rpcio.tid,hop:rpcio.hop,header:obj.header,data:obj.data}; + break; + case Rpc.Operation.LOOKUP: + buf = Buf.Buffer(); + Buf.buf_put_port(buf, rpcio.header.h_port); + msg={rpc:'lookup',hostport:rpcio.hostport, + hop:rpcio.hop,data:Buf.buf_to_hex(buf)}; + break; + case Rpc.Operation.IAMHERE: + buf = Buf.Buffer(); + Buf.buf_put_port(buf, rpcio.header.h_port); + msg={rpc:'iamhere',hostport:rpcio.hostport,sendport:rpcio.sendport, + hop:rpcio.hop,data:Buf.buf_to_hex(buf)}; + break; + case Rpc.Operation.WHEREIS: + buf = Buf.Buffer(); + msg={rpc:'whereis',hostport:rpcio.hostport,sendport:rpcio.sendport, + hop:rpcio.hop}; + break; + case Rpc.Operation.HEREIS: + buf = Buf.Buffer(); + msg={rpc:'hereis',hostport:rpcio.hostport,sendport:rpcio.sendport, + hop:rpcio.hop}; + break; + } + if (!msg) console.log(rpcio) + return msg; +} + + + +/** Client-App forwarding (send pending and queued TRANSREQ/TRANSREP/LOOKUP/IAMHERE messages) + * Used only in TWOCHAN mode!! Called from router. + */ + +Server.prototype.forward = function (msg) { + var res=0, + self=this; + + /* + ** 1. Forward TRANS message request if there are pending requests for the host port. + ** 2. Forward TRANS message reply if there are pending replies for the host port. + ** 3. Forward LOOKUP messages (broadcast) + */ + /* + ** Forward all pending transaction requests and replies... + */ + Io.log(((log+this.verbose) < 2) || ( + '[BTPS] forward to host ' + (msg.sendport?Net.Print.port(msg.sendport)+'? '+(this.ports[msg.sendport]!=undefined): + 'broadcast') + )); + + if (msg.status==undefined) msg.status='EOK'; + + // console.log(msg) + if (msg.sendport && this.ports[msg.sendport]) { + res++; + self.send(self.reply(this.ports[msg.sendport],msg)); + } + + if (res) this.stats.op_forward++; + return res; +}; + +/** Initialize this server + * + */ +Server.prototype.init = function (callback) { + var self = this; + this.router.add_conn(self.rpccon); +}; + +Server.prototype.log = function (v) {log=v}; + + +/** Create a reply message from a request message and a reply object. + * + */ +Server.prototype.reply = function (msg,obj) { + var reply={}, + p; + if (msg.ipport) { + // Client in TWOCAHN mode with reply channel + reply.ip=msg.ip; + reply.ipport=msg.ipport; + if (msg.nokeepalive) reply.nokeepalive=true; + } else { + // client in ONECHAN mode, share request channel + reply.socket=msg.socket; + } + for (p in obj) reply[p]=obj[p]; + if (msg.tid != undefined) reply.tid=msg.tid; + switch (msg.type) { + case 'alive': + case 'ask': + case 'notify': + case 'request': + reply.type=msg.type+'-reply'; break; + } + return reply; +} + +/** Main client request entry point. + * + */ +Server.prototype.request = function (msg) { + var i,str, + self=this, + buf, rpcio, hdr, id; + this.rpccon.stats.op_get++; + Io.log(((log+self.verbose) < 2) || ('[BTPS] client request: ' + (msg.rpc||msg.type)+' from '+msg.ip+':'+msg.ipport)); + Io.log(((log+self.verbose) < 3) || ('[BTPS] client request: ' + util.inspect(msg))); + + if (this.multipart) Conn.decode(msg); + + if (msg.rpc != undefined) { + switch (msg.rpc) { + case 'lookup': + /******************************** + ** LOOKUP rpc message + ********************************/ + // hostport tid + buf = Buf.Buffer(msg.data); + // OK this.router.add_host(msg.hostport, this.rpccon.port); + rpcio=self.router.pkt_get(); + rpcio.operation=Rpc.Operation.LOOKUP; + rpcio.connport=this.rpccon.port; + rpcio.hostport=msg.hostport; + rpcio.hop=msg.hop; + //rpcio.sendport=self.router.hostport; + rpcio.header.h_port=Buf.buf_get_port(buf);; + this.router.route(rpcio); + // this.send(this.reply(msg,{status:'EOK'})); + break; + case 'iamhere': + /******************************** + ** IAMHERE rpc message + ********************************/ + buf = Buf.Buffer(msg.data); + this.router.add_host(msg.hostport, this.rpccon.port); + rpcio=this.router.pkt_get(); + rpcio.operation=Rpc.Operation.IAMHERE; + rpcio.connport=this.rpccon.port; + rpcio.hostport=msg.hostport; + rpcio.sendport=msg.sendport; + rpcio.header.h_port=Buf.buf_get_port(buf);; + rpcio.hop=msg.hop; + this.router.route(rpcio); + // this.send(this.reply(msg,{status:'EOK'})); + break; + case 'trans': + /************************************************** + ** RPC TRANS message (Client- /Browser App. transaction request) + *************************************************/ + + rpcio = self.router.pkt_get(); + buf = Buf.Buffer(msg.header); + Buf.buf_get_hdr(buf, rpcio.header); + Io.log((log < 2) || (('[BTPS] trans header: ' + Net.Print.header(rpcio.header)))); + rpcio.init(Rpc.Operation.TRANSREQ, rpcio.header, msg.data); + rpcio.context=undefined; // rpc not from here! + rpcio.tid = msg.tid; + rpcio.hostport = msg.hostport; + rpcio.sendport = msg.sendport; + rpcio.hop=msg.hop; + rpcio.connport=this.rpccon.port; + + if (msg.ipport) + // client in TWOCHAN mode + this.add_host(msg,msg.hostport); +// else + // client in ONECHAN mode +// this.send(self.reply(msg,{status:'EWOULDBLOCK'})); + + this.router.add_host(rpcio.hostport,self.rpccon.port); + this.router.route(rpcio); + break; + case 'reply': + /************************* + * RPC Transaction Reply + ************************/ + + /* + ** Reply for a transaction must be forwarded to the transaction origin collected by the source host application and + ** the next http get/?rpc=request request. + */ + // New transaction request from client app., add it to the transaction queue. + buf = Buf.Buffer(msg.header); + rpcio = this.router.pkt_get(); + Buf.buf_get_hdr(buf,rpcio.header); + rpcio.init(Rpc.Operation.TRANSREP, rpcio.header, msg.data); + rpcio.context=undefined; // rpc not from here! + rpcio.tid = msg.tid; + rpcio.hostport = msg.hostport; + rpcio.sendport = msg.sendport; + rpcio.hop=msg.hop; + rpcio.connport = this.rpccon.port; + Io.log(((log+this.verbose) < 2)||('[BTPS] reply: hostport=' + Net.Print.port(msg.hostport) + + ' sendport='+Net.Print.port(msg.sendport) + + ' tid=' + msg.tid + ' srvport=' + Net.Print.port(rpcio.header.h_port))); + // ?? this.router.add_host(msg.hostport, this.rpccon.port); + this.router.route(rpcio); + //res. + // ONECHAN mode only? + // this.send(this.reply(msg,{status:'EOK'})); + break; + default: + this.send(self.reply(msg,{status:'EINVALID'})); + } + } else if (msg.type != undefined) { + switch (msg.type) { + case 'alive': + /*************************** + ** ALIVE message request + ***************************/ + this.rpccon.stats.op_alive++; + + buf=Buf.Buffer(); + + if (this.verbose && this.router.lookup_host(msg.hostport)==undefined) + Io.out('[BTPS] ALIVE! Adding remote host ' + + Net.Print.port(msg.hostport) + ' ip=' + msg.ip + (msg.ipport!='undefined'?(' ipport=' + msg.ipport):'')); + this.router.add_host(msg.hostport,this.rpccon.port); + if (msg.ipport) + // client in TWOCHAN mode + this.add_host(msg,msg.hostport); + //res. + + Buf.buf_put_port(buf,this.hostport); + this.send(this.reply(msg,{status:'EOK',data:Buf.buf_to_hex(buf)})); + break; + case 'ask': + /*************************** + ** ASK message request + ***************************/ + this.rpccon.stats.op_ask++; + buf=Buf.Buffer(); + //console.log(self.env[msg.xname]); + Buf.buf_put_string(buf,this.env[msg.xname]||'undefined'); + this.send(this.reply(msg,{status:'EOK', data:Buf.buf_to_hex(buf)})); + break; + case 'notify': + /*************************** + ** NOTIFY message request + ***************************/ + this.rpccon.stats.op_notify++; + this.env[msg.xname]=msg.xvalue; + //console.log(self.env[msg.xname]); + this.send(this.reply(msg,{status:'EOK'})); + break; + case 'request': + /************************************************** + ** RPC collect request (ONECHAN mode only) + *************************************************/ + // Used only in ONECHAN mode! + if (msg.ipport) // client in TWOCHAN mode + this.send(this.reply(msg,{status:'EINVALID'})); + else + this.collect(msg); + break; + default: + this.send(self.reply(msg,{status:'EINVALID'})); + } + } else + this.send(self.reply(msg,{status:'EINVALID'})); +}; + + +/* +** Schedule all pending client-side requests if there is matching input. +** Called from router function. Lock required! +*/ + +Server.prototype.schedule = function () { + var self = this; + + if (self.lock) return; + this.rpccon.stats.op_schedule++; + Io.log(((log+self.verbose)<2) || ('[BTPS] schedule #clients='+this.client_queue.length+' .. ')); + + this.lock = true; + this.client_queue = Array.filter(self.client_queue, function (client) { + /* + * Forward all matching pending transaction requests and replies... + */ + Io.log(((log+self.verbose) < 2) || ('[BTPS] schedule for client host ' + Net.Print.port(client.hostport)+ + ' #rpcio='+ Array.length(self.rpcio_queue))); + return self.schedule_client(client,true); + }); + this.lock = false; +}; + +/** Schedule pending transaction replies for a client (message) + * + */ + +Server.prototype.schedule_client = function (client /* msg */, discard) { + var msgn = 0, + self = this, + data = [], + hostport = client.hostport, + len=this.rpcio_queue.length, + buf; + + this.rpcio_queue = Array.filter(this.rpcio_queue, function (rpcio) { + var hostport2; + switch (rpcio.operation) { + case Rpc.Operation.TRANSREQ: + case Rpc.Operation.TRANSREP: + hostport2 = self.router.lookup_port(rpcio.header.h_port); + if ((hostport2 && rpcio.operation==Rpc.Operation.TRANSREQ && Net.port_cmp(hostport,hostport2)) || + (rpcio.sendport && rpcio.operation==Rpc.Operation.TRANSREP && Net.port_cmp(hostport,rpcio.sendport)) || + (rpcio.sendport && rpcio.operation==Rpc.Operation.TRANSREQ && Net.port_cmp(hostport,rpcio.sendport))) { + data.push(rpcio.to_json('extended')); + msgn++; + if (rpcio.callback) {rpcio.callback(Net.Status.STD_OK,rpcio); rpcio.callback=undefined;}; + return false; + } else return true; + break; + + /* + ** Add all pending server port LOOKUP requests that are broadcast messages! ... + ** Forward a LOOKUP of a server port search only once to a client host! + ** Do not broadcast a LOOKUP to the initiator host (hostport)! + */ + case Rpc.Operation.LOOKUP: + if (rpcio.cache[hostport]==undefined && !Net.port_cmp(hostport,rpcio.hostport)) { + Io.log((self.router.monitor < 2) || ('[BTPS] schedule: lookup '+Rpc.Print.rpcio(rpcio))); + rpcio.cache[hostport]=1; + buf = Buf.Buffer(); + Buf.buf_put_port(buf, rpcio.header.h_port); + data.push({ + rpc:'lookup', + hostport:rpcio.hostport, + data:Buf.buf_to_hex(buf) + }); + // Broadcast message! rpcio.callback handled in garbage collector + msgn++; + } + return true; + break; + + case Rpc.Operation.IAMHERE: + if (Net.port_cmp(hostport,rpcio.sendport)) { + buf = Buf.Buffer(); + Buf.buf_put_port(buf, rpcio.header.h_port); + data.push({ + rpc:'iamhere', + hostport:rpcio.hostport, + sendport:rpcio.sendport, + data:Buf.buf_to_hex(buf) + }); + msgn++; + if (rpcio.callback) {rpcio.callback(Net.Status.STD_OK,rpcio); rpcio.callback=undefined;}; + return false; + } else return true; + break; + } + return true; + }); + + Io.log(((log+self.verbose) < 2) || ('[BTPS] schedule_client for host ' + Net.Print.port(hostport)+ ' ['+len+'] '+util.inspect(data))); + + if (msgn>0) { + self.send(self.reply(client,{status:'EOK',data:data})); + return false; + } else if (client.timeout<=0) { + self.send(self.reply(client,{status:'ENOENTR'})); + return false; + } else { + return true; + } +} + + +/** Send a message to a destination + * + */ +// function send(repsonse:socket,data:string) +Server.prototype.send = function (msg) { + var client, + self=this, + socket, + data,ipid; + //response.writeHead(200, {'Content-Type': 'text/plain; charset=utf8'}); + //response.writeHead(200); + this.rpccon.stats.op_send++; + if (msg.socket) + // client in ONECHAN mode + {socket=msg.socket; msg.socket=undefined;}; + if (this.multipart) Conn.encode(msg); + + Io.log(((log+this.verbose)<2)||('[BTPS] send ['+util.inspect(msg)+'] -> '+ + (socket?'socket':(msg.ip+':'+msg.ipport)))); + if (socket) { + // client in ONECHAN mode + data = JSON.stringify(msg); + // console.log(data); + if (this.multipart) socket.write(data+Conn.EOM); + else socket.write(data); + this.warned=0; + } else if (msg.ip) { + // client in TWOCHAN mode + + data = JSON.stringify(msg); + ipid=msg.ip+msg.ipport; + if (self.hosts[ipid] && self.hosts[ipid].socket) { + // use cached socket + // console.log('using socket'); + client=self.hosts[ipid].socket; + if (self.multipart) client.write(data+Conn.EOM); + else client.write(data); + self.hosts[ipid].timeout=5000; + } else { + client = net.connect(msg.ipport,msg.ip, function() { + // console.log('connected to client'); + self.warned=0; + client.setNoDelay(true); + if (self.multipart) client.write(data+Conn.EOM); + else client.write(data); + if (!self.keepalive || msg.nokeepalive) { + //client.end(); + client.destroy(); + client=undefined; + } + if (client && self.hosts[ipid]) { + self.hosts[ipid].socket=client; + self.hosts[ipid].timeout=5000; + } else if (client) self_add_host(msg,msg.sendport); + }); + client.on('error', function (e) { + if (self.warned<2) { + Io.out('[BTPS] Communication error to ' +msg.ip+':'+msg.ipport+(self.warned==1?' (more)':'')+' : '+e); + self.warned++; + } + }); + } + } else Io.warn('[BTPS] Cannot send message: Neither return IP nor request socket for: '+util.inspect(msg)); +}; + + + + + +/** + * + * @param {number} [interval] + */ +Server.prototype.start = function (interval) { + var self = this; + // Service client requests + this.server = net.createServer(function (socket){ + + // console.log(socket); + // console.log('Connection ..'); + socket.setNoDelay(true); + socket.on('data', function(data) { + var req,part,parts; + Io.log(((log+self.verbose)<2)||('[BTPS] Received: ' + data)); + if (self.multipart) parts=Conn.splitData(data); + else parts=[data]; + if (self.last) parts[0]=self.last+parts[0]; + if (Array.last(parts) != '') self.last=Array.last(parts); else self.last=undefined; + + // console.log(parts) + + for(part in parts) { + if (parts[part]=='') continue; + req = JSON.parse(parts[part]); + if (self.multipart) Conn.decode(req); + if (!req.ipport) req.socket=socket; // client is in ONECHAN mode + else if (!self.keepalive || req.nokeepalive) + // socket.end(''); // client is in TWOCHAN mode TODO keepalive? + socket.destroy(); + self.request(req); + } + }); + socket.on("error", function (er) { + if (er != 'Error: This socket has been ended by the other party') + Io.out('[BTPS] Communication error on *:' + self.srv_ipport+' : '+er); + }); + socket.on('close', function(data) { + // console.log('CLOSED: ' + socket.remoteAddress +' '+ socket.remotePort); + }); + }); + if (!interval) interval=1000; + /* + ** Start a client service and garbage collector for blocked client-side transactions (self.client_queue) + */ + Sch.AddTimer(interval, 'BTPS Garbage Collector', function (context) { + var h,p, host, port, time; + Io.log(((log+self.verbose)<4)||('[BTPS] Garbage Collector')); + if (!self.lock) { + self.lock = true; + self.client_queue = Array.filter(self.client_queue, function (client) { + var res = self.schedule_client(client,false /*?*/); + client.timeout -= interval; + return res; + }); + for (h in self.hosts) { + host=self.hosts[h]; + // console.log(self.hosts[p]) + if (self.hosts[h]) { + + host.timeout = host.timeout - interval; + if (host.timeout <= 0) { + Io.log(((log+self.verbose)<2)||('[BTPS] Host cache: Removing host '+ + host.ip+':'+host.ipport)); + self.hosts[h]=undefined; + for (p in self.ports) { + port=self.ports[p]; + if (port) { + if (host.ip==port.ip && host.ipport==port.ipport) { + self.ports[p]=undefined; + Io.log(((log+self.verbose)<2)||('[BTPS] Host cache: Removing port '+Net.Print.port(p)+' for host '+ + host.ip+':'+host.ipport)); + } + } + } + } + } + } + time=Sch.GetTime(); + self.rpcio_queue = Array.filter(self.rpcio_queue, function (rpcio) { + if (rpcio.timeout > 0 && rpcio.timeout <= time) { + Io.log(((log+self.verbose)<2)||('[BTPS] RPCIO cache: Removing '+ + Rpc.Print.rpcio(rpcio))); + + if (rpcio.callback) { rpcio.callback( + rpcio.operation==Rpc.Operation.LOOKUP?Net.Status.STD_OK:Net.Status.RPC_FAILURE, + rpcio); rpcio.callback=undefined;}; + return false; + } + else return true; + }); + + self.lock = false; + } else { + context.blocked=false; + context.timer=Sch.time; + Sch.ScheduleNext(); + } + }); + this.server.listen(this.srv_ipport, function() { +// console.log('[BTPS] Listening on *:' + self.server.address().port+ ' TCPNET in mode '+self.mode+ +// (self.keepalive?' KEEPALIVE':'')); + console.log('[BTPS] Listening on *:' + self.server.address().port+ ' TCPNET in mode '+self.mode); + }); +}; + +Server.prototype.stop = function () { + Sch.RemoveTimer('BTPS Garbage Collector'); +} + +module.exports = { + /** RCP Broker Server for client-capable-only applications + * + */ + // typeof options = {hostport:port,srv_ip,srv_ipport,router,verbose?,env?:{}} + Mode: Mode, + Server: function(options) { + var obj = new Server(options); + Object.preventExtensions(obj); + return obj; + } +};