From e5b2f7c6054c419d8a031e8825fbebae1c166a19 Mon Sep 17 00:00:00 2001 From: sbosse Date: Mon, 21 Jul 2025 22:47:38 +0200 Subject: [PATCH] Mon 21 Jul 22:43:21 CEST 2025 --- js/dos/connFIFOsrv.js | 621 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 621 insertions(+) create mode 100644 js/dos/connFIFOsrv.js diff --git a/js/dos/connFIFOsrv.js b/js/dos/connFIFOsrv.js new file mode 100644 index 0000000..73a45ee --- /dev/null +++ b/js/dos/connFIFOsrv.js @@ -0,0 +1,621 @@ +/** + ** ============================== + ** O O O OOOO + ** O O O O O O + ** O O O O O O + ** OOOO OOOO O OOO OOOO + ** O O O O O O O + ** O O O O O O O + ** OOOO OOOO O O OOOO + ** ============================== + ** Dr. Stefan Bosse http://www.bsslab.de + ** + ** COPYRIGHT: THIS SOFTWARE, EXECUTABLE AND SOURCE CODE IS OWNED + ** BY THE AUTHOR(S). + ** THIS SOURCE CODE MAY NOT BE COPIED, EXTRACTED, + ** MODIFIED, OR OTHERWISE USED IN A CONTEXT + ** OUTSIDE OF THE SOFTWARE SYSTEM. + ** + ** $AUTHORS: Stefan Bosse + ** $INITIAL: (C) 2006-2017 bLAB + ** $CREATED: 31-10-16 by sbosse. + ** $VERSION: 1.2.2 + ** + ** $INFO: + ** + ** ============================================== + ** DOS: Broker Connection Module + ** Server, FS FIFO communication, limited to 2 connections (node.js bug) + ** Data transfer: JSON + ** ============================================== + * + * + * Two unidirectional FIFOs are used for bidirectional client-server communication. + * There is one shared public channel request link, and multiple private P2P client-server links (FIFO pair). + * Clients writing initially to the request channel to get a free private channel from the server. + * Note: All clients simultaneously requesting a channel will receive multiple replies for different clients! + * A FIFO channel is shared by all participants! + * + * + * + * Notes: + * Superfluous? All http function callback computations are wrapped by scheduler callbacks + * to avoid main thread preemption! + * Is event queuing always guaranteed in JS? + * + * + * + ** $ENDOFINFO + */ +"use strict"; +var log = 0; + +var Io = Require('com/io'); +var Net = Require('dos/network'); +var Buf = Require('dos/buf'); +var Rpc = Require('dos/rpc'); +var util = Require('util'); +var net = Require('net'); +var Sch = Require('dos/scheduler'); +var Comp = Require('com/compat'); +var Perv = Comp.pervasives; +var String = Comp.string; +var Array = Comp.array; +var Filename = Comp.filename; +var trace = Io.tracing; +var div = Perv.div; +var Conn = Require('dos/connutils'); +var Fs = Require('fs'); + +var isNode = Comp.isNodeJS(); + +var Mode = Conn.Mode; + + +/************************** + ** FIFO Broker SERVER Interface + **************************/ +/** + * + * @param {port} hostport + * @param {string} srv_url + * @param {string} srv_ipport + * @param {rpcrouter} router + * @constructor + */ +// tyepof options = {hostport,path,channels,router,verbose,env,mode} +// typeof mode = 'unidir'|'bidir' +var Server = function(options) { + this.env=options.env||{}; + this.hostport=options.hostport; // Public communication Net.Port == Host port + this.status = Net.Status.STD_OK; + this.router=options.router; + this.app=undefined; + this.server=undefined; + this.client_queue=[]; + this.hosts={}; + this.lock=false; + this.mode=Mode.TWOCHAN; + this.keepalive=options.keepalive||true; + + // FIFO file system path + this.path=options.path||'/tmp/broker'; + // Number of channels + this.channels=Math.min(options.channels||2,2); + + this.conn_port = Net.uniqport(); + this.rpccon=Rpc.RpcConn( + self.conn_port, + /* + ** Connection Forward and Deliver Operation + ** ONECHAN client connectivity: collect + ** TWOCHAN client connectivity: forward + */ + function(rpcio,callback) { + // TODO rpcio -> msg + }, + // We're handling multiple client connections - we're alive + function() { + return true; + } + ); + this.verbose=options.verbose||0; + this.enabled=false; + this.warned=0; + this.last=undefined; + + // Transfer of multi-part messages? + this.multipart=true; +}; + +/** Client-App request (collecting pending and queued TRANSREQ/TRANSREP/LOOKUP messages) + * Used only in ONECHAN mode!! + */ + +Server.prototype.collect = function (self, msg) { // HTTP Callback handler + var res, + timeout=(msg.timeout != undefined) ? (Perv.int_of_string(msg.timeout)) : 0; + if (timeout) msg.timeout=timeout; + + this.rpccon.stats.op_forward++; + /* + ** 1. Forward TRANS message requests if there are pending requests for the host port. + ** 2. Forward TRANS message replies if there are pending replies for the host port. + ** 3. Forward LOOKUP messages (broadcast) + ** If there are actually no messages, add this request to the request queue (if timeout <>0). + */ + /* + ** Forward all pending transaction requests and replies... + */ + Io.log(((log+self.verbose) < 2) || ('[BFIS] collect for host ' + Net.Print.port(msg.hostport)+ + ' #trans='+ Array.length(self.router.trans_queue)+' #lookup='+Array.length(self.router.lookup_queue))); + res=this.schedule_client(msg,true); + + if (res && msg.timeout) + self.client_queue.push(msg); +}; + +/** Client-App forwarding (send pending and queued TRANSREQ/TRANSREP/LOOKUP messages) + * Used only in TWOCHAN mode!! Called from router. + */ + +Server.prototype.forward = function (msg) { // HTTP Callback handler + var res=0, + self=this; + /* + ** 1. Forward TRANS message request if there are pending requests for the host port. + ** 2. Forward TRANS message reply if there are pending replies for the host port. + ** 3. Forward LOOKUP messages (broadcast) + */ + /* + ** Forward all pending transaction requests and replies... + */ + Io.log(((log+self.verbose) < 2) || ('[BFIS] forward for host ' + Net.Print.port(msg.hostport)+ + ' #trans='+ Array.length(self.router.trans_queue)+' #lookup='+Array.length(self.router.lookup_queue))); + + if (msg.status==undefined) msg.status='EOK'; + + // console.log(msg) + if (this.hosts[msg.sendport]) { + res++; + self.send(self.reply(this.hosts[msg.sendport],msg)); + } + + if (res) this.rpccon.stats.op_forward++; + + return res; +}; + +/** Initialize this server + * + */ +Server.prototype.init = function () { + var self = this,pathI,pathO; + // Service client requests + this.enabled=true; + function receiver (i) { + var chI, chO, + pathI=self.path+i+'I', + pathO=self.path+i+'O'; + if (!Fs.existsSync(pathO)) { + Io.out('[BFIS] Cannot open ' + pathO+': not existing!'); + return; + } + if (!Fs.existsSync(pathI)) { + Io.out('[BFIS] Cannot open ' + pathI+': not existing!'); + return; + } + chO=Fs.createWriteStream(pathO); + + function listen () { + // Wait for a client connecting to the FIFO + Io.out('[BFIS] Listening on ' + pathI+' in mode '+self.mode+' '+(self.keepalive?'KEEPALIVE':'')); + chI=Fs.createReadStream(pathI); + chI.on('open',function () { + Io.log(((log+self.verbose)<2)||('[BFIS] Connected on ' + pathI)); + }); + chI.on('close',function () { + Io.log(((log+self.verbose)<2)||('[BFIS] Disconnect on ' + pathI)); + if (self.enabled) listen(); + }); + chI.on('error',function () { + Io.log(((log+self.verbose)<2)||('[BFIS] Disconnect on ' + pathI)); + if (self.enabled) listen(); + }); + chI.on('data',function (chunk) { + var req,part,parts,data; + Io.log(((log+self.verbose)<2)||('[BFIS] Received: ' + chunk.length)); + data=chunk.toString('ascii', 0, chunk.length); + Io.log(((log+self.verbose)<2)||('[BFIS] Received: ' + data)); + parts=Conn.splitData(data); + if (self.last) parts[0]=self.last+parts[0]; + if (Array.last(parts) != '') self.last=Array.last(parts); else self.last=undefined; + + // console.log(parts) + + for(part in parts) { + if (parts[part]=='') continue; + req = JSON.parse(parts[part]); + if (self.multipart) Conn.decode(req); + req.socket=chO; + self.request(self,req); + } + }); + } + listen(); + } + this.server=[]; + for (var i=0;i0) { + self.send(self.reply(client,{status:'EOK',data:data})); + return false; + } else if (client.timeout<=0) { + self.send(self.reply(client,{status:'ENOENTR'})); + return false; + } else return true; +} + +/* +** Schedule all pending client-side requests if there is matching input. +** Called from router function. Lock required! +*/ + +Server.prototype.schedule = function () { + var self = this; + + if (self.lock) return; + this.rpccon.stats.op_schedule++; + Io.log(((log+self.verbose)<2) || ('[BFIS] server schedule ['+this.client_queue.length+'] .. ')); + Io.trace(trace || ('[BFIS] server schedule ['+this.client_queue.length+'] .. ')); + self.lock = true; + self.client_queue = Array.filter(self.client_queue, function (client) { + /* + * Forward all matching pending transaction requests and replies... + */ + Io.log(((log+self.verbose) < 2) || ('[BFIS] schedule for client host ' + Net.Print.port(client.hostport)+ + ' #trans='+ Array.length(self.router.trans_queue)+' #lookup='+Array.length(self.router.lookup_queue))); + return self.schedule_client(client,true); + }); + self.lock = false; +}; + +/** Send a message to a destination + * + */ +// function send(repsonse:socket,data:string) +Server.prototype.send = function (msg) { + var client, + self=this, + socket, + data; + //response.writeHead(200, {'Content-Type': 'text/plain; charset=utf8'}); + //response.writeHead(200); + this.rpccon.stats.op_send++; + socket=msg.socket; msg.socket=undefined; + Conn.encode(msg); + + Io.log(((log+this.verbose)<2)||('[BFIS] send ['+util.inspect(msg)+'] -> '+socket)); + if (socket) { + data = JSON.stringify(msg); +// console.log(data); + if (this.multipart) socket.write(data+Conn.EOM); + else socket.write(data); + this.warned=0; + }; +}; + + + + + +/** + * + * @param {number} [interval] + */ +Server.prototype.start = function (interval) { + var self = this; + if (!interval) interval=1000; + /* + ** Start a client service and garbage collector for blocked client-side transactions (self.client_queue) + */ + Sch.AddTimer(interval, 'Broker Client Service and Garbage Collector', function (context) { + var p; + Io.log(((log+self.verbose)<4)||('[BFIS] Client Service and Garbage Collector')); + if (!self.lock) { + self.lock = true; + + if (self.mode==Mode.ONECHAN) + self.client_queue = Array.filter(self.client_queue, function (client /* msg */) { + client.timeout=client.timeout-interval; + return self.schedule_client(client,true); + }); + else { + for (p in self.hosts) { + // console.log(self.hosts[p]) + if (self.hosts[p]) { + self.hosts[p].timeout = self.hosts[p].timeout - interval; + if (self.hosts[p].timeout <= 0) { + Io.log(((log+self.verbose)<1)||('[BFIS] Host cache: Removing host '+ + ' hostport='+Net.Print.port(self.hosts[p].hostport))); + self.hosts[p]=undefined; + } + } + } + } + + self.lock = false; + } else { + context.blocked=false; + context.timer=Sch.time; + Sch.ScheduleNext(); + } + }); +}; + + +module.exports = { + /** RCP Broker Server for client-capable-only applications + * + */ + // typeof options = {hostport:port,srv_ip,srv_ipport,router,verbose?,env?:{}} + Mode: Mode, + Server: function(options) { + var obj = new Server(options); + Object.preventExtensions(obj); + return obj; + } +};