diff --git a/js/dos/old/router.js b/js/dos/old/router.js new file mode 100644 index 0000000..e1220d0 --- /dev/null +++ b/js/dos/old/router.js @@ -0,0 +1,1827 @@ +/** + ** ============================== + ** O O O OOOO + ** O O O O O O + ** O O O O O O + ** OOOO OOOO O OOO OOOO + ** O O O O O O O + ** O O O O O O O + ** OOOO OOOO O O OOOO + ** ============================== + ** Dr. Stefan Bosse http://www.bsslab.de + ** + ** COPYRIGHT: THIS SOFTWARE, EXECUTABLE AND SOURCE CODE IS OWNED + ** BY THE AUTHOR(S). + ** THIS SOURCE CODE MAY NOT BE COPIED, EXTRACTED, + ** MODIFIED, OR OTHERWISE USED IN A CONTEXT + ** OUTSIDE OF THE SOFTWARE SYSTEM. + ** + ** $AUTHORS: Stefan Bosse + ** $INITIAL: (C) 2015-2016 bLAB + ** $CREATED: 4-4-15. + ** $VERSION: 1.3.17 + ** + ** $INFO: + ** + ** RPCIO Message ROUTER + * + * There are: + * + * A. Client-app hosts using Broker (Synchronous HTTP broker connection, i.e., Browsers, Asynchronous/Synchronous TCPNET broker connection) + * B. Client-app hosts P2P (UDP VLC connection) + * C. Broker Server (servicing client HTTP / TCPNET connctions) + * D. Mixed app-mode B+C + * + * + * The router passes RPC messages (trans, getreq, putrep, lookup, iamhere,...) between different + * processes of a host (local routing), manages synchronization with the scheduler, sends messages + * to a broker server, and collects messages from a broker server (if this is a broker client + * application). The router manages server port look-ups, generates lookup messages, queues + * transactions, and is the interface between processes and the network. Finally, the router forwards + * messages between different nodes using P2P connections. + ** + ** $ENDOFINFO + */ + +"use strict"; +var log = 0; +var timestamp=0; + +var util = Require('util'); +var Io = Require('com/io'); +var Net = Require('dos/network'); +var Buf = Require('dos/buf'); +var Rpc = Require('dos/rpc'); +var Conn = Require('dos/connutils'); +var Sch = Require('dos/scheduler'); +var Comp = Require('com/compat'); +var Perv = Comp.pervasives; +var String = Comp.string; +var Obj = Comp.obj; +var Array = Comp.array; +var Printf = Comp.printf; +var xmldoc = Require('dos/ext/xmldoc'); +var trace = Io.tracing; +var Status = Net.Status; +var assert = Comp.assert; +var div = Comp.div; + +var PORT_CACHE_MAXLIVE = 200; + +/* +** Allocate pool table in chunks of size... + */ +var CHUNK_SIZE = 100; + +/** +** The router passes RPC IO handlers from clients +** to servers and vice versa. The two different router modes: server (http server capable) +** and application mode (only http client capable) and requiring a broker server for global routing. +** +** There are : +** 1. Local Servers (getreq, putrep) +** 2. Local Clients (trans) +** 3. Remote Servers +** 4. Remote Clients + * +** There are: + * 1. Broker Client Apps (Browser-app, node-app) + * 2. P2P Client Apps (node-app, broker) + * 3. Server Apps (broker) + * +** The router must be configured differently for client applications (Browser comp.) and +** the broker server (node.js comp.). Additionally, a client app. can be hosted (node.js comp.), +** embedding the broker server. The broker server includes a in-memory DNS server. + * + * + * @param {port} hostport + * @constructor + * @typedef {{hostport:port,broker:httpConnection,connections:rpcconn [],trans_queue:[],req_queue:[],lookup_queue:[],lookup_cache:[],port_mapping:[], + * host_mapping:[],rpcio_pool:rpcio [],rpcio_pool_head,rpcio_pool_size,rpcio_pool_next,event:function(), + * stats}} rpcrouter~obj + * @see rpcrouter~obj + * @see rpcrouter~meth + */ +var rpcrouter = function (hostport,options) { + this.options=options||{}; + // this public host port + this.hostport=hostport; + // Is the router used by a broker server? + this.broker=undefined; + + // Connection to a broker (HTTP client mode, Browser App.) : Connection + this.brokerconn=undefined; + this.brokerserver=false; + + // Connections links to other nodes, only in Server-App. mode used (broker server, node) : [rpcconn] + this.connections=[]; + + // Local service: transaction queue (local and remote transactions serviced locally) + this.trans_queue=[]; + // Local service: local server request queue + this.req_queue=[]; + + // Pending lookup of service ports (rpcio array) + this.lookup_queue=[]; + // Cache all client hosts checking already for a server port + this.lookup_cache=[]; + + /* + ** Host/Application port -- Server port mapping + ** Hash table: key:public app-server port, value: public host port + ** port_mapping[srvport]==hostport + */ + this.port_mapping=[]; // server port => host port + this.port_mapping_live=[]; // Timeout for garbage colletion + + /* + ** Hostport routing table + ** Hash table: key:public host port, value: connection port + ** host_mapping[hostport]==[url,ipport] + ** + */ + this.host_mapping=[]; // host port => connection link port + this.host_mapping_live=[]; + + /* + ** Semi-static packet pool (rpcio ~ RPC/FLIP packet) + */ + this.rpcio_pool=[]; + this.rpcio_pool_head=-1; + this.rpcio_pool_size=0; + this.rpcio_pool_next=-1; + + /* + ** Externally supplied event handler(s) + */ + this.event=[]; + + this.stats = { + op_transreq:0, + op_transrep:0, + op_getreq:0, + op_putrep:0, + op_lookup:0, + op_whereis:0, + op_hereis:0, + op_iamhere:0, + op_brokerreq:0, + op_brokerrep:0, + op_brokernoent:0, + op_transabort:0, + op_lookupabort:0, + op_error:0 + }; + + this.monitor=this.options.monitor||0; + this.verbose=this.options.verbose||0; +}; + + +/** + * @typedef {{route:rpcrouter.route}} rpcrouter~meth + */ + +/** + ** Add a client-to-broker connection. + ** A HTTP or TPCNET broker connection is used by a client application. The connection + ** provides the send and service operations to the broker server. + * + * + * @param {httpConnection} conn + */ +rpcrouter.prototype.set_broker_conn = function(conn) { + this.brokerconn=conn; + this.add_conn(conn.rpccon); +}; + +/** + * Add a broker server object (i.e., the router is used by a broker server). + */ +rpcrouter.prototype.set_broker = function(service) { + this.broker=service; +}; + +rpcrouter.prototype.log = function(v) { + log=v; +}; + +/** + * Add an optional event handler called each time the trans_queue,.., are modified. + * + * @param {function} evfun + */ +rpcrouter.prototype.add_event = function(evfun) { + this.event.push(evfun); +}; + +/* +** Packet pool (rpcio handle) management +*/ + +rpcrouter.prototype.pkt_alloc = function() { + var rpcio; + var off=this.rpcio_pool_size; + for(var i=0;i= 0) { + next=this.rpcio_pool_next; + this.rpcio_pool_next=-1; + } else if (this.rpcio_pool_head==this.rpcio_pool_size-1) { + // Try to find a free slot + loop: for(var i=0;i=this.rpcio_pool_size) + this.pkt_alloc(); + } + if (next!=-1) { + rpcio = this.rpcio_pool[next]; + // Only a handle in use has a valid index value! + rpcio.index=next; + this.pkt_init(rpcio); + } + else rpcio = undefined; + return rpcio; +}; + +/** + * Return a packet rpcio object to the pool. + * + * @param {rpcio} rpcio + */ +rpcrouter.prototype.pkt_discard = function(rpcio) { + if (rpcio.index != -1) this.rpcio_pool_next=rpcio.index; + else Io.warn('pkt_discard: packet not from packet pool! '+Rpc.Print.rpcio(rpcio)); + // mark rpcio packet handle as free! + rpcio.index=-1; + rpcio.data=undefined; +}; + +/** +** Add and remove local service ports (register public port of a server) +* + * + * @param {port} srvport + * @param {port} [hostport] + * @param [timeout] + */ +rpcrouter.prototype.add_port = function(srvport,hostport,timeout) { + var exists=false; + if (hostport==undefined) hostport=this.hostport; + if (this.port_mapping[srvport] != undefined) exists=true; + this.port_mapping[srvport]=hostport; + if (timeout != undefined) + this.port_mapping_live[srvport]=timeout; + else + this.port_mapping_live[srvport]=PORT_CACHE_MAXLIVE; + // forward local server ports to the broker server if in client appl. mode + if (!exists && this.brokerconn && this.brokerconn.status == Net.Status.STD_OK && Net.port_cmp(hostport,this.hostport)) { + // short message + + Io.log((this.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] add_port: publishing port on broker '+Net.Print.port(srvport))); + var msg = {type:'iamhere',hostport:this.hostport,srvport:srvport}; + this.brokerconn.send(msg,function (reply) { + Io.log((log<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] add_port: got answer from broker '+reply)); + }) + } else if (!exists) { + Io.log((this.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] add_port: publishing port '+Net.Print.port(srvport))); + + } +}; + +/** + * + * @param {port} srvport + */ +rpcrouter.prototype.remove_port = function(srvport) { + this.port_mapping[srvport]=undefined; +}; + +/** + * + * @param {port} srvport + * @returns {port} + */ +rpcrouter.prototype.lookup_port = function(srvport) { + var found = this.port_mapping[srvport] ; + // Refresh live time + if (found!= undefined && this.port_mapping_live[srvport]>0) + this.port_mapping_live[srvport]=PORT_CACHE_MAXLIVE; + return found; +}; + +/** +** Add a local RPC service listener (ready getreq called by a server) + * + * + * @param rpcio + */ +rpcrouter.prototype.add_service = function(rpcio) { + this.req_queue.push(rpcio); +}; + +/** Find a RPC service listener (matching getreq) + * + * @param {port} port + * @param {rpcio} rpcio + * @param {function} callback + * @returns {rpcio} + */ +rpcrouter.prototype.lookup_service = function(port,rpcio,callback) { + var found=undefined; + loop: for (var i in this.req_queue) { + var rpcios=this.req_queue[i]; + // public port compare + if (Net.port_cmp(rpcios.pubport,port)) { + // TBD check valid hdr.hdr_priv field + found=rpcios; + Net.Copy.header(rpcio.header,rpcios.header); + rpcios.tid=rpcio.tid; + Buf.buf_copy(rpcios,rpcio); + if (callback != undefined) rpcios.callback=callback; + this.req_queue.splice(i,1); + break loop; + } + } + return found; +}; + +/** +** Add a local (client&server appl.) or remote (server app. only) pending transaction +** (1. local server not ready, but it is registered, 2. remote client has not downloaded the request) + * + * + * @param {rpcio} rpcio + */ +rpcrouter.prototype.add_trans = function(rpcio) { + this.trans_queue.push(rpcio); + /* + ** Trigger a callback notification function that handles transaction queue updates. It is optional. + */ + if (this.event.length>0) + Array.iter(this.event,function (f) {f()}); +}; + +/** Remove a RPCIO transaction from the queue. + * + * @param rpcio + * @returns {boolean} + */ +rpcrouter.prototype.remove_trans = function(rpcio) { + var found=false; + loop: for (var i in this.trans_queue) { + var rpcioc=this.trans_queue[i]; + if (rpcioc.index==rpcio.index) { + found=true; + this.trans_queue.splice(i,1); + break loop; + } + } + return found; +}; + +/** Find and remove a queued transaction for public RPC server port 'port'. +** + * + * @param {port} srvport + * @param {rpcio} rpcio + * @returns {rpcio} + */ +rpcrouter.prototype.lookup_trans = function(srvport,rpcio) { + var rpcios=rpcio; + var found=undefined; + loop: for (var i in this.trans_queue) { + var rpcioc=this.trans_queue[i]; + // public port compare + + // TBD + if (rpcioc.operation==Rpc.Operation.TRANSREQ && Net.port_cmp(rpcioc.header.h_port,srvport)) { + // TBD check valid hdr.hdr_priv field + found=rpcioc; + if (rpcios != undefined) { + Net.Copy.header(rpcioc.header,rpcios.header); + Buf.buf_copy(rpcios,rpcioc); + rpcios.tid = rpcioc.tid; + if (rpcioc.callback != undefined) rpcios.callback = rpcioc.callback; + } + this.trans_queue.splice(i,1); + break loop; + } + } + return found; +}; + +/** Find and remove a queued transaction for: +** 1. A server on host 'hostport' (TRANSREQ) +** 2. A client on host 'hostport' (TRANSREP) + * + * + * @param {port} hostport + * @param {rpcio} [rpcio] + * @returns {rpcio} + */ +rpcrouter.prototype.lookup_trans_for_host = function(hostport,rpcio) { + var rpcios=rpcio; + var found=undefined; + loop: for (var i in this.trans_queue) { + var rpcioc=this.trans_queue[i]; + // public port compare + var hostport2 = this.lookup_port(rpcioc.header.h_port); + Io.log((this.monitor<10)||('[ROUT'+(timestamp?Perv.mtime():'')+'] lookup_trans_for_host: '+ + Net.Print.port(hostport)+'? '+Net.Print.port(rpcioc.header.h_port)+' -> '+Net.Print.port(hostport2))); + if ((hostport2 && rpcioc.operation==Rpc.Operation.TRANSREQ && Net.port_cmp(hostport,hostport2)) || + (rpcioc.sendport && rpcioc.operation==Rpc.Operation.TRANSREP && Net.port_cmp(hostport,rpcioc.sendport)) || + (rpcioc.sendport && rpcioc.operation==Rpc.Operation.TRANSREQ && Net.port_cmp(hostport,rpcioc.sendport))) { + found=rpcioc; + if (rpcios != undefined) { + Net.Copy.header(rpcioc.header,rpcios.header); + Buf.buf_copy(rpcios,rpcioc); + rpcios.tid = rpcioc.tid; + if (rpcioc.callback != undefined) rpcios.callback = rpcioc.callback; + } + this.trans_queue.splice(i,1); + break loop; + } + } + return found; +}; + +/** Find and remove a queued waiting transaction with transaction number tid + * Default expected operation: TRANSAWAIT (can be overriden). + * + * @param {port} srvport + * @param {number} tid + * @param {rpcio} [rpcio] + * @param {Operation} [operation] + * @returns {rpcio} + */ +rpcrouter.prototype.lookup_trans_await = function(srvport,tid,rpcio,operation) { + var rpcios=rpcio; + var found=undefined; + loop: for (var i in this.trans_queue) { + var rpcioc=this.trans_queue[i]; + // public port compare + if (rpcioc.operation==(operation||Rpc.Operation.TRANSAWAIT) && + rpcioc.tid==tid && Net.port_cmp(rpcioc.header.h_port,srvport)) { + found=rpcioc; + if (rpcios != undefined) { + Net.Copy.header(rpcios.header,rpcioc.header); + Buf.buf_copy(rpcioc,rpcios); + rpcioc.tid = rpcios.tid; + } + this.trans_queue.splice(i,1); + break loop; + } + } + return found; +}; + +/** Find all queued waiting transactions for the specified RPC server port. + * + * + * @param {port} srvport + * @returns {rpcio []} + */ +rpcrouter.prototype.find_all_trans = function(srvport) { + var found=[]; + for (var i in this.trans_queue) { + var rpcioc=this.trans_queue[i]; + // public port compare + if (Net.port_cmp(rpcioc.header.h_port,srvport)) { + found.push(rpcioc); + } + } + return found; +}; + + +/** + ** Create and add a new pending RPC server lookup (register public port of a server) + * + * + * @param {port} srvport + * @returns {rpcio} + */ +rpcrouter.prototype.new_lookup = function(srvport) { + var i,found; + found=undefined; + loop: for(i in this.lookup_queue) { + var lookup=this.lookup_queue[i]; + if (Net.port_cmp(lookup.header.h_port,srvport) && + Net.port_cmp(lookup.hostport,this.hostport)) { + found=lookup; + break loop; + } + } + if (!found) { + var rpcio=this.pkt_get(); + rpcio.init(Rpc.Operation.LOOKUP); + rpcio.header.h_port=srvport; + rpcio.hostport=this.hostport; + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + this.lookup_queue.push(rpcio); + this.lookup_cache.push([]); + found=rpcio; + } + return found; +}; + +/** + ** Add a pending RPC server lookup + * + * + * @param {rpcio} rpcio + */ +rpcrouter.prototype.add_lookup = function(rpcio) { + var i,found; + found=undefined; + loop: for(i in this.lookup_queue) { + var lookup=this.lookup_queue[i]; + if (Net.port_cmp(lookup.header.h_port,rpcio.header.h_port)) { + found=lookup; + break loop; + } + } + if (!found) { + this.lookup_queue.push(rpcio); + this.lookup_cache.push([]); + } + if (this.event.length>0) + Array.iter(this.event,function (f) {f()}); +}; + +/** Remove all pending LOOKUPs for specified RPC server port. + * Return all found LOOKUP messages. + * + * @param {port} srvport + * @returns {rpcio []} + */ +rpcrouter.prototype.remove_lookup = function(srvport) { + var i; + var found = []; + loop: for(i in this.lookup_queue) { + var lookup=this.lookup_queue[i]; + if (Net.port_cmp(lookup.header.h_port,srvport)) { + this.lookup_queue.splice(i,1); + this.lookup_cache.splice(i,1); + found.push(lookup); + //this.pkt_discard(lookup); + } + } + return found; +}; + +/** Host to connection link port (route to host) mapping + * + * @param {port} hostport + * @param {port} connport + */ +rpcrouter.prototype.add_host = function(hostport,connport) { + this.host_mapping[hostport]=connport; + this.host_mapping_live[hostport]=PORT_CACHE_MAXLIVE; +}; + +/** + * + * @param {port} hostport + */ +rpcrouter.prototype.remove_host = function(hostport) { + this.host_mapping[hostport]=undefined; + this.host_mapping_live[hostport]=undefined; +}; + +/** + * + * @param {port} hostport + * @param {port} connport + */ +rpcrouter.prototype.update_host = function(hostport,connport) { + this.host_mapping[hostport]=connport; + this.host_mapping_live[hostport]=PORT_CACHE_MAXLIVE; +}; +/** Return the communication port that connects the endpoint of the path to the host. + * + * @param {port} hostport + * @returns {port} + */ +rpcrouter.prototype.lookup_host = function(hostport) { + var found = this.host_mapping[hostport]; + if (found!=undefined) this.host_mapping_live[hostport]=PORT_CACHE_MAXLIVE; + return found; +}; + +/** + ** Add a pending host lookup (find connection link port to host) + * + * + * @param {port} hostport + * @returns {rpcio} + */ +rpcrouter.prototype.add_hostlookup = function(hostport) { + var i,found; + found=undefined; + for(i in this.lookup_queue) { + var lookup=this.lookup_queue[i]; + if (Net.port_cmp(lookup.sendport,hostport)) found=lookup; + } + if (!found) { + var rpcio=this.pkt_get(); + rpcio.init(Rpc.Operation.WHEREIS); + rpcio.hostport=this.hostport; + rpcio.sendport=hostport; + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + this.lookup_queue.push(rpcio); + this.lookup_cache.push([]); + found=rpcio; + } + return found; +}; + +/** + * + * @param {port} hostport + */ +rpcrouter.prototype.remove_hostlookup = function(hostport) { + var i; + var found=[]; + loop: for(i in this.lookup_queue) { + var lookup=this.lookup_queue[i]; + if (Net.port_cmp(lookup.header.h_port,Net.nilport) && + Net.port_cmp(lookup.sendport,hostport)) { + this.lookup_queue.splice(i,1); + this.lookup_cache.splice(i,1); + found.push(lookup); + //this.pkt_discard(lookup); + } + } + return found; +}; + +/** Add a connection link to this router. + * + * @param {rpcconn} conn + */ +rpcrouter.prototype.add_conn = function(conn) { + this.connections[conn.port]=conn; +}; + +/** Remove a connection link. + * + * @param {rpcconn} conn + */ +rpcrouter.prototype.remove_conn = function(conn) { + this.connections[conn.port]=undefined; +}; + +/** Find a connection link by its connection port. + * + * @param {port} connport + * @returns {rpcconn} + */ +rpcrouter.prototype.find_conn = function(connport) { + return this.connections[connport]; +}; + +/** Set default cache table entry timeout + * + * @param tmo + */ +rpcrouter.prototype.set_cachetimeout = function (tmo) { + PORT_CACHE_MAXLIVE=tmo; +}; + +/** Broadcast a message (e.g., LOOKUP) to all known connection links w/o the + * rpcio.connport link (was the incoming link of rpcio message). + * + * @param rpcio + */ +rpcrouter.prototype.broadcast = function(rpcio) { + var self=this, + buf,msg,data; + for (var i in this.connections) { + var conn=self.connections[i]; + if (conn.alive() && conn.send!=undefined && !Net.Equal.port(conn.port,rpcio.connport)) conn.send(rpcio); + } + // TWOCHAN message broadcasting? + if (this.broker.broadcast) { + switch (rpcio.operation) { + case Rpc.Operation.LOOKUP: + buf = Buf.Buffer(); + Buf.buf_put_port(buf, rpcio.header.h_port); + data = [{rpc:'lookup',hostport:rpcio.hostport, + data:Buf.buf_to_hex(buf)}]; + msg = {type:'broadcast',data:data}; + this.broker.broadcast(msg); + // console.log(msg) + break; + } + } +}; + +/** + * + * @param rpcio + * @param [connport] + * @param {function((Status.STD_OK|*),rpcio)} [callback] + * @returns {number} status + */ +rpcrouter.prototype.forward = function (rpcio,connport,callback) { + var self=this; + assert((rpcio!=undefined)||('Router.forward: rpcio undefined')); + rpcio.hop++; + var conn = this.find_conn(connport||rpcio.connport); + if (!conn || conn.send==undefined || !conn.alive() || rpcio.hop > rpcio.max) return 0; + assert((conn!=undefined)||('Router.forward: invalid connection link '+Net.Print.port(connport||rpcio.connport))); + conn.send(rpcio,callback); + //Io.log((log<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] Router.forward, cannot forward: connection '+Net.Print.port(connport||rpcio.connport)+'not alive!')) + return 1; +}; + +/** Swap hostport<->sendport + * + * @param rpcio + */ +rpcrouter.prototype.swapports = function(rpcio) { + var tmp = rpcio.hostport; + rpcio.hostport=rpcio.sendport; + rpcio.sendport=tmp; +}; + + +/** + * + */ +rpcrouter.prototype.status = function() { + var str =''; + var nl = '\n'; + var i; + var trans,port,lookup; + var used=0; + for(i in this.rpcio_pool) { + trans = this.rpcio_pool[i]; + if (trans.index!=-1) used++; + } + + str=str+'RPCIO Pool: '+used+'/'+this.rpcio_pool_size+' (next '+this.rpcio_pool_next+', head '+this.rpcio_pool_head+')'+nl; + for(i in this.rpcio_pool) { + trans = this.rpcio_pool[i]; + if (trans.index!=-1) { + str = str + ' ' + trans.index+': '+Rpc.Operation.print(trans.operation); + if (trans.timeout>0) str=str+' tmo='+trans.timeout; + if (trans.tid >= 0) str=str+' tid='+trans.tid; + if (trans.status) str=str+' status='+Status.print(trans.status); + if (trans.context) str=str+' context='+trans.context.id; + if (trans.hostport) str=str+' hostport='+Net.Print.port(trans.hostport); + if (trans.sendport) str=str+' sendport='+Net.Print.port(trans.sendport); + if (trans.connport) str=str+' connport='+Net.Print.port(trans.connport); + str = str + ' hop='+trans.hop+'('+trans.hop_max+')'; + str = str + nl; + } + } + + str=str+'Connection Port Table: '+nl; + for(i in this.connections) { + var conn = this.connections[i]; + str=str+' '+Net.Print.port(i)+' -> CON('+ + (conn.send!=undefined?'+SEND ':'')+ + (conn.alive()?'ALIVE':'DEAD')+ + (conn.send==undefined?(' GET='+conn.stats.op_get):(' RCV='+conn.stats.op_receive))+ + (conn.send==undefined?(' PUT='+conn.stats.op_put):(' SND='+conn.stats.op_send))+ + (conn.send==undefined?(' ALV='+conn.stats.op_alive):(' LNK='+conn.stats.op_link))+ + (conn.send==undefined?(' FWD='+conn.stats.op_forward):(' PNG='+conn.stats.op_ping))+ + (conn.send==undefined?(' SCH='+conn.stats.op_schedule):(' POG='+conn.stats.op_pong))+ + (conn.send==undefined?(' MSG='+conn.stats.op_messages):'')+ + (conn.send==undefined?(' ENT='+conn.stats.op_noentr):'')+ + (conn.send==undefined?(' ERR='+conn.stats.op_error):'')+ + ')'+nl; + } + str=str+'RPC Server to Host Port Mapping Table: '+nl; + for(i in this.port_mapping) { + port = this.port_mapping[i]; + if (port!=undefined) + str=str+' '+Net.Print.port(i)+' -> '+Net.Print.port(port)+' ['+this.port_mapping_live[i]+']'+nl; + } + str=str+'Host to Connection Port Mapping Table: '+nl; + for(i in this.host_mapping) { + port = this.host_mapping[i]; + if (port!=undefined) + str=str+' '+Net.Print.port(i)+' -> '+Net.Print.port(port)+' ['+this.host_mapping_live[i]+']'+nl; + } + str=str+'Queued RPC Transactions: '+nl; + for(i in this.trans_queue) { + trans = this.trans_queue[i]; + str=str+' '+Net.Print.port(trans.header.h_port)+ ' ['+trans.timeout+'] '+ Net.Command.print(trans.header.h_command)+ nl; + } + str=str+'Queued RPC Lookups: LQ['+Array.length(this.lookup_queue)+'] LC['+Array.length(this.lookup_cache)+']'+nl; + for(i in this.lookup_queue) { + lookup = this.lookup_queue[i]; + if (lookup.operation == Rpc.Operation.LOOKUP) + str=str+' LOOKUP '+Net.Print.port(lookup.header.h_port)+ ' ['+lookup.timeout+']'+ nl; + else + str=str+' WHEREIS '+Net.Print.port(lookup.sendport)+ ' ['+lookup.timeout+']'+ nl; + } + str=str+'Queued RPC Server Requests: '+nl; + for(i in this.req_queue) { + trans = this.req_queue[i]; + str=str+' '+Net.Print.port(trans.header.h_port)+ ' ['+trans.timeout+']'+ nl; + } + str=str+nl+'Message Statistics'+nl+'----------------------------------------'+nl; + /* + op_transreq:0, + op_transrep:0, + op_getreq:0, + op_putrep:0, + op_lookup:0, + op_whereis:0, + op_hereis:0, + op_iamhere:0, + op_brokerreq:0, + op_brokerrep:0, + op_brokernoent:0, + op_transabort:0, + op_lookupabort:0 + */ + str=str+Printf.sprintf2([['%20s','TRANSREQ'],':',['%8d',this.stats.op_transreq],' ',['%20s','TRANSREP'],':',['%8d',this.stats.op_transrep]])+nl; + str=str+Printf.sprintf2([['%20s','GETREQ'],':',['%8d',this.stats.op_getreq],' ',['%20s','PUTREP'],':',['%8d',this.stats.op_putrep]])+nl; + str=str+Printf.sprintf2([['%20s','LOOKUP'],':',['%8d',this.stats.op_lookup],' ',['%20s','IAMHERE'],':',['%8d',this.stats.op_iamhere]])+nl; + str=str+Printf.sprintf2([['%20s','WHEREIS'],':',['%8d',this.stats.op_whereis],' ',['%20s','HEREIS'],':',['%8d',this.stats.op_hereis]])+nl; + if (this.brokerconn != undefined) { + str = str + Printf.sprintf2([['%20s', 'BROKERREQ'], ':', ['%8d', this.stats.op_brokerreq], ' ', ['%20s', 'BROKERREP'], ':', ['%8d', this.stats.op_brokerrep]]) + nl; + str = str + Printf.sprintf2([['%20s', 'BROKERNOENT'], ':', ['%8d', this.stats.op_brokernoent]]) + nl; + } + str=str+Printf.sprintf2([['%20s','TRANSABORT'],':',['%8d',this.stats.op_transabort],' ',['%20s','LOOKUPABORT'],':',['%8d',this.stats.op_lookupabort]])+nl; + return str; +}; + +/** +** Main entry function for RPC message routing. +** The calling of the routing function expects an already blocked caller process context +** except for remote transaction execution. + * + * The router serves different connection modes: + * + * 1. Client Router using Broker (supporting only passive receiving) + * 2. Client Router using P2P (supporting active sending) + * 3. Broker Server (passive or active sending) + * 4. Mixed mode: App + Broker + * + * + * @param {rpcio} rpcio + */ +rpcrouter.prototype.route = function(rpcio) { + var self=this; + var trans,body,msg,data,path,buf,rpcio2, + i,conn,connport,hostport,hostsrvport,transl, + whereis,lookup,lookups,res,rpcios, + local,hdr; + + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] ON ENTRY: '+Rpc.Print.rpcio(rpcio))); + Io.trace(trace||('[ROUT] ON ENTRY: '+Rpc.Print.rpcio(rpcio))); + + local = Net.Equal.port(rpcio.hostport,self.hostport); + + hdr=rpcio.header; + if (rpcio.hop>rpcio.hop_max) { + Io.log((self.verbose<0)||('[ROUT] Discarding RPCIO message with invalid hop count: '+Rpc.Print.rpcio(rpcio))); + self.pkt_discard(rpcio); + return; + } + if (rpcio.connport != undefined && rpcio.hostport!=undefined) self.add_host(rpcio.hostport,rpcio.connport); + + switch (rpcio.operation) { + case Rpc.Operation.TRANSREQ: + self.stats.op_transreq++; + + hostsrvport = self.lookup_port(hdr.h_port); + + if (hostsrvport && Net.port_cmp(hostsrvport,self.hostport)) { + Io.log((self.monitor<1)||('[ROUT] TRANSREQ: local RPC server, local (or remote) RPC client!')); + /****************************************** + ** Local RPC Server AND (Local OR Remote Transaction)! + ******************************************/ + // Check for local ready servers + rpcios = self.lookup_service( + hdr.h_port,rpcio, + function(reply) { + // Transfer header and data from server rpcio to client rpcio + if (rpcio.status==undefined) rpcio.status=Status.STD_OK; // RPC succeeded, right? + + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Servicing reply callback srvport='+ + Net.Print.port(rpcio.header.h_port)+' tid='+rpcio.tid)); + Io.log((self.monitor<1)||('[ROUT] TRANSREQ: '+Rpc.Print.rpcio(rpcio))); + + Net.Copy.header(reply.header,rpcio.header); + Buf.buf_copy(rpcio,reply); + // Remote transaction? Send reply to broker back... + if (rpcio.context != undefined) { + // Local request already served, + // now unblock and schedule the client + Sch.Wakeup(rpcio.context); + Sch.ScheduleNext(); + } else { + /* + ** Remote transaction + * 1. Forwarded by a connection link + * 2. Handled by the broker + */ + // Swap host- and sendports, update hostport with this hostport! + var tmp = rpcio.hostport; + rpcio.hostport=self.hostport; + rpcio.sendport=tmp; + rpcio.operation=Rpc.Operation.TRANSREP; + rpcio.hop=0; + self.route(rpcio); + } + // do not pkt_discard(reply)! It comes from outside of the router + }); + if (rpcios!=undefined) { + // Found server, servers rpcio is already updated, now wake-up server process... + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Being serviced for host '+ + Net.Print.port(rpcio.hostport)+': srv='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + Sch.Wakeup(rpcios.context); + Sch.ScheduleNext(); + } else { + // actually no server available, wait ... + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Suspend client for host '+ + Net.Print.port(rpcio.hostport)+': srv='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_trans(rpcio); + } + } else if (self.brokerconn && self.brokerconn.status==Net.Status.STD_OK) { + /* + *********************************** + ** Client-App. with Broker connection + *********************************** + ** A local transaction request from HERE to be sent to a remote server by using the broker. + ** TRANSREQ must be forwarded to the broker and finally to the RPC server. + ** Check rpcio.hostport==this.hostport to avoid message ping-pong loops between broker and this node. + ** The broker thinks the server is here, we received the message, but it is no longer here. + ** We do no message forwarding here. + */ + if (Net.Equal.port(rpcio.hostport,self.hostport)) { + /* + ** + ** Application client mode: transaction AND server lookup must be handled by broker server + */ + data = rpcio.to_json('simple'); + Io.log((self.monitor < 1) || ('[ROUT'+(timestamp?Perv.mtime():'')+ + '] TRANSREQ: Sending message to broker for RPC server ' + + Net.Print.port(hdr.h_port) + ' tid ' + rpcio.tid)); + // Wait for reply, do nothing, add this transaction again to transaction queue + Io.log((self.monitor < 1) || ('[ROUT] TRANSREQ: Await, client waiting for reply from RPC server ' + + Net.Print.port(hdr.h_port) + ' tid ' + rpcio.tid)); + rpcio.operation = Rpc.Operation.TRANSAWAIT; + rpcio.timeout = Sch.GetTime() + Net.TIMEOUT; + self.add_trans(rpcio); + msg = {rpc:'trans',hostport:self.hostport,tid:rpcio.tid,data:data}; + self.brokerconn.send(msg, function (reply) { + if (reply.status=='EWOULDBLOCK') { + // Expected + } else if (reply.status=='EINVALID') { + Perv.failwith('[ROUT] TRANSREQ broker HTTP PUT request returns unexpected reply:'+util.inspect(rpcio)+ + ', got'+reply); + } else { + // Cancel transaction! + } + }); + } else { + Io.warn('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: not from here ('+Net.Print.port(rpcio.hostport)+'), discard it: '+Net.Print.header(rpcio.header)); + self.pkt_discard(rpcio); + } + } else if (this.brokerconn) { + /* + *********************************** + ** Client-App. with Broker connection + *********************************** + ** Just put it in the transaction queue. + ** Queue transaction due to lost broker connection! + ** Must be forwarded later... + */ + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queuing transaction due to lost broker connection from host '+ + Net.Print.port(rpcio.hostport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid+ ' for host'+ + Net.Print.port(hostsrvport))); + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_trans(rpcio); + } else { + /* + ************************** + ** Client-App. with P2P connection and / or broker + ************************** + ** Just put it in the transaction queue. + ** 1. A client app. (the RPC server) must request the transaction for further processing (from broker server). + ** 2. If the server port is actually unknown, start lookup(srvport) + * 3. The server is located on another P2P-App. host (node), locate server, forward message. TODO + */ + if (local && hostsrvport==undefined) { + hostsrvport=self.lookup_port(hdr.h_port); + if (hostsrvport==undefined) { + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREQ: Doing lookup for srvport=' + Net.Print.port(hdr.h_port))); + lookup = this.new_lookup(hdr.h_port); + this.route(lookup); + } else { + rpcio.sendport=hostsrvport; + } + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queuing (local) transaction from host '+ + Net.Print.port(rpcio.hostport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + if (Net.Equal.port(rpcio.hostport,this.hostport)) { + // Local transaction client, remote server, pending lookup + // rpcio.operation = Rpc.Operation.TRANSAWAIT; + } + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_trans(rpcio); + } else if (local && hostsrvport!=undefined) { + // Queuing and Forwarding (local) transaction + connport = self.lookup_host(hostsrvport); + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queuing and Forwarding (local) transaction from host '+ + Net.Print.port(rpcio.hostport) +' to host '+ + Net.Print.port(hostsrvport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid+ + ' on connection port '+Net.Print.port(connport))); + rpcio.sendport=hostsrvport; + if (connport!=undefined) res=self.forward(rpcio, connport); + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + rpcio.operation=Rpc.Operation.TRANSAWAIT; + self.add_trans(rpcio); + } else if (rpcio.sendport != undefined) { + // Discarding or Queuing or Forwarding transaction + connport = self.lookup_host(rpcio.sendport); + if (rpcio.hop>=rpcio.hop_max) { + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Discarding transaction (hop count reached) from host '+ + Net.Print.port(rpcio.hostport)+' to host '+ + Net.Print.port(rpcio.sendport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + res=1; + self.pkt_discard(rpcio); + } + else if (connport!=undefined) { + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Forwarding transaction from host '+ + Net.Print.port(rpcio.hostport)+' to host '+ + Net.Print.port(rpcio.sendport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid+ + ' on connection port '+Net.Print.port(connport))); + res=self.forward(rpcio, connport, function (stat,rpcio) {self.pkt_discard(rpcio)}); + } + else if (!connport) { + /* + ** Look-up host port (WHEREIS)!! + */ + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queueing and doing host lookup for sendport='+Net.Print.port(rpcio.sendport))); + whereis = self.add_hostlookup(rpcio.sendport); + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_trans(rpcio); + self.route(whereis); + } + if (res==0) { + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queuing transaction from host '+ + Net.Print.port(rpcio.hostport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_trans(rpcio); + } + } else { + // Broker-app RPC client + hostsrvport=self.lookup_port(hdr.h_port); + conn=undefined; + if (hostsrvport==undefined) { + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREQ: Doing lookup for srvport=' + Net.Print.port(hdr.h_port))); + lookup = this.new_lookup(hdr.h_port); + this.route(lookup); + } else { + rpcio.sendport=hostsrvport; + connport = self.lookup_host(rpcio.sendport); + if (connport) conn = self.find_conn(connport); + } + + + if (conn && conn.alive() && conn.send != undefined) { + if (rpcio.hop>=rpcio.hop_max) { + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Discarding transaction (hop count reached) from host '+ + Net.Print.port(rpcio.hostport)+' to host '+ + Net.Print.port(rpcio.sendport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + res=1; + self.pkt_discard(rpcio); + } else { + // Remote RPC server, forward transaction + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREQ: Forwarding transaction from host ' + + Net.Print.port(rpcio.hostport) + ' to host ' + + Net.Print.port(rpcio.sendport) + ': srvport=' + Net.Print.port(hdr.h_port) + ' tid=' + rpcio.tid + + ' on connection port ' + Net.Print.port(connport))); + res = self.forward(rpcio, connport, function (stat, rpcio) { + self.pkt_discard(rpcio) + }); + } + } else { + // Broker-app RPC server? + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREQ: Queuing transaction from host '+ + Net.Print.port(rpcio.hostport)+': srvport='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + if (Net.Equal.port(rpcio.hostport,this.hostport)) { + // Local transaction client, remote server, pending lookup + // rpcio.operation = Rpc.Operation.TRANSAWAIT; + } + rpcio.timeout = Sch.GetTime() + Net.TIMEOUT; + self.add_trans(rpcio); + } + } + + } + break; + + case Rpc.Operation.TRANSREP: + self.stats.op_transrep++; + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREP: for host='+ + Net.Print.port(rpcio.sendport)+' RPC server='+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + if (rpcio.sendport && Net.Equal.port(rpcio.sendport,self.hostport)) { + /************************************ + ** Local RPC Client, remote Server! + ************************************ + ** Remote reply for local waiting transaction process + */ + // lookup matching TRANSAWAIT rpcio... + + trans = self.lookup_trans_await(rpcio.header.h_port, rpcio.tid, rpcio); + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREP: Matching transaction ' + Rpc.Print.rpcio(trans))); + if (trans) { + trans.status = rpcio.status || trans.header.h_status; + trans.operation=Rpc.Operation.TRANSREP; + if (trans.callback) trans.callback(trans); else { + Sch.Wakeup(trans.context); + Sch.ScheduleNext(); + } + self.pkt_discard(rpcio); + } else { + // Sent by the garbage collector? + if (rpcio.callback) { + rpcio.callback(rpcio); + if (rpcio.index!=-1) self.pkt_discard(rpcio); + } + else if (rpcio.context && Sch.IsBlocked(rpcio.context)) { + // Usually the context process will release the package! + Sch.Wakeup(rpcio.context); + Sch.ScheduleNext(); + } + } + + } else if (this.brokerconn && this.brokerconn.status==Net.Status.STD_OK) { + /* + ************************************************ + ** Client-App. with Broker conenction, Local Server + ************************************************ + ** Send transaction reply to broker + */ + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] TRANSREP: Sending reply to broker for '+ + Net.Print.port(rpcio.sendport)+': '+Net.Print.port(hdr.h_port)+' tid='+rpcio.tid)); + data = rpcio.to_json('simple'); + msg = {rpc:'reply',hostport:rpcio.hostport,sendport:rpcio.sendport, + tid:rpcio.tid, data:data} + self.brokerconn.send(msg); + self.pkt_discard(rpcio); + } else if (!this.brokerconn) { + /* + ************************** + ** Client-App. with P2P connection or this is Broker + ************************** + ** + ** 1a. We are the broker: Queue transaction reply for a Client-app RPC client (!broker.forward) + ** 1b. We are the broker: Forward transaction reply to a Client-app RPC client (broker.forward) + ** 2. The server is located on another P2P-App. host (node), forward reply message. + ** 3. The server is here, forward reply message. + */ + if (rpcio.hop == 0) { + // Reply from here, set-up rpcio for forwarding... + rpcio.connport = undefined; + } + + res = 0; + + if (this.broker.forward) { + data = rpcio.to_json('extended'); + msg = {rpc:'reply',hostport:rpcio.hostport,sendport:rpcio.sendport, + tid:rpcio.tid, data:data} + res = this.broker.forward(msg); + } + if (res == 0) { + connport = self.lookup_host(rpcio.sendport); + if (connport) { + // P2P + conn = self.find_conn(connport); + if (conn.send != undefined) { + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREP: Forwarding reply to source host ' + + Net.Print.port(rpcio.sendport) + + ' on connection port ' + + Net.Print.port(connport))); + if (rpcio.hop < rpcio.hop_max && !Net.Equal.port(connport, rpcio.connport)) { + res = self.forward(rpcio, connport, function (stat, rpcio) { + self.pkt_discard(rpcio) + }); + } else { + res = 1; + self.pkt_discard(rpcio); + } + } // else queue it! + } else { + if (rpcio.hop < rpcio.hop_max) { + /* + ** Look-up host port (WHEREIS)!! + */ + res = 1; + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREP: Doing WHEREIS and Queuing transaction reply')); + rpcio.timeout = Sch.GetTime() + Net.TIMEOUT; + this.add_trans(rpcio); // will wake up pending transaction request (this.event() -> connection.schedule) + whereis = self.add_hostlookup(rpcio.sendport); + self.route(whereis); + } else { + res = 1; + self.pkt_discard(rpcio); + } + } + } + if (res == 0) { + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] TRANSREP: Queuing transaction reply for Broker-App [tid='+rpcio.tid+ + ']')); + rpcio.timeout = Sch.GetTime() + Net.TIMEOUT; + this.add_trans(rpcio); // will wake up pending transaction request (this.event() -> connection.schedule) + } + + } + break; + + case Rpc.Operation.GETREQ: + self.stats.op_getreq++; + this.add_port(rpcio.pubport); + trans = this.lookup_trans(rpcio.pubport,rpcio); + if (trans != undefined) { + // Matching and pending transaction found, service the request immed. ... + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] GETREQ: Servicing trans '+Net.Print.port(rpcio.pubport)+' tid '+trans.tid)); + Sch.SetBlocked(false); + } else { + // After a TRANS request was received, this process will be woken up by lookup_service and the scheduler! + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] GETREQ: Waiting '+Net.Print.port(rpcio.pubport))); + this.add_service(rpcio); + } + break; + + case Rpc.Operation.PUTREP: + self.stats.op_putrep++; + if (rpcio.callback != undefined) { + // local transaction, client will be unblocked in callback! + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] PUTREP: Servicing reply callback '+Net.Print.port(rpcio.pubport)+' tid '+rpcio.tid)); + rpcio.callback(rpcio); + rpcio.callback=undefined; + } else { + // transaction managed by broker + } + Sch.SetBlocked(false); + break; + + case Rpc.Operation.LOOKUP: + self.stats.op_lookup++; + + if (!this.brokerconn) { + hostsrvport = undefined; + hostsrvport = this.lookup_port(hdr.h_port); + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] LOOKUP RPC Server port ' + + Net.Print.port(hdr.h_port) + ' -> Host port? ' + + Net.Print.port(hostsrvport))); + /* + ************************** + ** Client-App. with P2P connection or Broker + ************************** + ** + ** We are the broker or a P2P-APP: + * 1. Check RPC server port, is it here? + * 2. Broadcast lookup to all known links of connected hosts excluding the incoming link port + * + */ + if (hostsrvport != undefined) { + /* + ** Send back an IAMHERE message + */ + Io.log((self.monitor < 1) || ('[ROUT' + (timestamp ? Perv.mtime() : '') + '] LOOKUP: Sending IAMHERE reply to host ' + + Net.Print.port(rpcio.hostport) + ' for RPC server ' + Net.Print.port(hdr.h_port))); + rpcio.operation = Rpc.Operation.IAMHERE; + rpcio.hop = 0; + rpcio.sendport = rpcio.hostport; + rpcio.hostport = hostsrvport; + res = self.forward(rpcio, undefined, function (stat, rpcio) { + self.pkt_discard(rpcio); + }); + } else if (hostsrvport==undefined && rpcio.hop < rpcio.hop_max) { + /* + ** Broadcast message... + */ + rpcio.hop++; + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] LOOKUP: Broadcasting LOOKUP from host '+ + Net.Print.port(rpcio.hostport)+' for RPC server '+Net.Print.port(hdr.h_port))); + self.broadcast(rpcio); + // Maybe we have client-app hosts connected to the broker server, we must add the lookup message to the pending queue!!! + if (!local && !self.brokerserver) self.pkt_discard(rpcio); + else if (self.brokerserver) { + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_lookup(rpcio); + } + } + } + break; + + case Rpc.Operation.WHEREIS: + self.stats.op_whereis++; + if (Net.Equal.port(rpcio.sendport,self.hostport)) { + /* + ** Send back an HEREIS message + */ + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] WHEREIS: Sending HEREIS reply to '+ + Net.Print.port(rpcio.hostport))); + rpcio.operation = Rpc.Operation.HEREIS; + rpcio.hop = 0; + rpcio.sendport = rpcio.hostport; + rpcio.hostport = self.hostport; + res=self.forward(rpcio, undefined, function () { + self.pkt_discard(rpcio); + }); + } else if (rpcio.hop < rpcio.hop_max) { + // Broadcast message to all connections w/o incoming connection + rpcio.hop++; + self.broadcast(rpcio); + if (!local && !self.brokerserver) self.pkt_discard(rpcio); + else if (self.brokerserver) { + rpcio.timeout=Sch.GetTime()+Net.TIMEOUT; + self.add_lookup(rpcio); + } + } + break; + + case Rpc.Operation.IAMHERE: + self.stats.op_iamhere++; + if (this.brokerconn && this.brokerconn.status==Net.Status.STD_OK) { + /* + ************************** + ** Client-App. with Broker connection + **************************/ + + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Sending reply to broker from host port '+ + Net.Print.port(rpcio.hostport)+' for RPC server port '+Net.Print.port(hdr.h_port))); + // path = '/?rpc=iamhere&hostport='+Net.port_to_str(rpcio.hostport)+'&srvport='+Net.port_to_str(rpcio.header.h_port); + msg = {rpc:'iamhere',hostport:rpcio.hostport,srvport:rpcio.header.h_port}; + self.brokerconn.send(msg,function () {}); + // self.brokerconn.get(path,function () {}); + self.pkt_discard(rpcio); + } else if (!this.brokerconn) { + /* + ************************** + ** Client-App. or Broker with P2P connection + ************************** + */ + // TODO + if (rpcio.connport!=undefined) { + // original host port + hostport=rpcio.hostport; + if (Net.Equal.port(rpcio.sendport,self.hostport)) { + /* + ** 1. Destination reached, it is for this host. + * 2. We are the broker server: Stalled broker client transaction lookup + * 2. We are the broker server: Pending lookup from another host and reply from a broker client + */ + self.add_port(rpcio.header.h_port,rpcio.hostport); + transl = self.find_all_trans(rpcio.header.h_port); + for(i in transl) { + trans=transl[i]; + if (trans.operation==Rpc.Operation.TRANSREQ) { + trans.sendport = rpcio.hostport; + if (Net.Equal.port(trans.hostport,self.hostport)) { + // Local transaction client, TRANSREQ already queued + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Forwarding transaction from host '+ + Net.Print.port(trans.hostport)+' to host '+Net.Print.port(trans.sendport)+': srvport='+ + Net.Print.port(trans.header.h_port)+' tid='+trans.tid+ + ' on connection port '+Net.Print.port(rpcio.connport))); + res=self.forward(trans, rpcio.connport); + trans.operation = Rpc.Operation.TRANSAWAIT; + } else { + // Routing only + conn=self.find_conn(rpcio.connport); +console.log(rpcio) + if (conn && conn.send==undefined) { + // Keep transaction queued for Client-App RPC Server + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Keeping transaction queued from host '+ + Net.Print.port(trans.hostport)+': srvport='+Net.Print.port(trans.header.h_port)+' tid='+trans.tid)); + } else { + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Forwarding transaction from host '+ + Net.Print.port(trans.hostport)+' to host '+Net.Print.port(trans.sendport)+': srvport='+ + Net.Print.port(trans.header.h_port)+' tid='+trans.tid+ + ' on connection port '+Net.Print.port(rpcio.connport))); + self.remove_trans(trans); + res = self.forward(trans, rpcio.connport, function (stat, rpcio) { + self.pkt_discard(rpcio) + }); + if (res == 0) { + // ?? + + } + } + } + } + } + /* + ** Check pending LOOKUP messages and forward IAMHERE message... + */ + lookups = self.remove_lookup(rpcio.header.h_port); + for (i in lookups) { + // stalled lookups + lookup=lookups[i]; + if (lookup.connport != undefined) { + conn=self.find_conn(lookup.connport); + if (conn && conn.alive() && conn.send != undefined) { + // Forward message... + lookup.operation=Rpc.Operation.IAMHERE; + lookup.sendport=lookup.hostport; + lookup.hostport=hostport; + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Forwarding IAMHERE from host '+ + Net.Print.port(lookup.hostport)+' to host '+ + Net.Print.port(lookup.sendport)+' on port '+ + Net.Print.port(lookup.connport))); + lookup.hop=0; + res=self.forward(lookup,lookup.connport,function(stat,rpcio){self.pkt_discard(rpcio);}); + } else self.pkt_discard(lookup); + } else self.pkt_discard(lookup); + } + self.pkt_discard(rpcio); + } else { + // Forward message... + connport=self.lookup_host(rpcio.sendport); + Io.log((self.monitor<1)||('[ROUT'+(timestamp?Perv.mtime():'')+'] IAMHERE: Forwarding IAMHERE from host '+ + Net.Print.port(rpcio.hostport)+' to host '+Net.Print.port(rpcio.sendport)+' on port '+Net.Print.port(connport))); + if (connport && !Net.Equal.port(connport,rpcio.connport) && + rpcio.hop Host port ' +Net.Print.port(mysrv))); + if (mysrv) { + rpcio.operation= Rpc.Operation.IAMHERE; + rpcio.sendport = rpcio.hostport; + rpcio.hostport = self.hostport + } else { + self.pkt_discard(rpcio); + rpcio=undefined; + } + } + Io.log((self.monitor<10)||('[ROUH] Broker Service: received ' + Rpc.Print.rpcio(rpcio))); + Io.trace(trace||('[ROUH] Broker Service: received ' + Rpc.Print.rpcio(rpcio))); + if (rpcio) { routed++; self.route(rpcio);} + } + return routed; +} + +/** + ** 1. Start a broker service handler (client-side appl.) + ** 2. Start a garbage collector service handler + * + * @param {number} [interval] + */ +rpcrouter.prototype.start = function (interval) { + var self=this; + var rpcio,i; + + if (!interval) interval=100; + + var step=interval/20; + + if (this.brokerconn != undefined && this.brokerconn.service != undefined) { + self.brokerconn.pending=0; + Io.log((self.monitor<1)||('[ROUT] Starting broker request service ')); + // Install broker service handler + Sch.AddTimer(interval, 'RPC broker service', function (context) { + Io.log((self.monitor<3)||('[ROUH] broker.service run '+Status.print(self.brokerconn.status)+ ' '+ self.brokerconn.pending)); + + if (self.brokerconn.status==Net.Status.STD_OK && self.brokerconn.pending<=0) { + var timeout=interval*10; + self.brokerconn.pending=timeout; + Io.log((self.monitor<2)||('[ROUH] REQ...')); + Io.trace(trace||('[ROUH] REQ...')); + /* + ** Check for available TRANS messages for THIS application identified by the app. port (name..) ... + ** The broker request is blocked until RPC transactions are available or a timeout occurred. + */ + self.brokerconn.service({hostport:self.hostport,timeout:timeout},function (reply) { + var msg,rpc,i,res; + self.stats.op_brokerrep++; + self.brokerconn.pending=0; +//console.log(reply) + if (reply.status=='EINVALID') { + // Not a valid reply, communication error or wrong server. + self.stats.op_error++; + self.brokerconn.rpccon.stats.op_error++; + Io.out('[ROUH] Received invalid HTTP message w/o data'); + } else if (reply.status!='ENOENTR') { + /* + ** We can get more than one message contained in the reply: ...... + ** including lookup/WHOIS messages + */ + self.parse(reply); + // Force new execution of this handler immediately + Sch.Wakeup(context); + context.timer=Sch.time; + Sch.ScheduleNext(); + + } else { + self.stats.op_brokernoent++; + self.brokerconn.rpccon.stats.op_noentr++; + // callback('ENOENTR'); + // Force new execution of this handler immediately + Sch.Wakeup(context); + context.timer=Sch.time; + Sch.ScheduleNext(); + } + }); + } else if (self.brokerconn.status!=Net.Status.STD_OK) self.brokerconn.pending=0; + else if (self.brokerconn.pending>0) self.brokerconn.pending=self.brokerconn.pending-step; + }) + } + // Install a garbage collector service + Sch.AddTimer(interval, 'RPC Garbage Collector and Timeout Service', function () { + var i,remove,trans,lookup,cache,entry,key,live,time, + lookups=[], + caches=[], + transactions=[], + routing=[], + hostport; + + Io.log((self.monitor<4)||('[ROUG] garbage collector run')); + // TODO Locking!!!! + time=Sch.GetTime(); + /* + ** Pending look-up requests... + */ + for(i in self.lookup_queue) { + lookup=self.lookup_queue[i]; + cache=self.lookup_cache[i]; + remove=false; + if (lookup.timeout > 0 && lookup.timeout <= time) { + // remove lookup entry, no response + self.stats.op_lookupabort++; + remove=true; + self.pkt_discard(lookup); + if (lookup.operation == Rpc.Operation.LOOKUP) + Io.log((self.verbose<1)||('[ROUG] garbage collector: removing lookup (LOOKUP) for RPC server '+Net.Print.port(lookup.header.h_port))); + else + Io.log((self.verbose<1)||('[ROUG] garbage collector: removing lookup (WHERIS) for host '+Net.Print.port(lookup.sendport))); + } + if (!remove) { + lookups.push(lookup); + caches.push(cache); + } + } + self.lookup_queue=lookups; + self.lookup_cache=caches; + /* + ** Pending transactions... + */ + for(i in self.trans_queue) { + trans=self.trans_queue[i]; + remove=false; + // TODO: check for successfull server lookups and forward matching transactions... !? + + if (trans.timeout > 0 && trans.timeout <= time) { + self.stats.op_transabort++; + // Reduce port and host mapping cache lifetimes + hostport=self.port_mapping[trans.header.h_port]; + if (hostport) { + self.port_mapping_live[trans.header.h_port]=1; + if (self.host_mapping[hostport]) self.host_mapping_live[hostport]=1; + } + + // remove transaction entry, no response, deliver RPC_FAILURE + if (trans.operation == Rpc.Operation.TRANSAWAIT) { + trans.header.h_status = Net.Status.RPC_FAILURE; + trans.status = Net.Status.RPC_FAILURE; + trans.operation = Rpc.Operation.TRANSREP; + self.swapports(trans); + Buf.buf_init(trans); + Buf.buf_put_string(trans,'RPC_FAILURE'); + if (trans.context) { + Sch.Wakeup(trans.context); + Sch.ScheduleNext(); + } + remove = true; + Io.log((self.verbose<1)||('[ROUG] garbage collector: aborting local transaction to RPC server ' + + Net.Print.port(trans.header.h_port) + ' tid=' + trans.tid)); + } else if (trans.operation == Rpc.Operation.TRANSREQ) { + trans.header.h_status = Net.Status.RPC_FAILURE; + trans.status = Net.Status.RPC_FAILURE; + trans.operation = Rpc.Operation.TRANSREP; + self.swapports(trans); + Buf.buf_init(trans); + Buf.buf_put_string(trans,'RPC_FAILURE'); + trans.timeout=Sch.GetTime()+Net.TIMEOUT; + remove = true; + routing.push(trans); + Io.log((self.verbose<1)||('[ROUG] garbage collector: removing remote transaction (TRANSREQ) to RPC server ' + + Net.Print.port(trans.header.h_port) + ' tid=' + trans.tid)); + } else if (trans.operation == Rpc.Operation.TRANSREP) { + remove = true; + self.pkt_discard(trans); + Io.log((self.verbose<1)||('[ROUG] garbage collector: removing transaction reply (TRANSREP) from RPC server ' + + Net.Print.port(trans.header.h_port) + ' tid=' + trans.tid)); + } + } + + if (!remove) transactions.push(trans); + } + self.trans_queue=transactions; + + /* + ** Garbage collection of look-up tables (hash tables) + */ + for (key in self.port_mapping) { + if (!Net.Equal.port(self.port_mapping[key],self.hostport)) { + live = self.port_mapping_live[key]; + if (live == 0) { + self.port_mapping_live[key] = undefined; + self.port_mapping[key] = undefined; + } else if (live > 0) self.port_mapping_live[key]--; + } + } + for (key in self.host_mapping) { + live = self.host_mapping_live[key]; + if (live==0) { + if (self.verbose>0) + Io.out('[ROUG] Removing host '+Net.Print.port(key)); + self.host_mapping_live[key]=undefined; + self.host_mapping[key]=undefined; + } else self.host_mapping_live[key]--; + } + + for (i in routing) { + trans=routing[i]; + self.route(trans); + } + }); + +}; + +/** Stop all services + * + */ +rpcrouter.prototype.stop = function () { + Sch.RemoveTimer('RPC Garbage Collector and Timeout Service'); + Sch.RemoveTimer('RPC broker service'); + +} +/** + * + * @param {port} hostport + * @returns {rpcrouter} + * @constructor + */ +function RpcRouter(hostport,options) { + var obj = new rpcrouter(hostport,options); + Object.preventExtensions(obj); + return obj; +} + +module.exports = { + RpcRouter: RpcRouter +};