PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Mit Datenaggregation und Sensorfusion
PD Stefan Bosse
Universität Bremen - FB Mathematik und Informatik
PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Wie kann mit Coroutinen (Fibers) geschachtelt mehrere Aufgaben aus der Sicht des Programmiermodells "parallel" ausführen?
Welche Synchronisation wird benötigt?
Einführung eines universellen Synchronisationsobjekts für Coroutine und Implementierung von Stanadards
Coroutinen PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Eine Coroutine ist in Lua ein separates Unterprogramm
Coroutinen werden in Lua niemals parallel (gleichzeitig) ausgeführt
Eine Coroutine kann eine andere nicht unterbrechen
Aber: Eine Coroutine kann an beliebiger Stelle im Programmfluss zu einem Scheduler verzweigen (yield), der eine andere ruhende Coroutine (re)aktiviert (resume)
Coroutinen PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local co1 = coroutine.create(function () .. coroutine.yield() .. endlocal co2 = coroutine.create(function () .. coroutine.yield() .. endcoroutine.resume(co1)coroutine.resume(co2)
Coroutinen Synchronisation PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Verschachtelte Ausführung von Coroutinen ist ergibt noch keine Synchronisation
Synchronisation zwischen Coroutinene wäre z.B.:
Universelles Synchronisationobjekt PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Universelles Synchronisationobjekt PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Das universelle cosync Objekt bietet mit der await Operation die Möglichkeit eine Art Belegung einzuleiten
Die signal Operation ermöglicht eine Art von Freigabe
Es gibt einen optionalen Zähler (wenn es eine init Funktion gibt)
Universelles Synchronisationobjekt PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local co = cosync:new(oninit? : function () return value -- sets counter value end | value | nilonawait : function (counter,waiters) return newcounter,wait?,release? end | function (waiters) return wait?,release? endonsignal : function (counter,waiters) return newcounter,release? end | function (waiters) return release? end | nil)
Barriere mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Eine Barriere dient zur zeitlichen Synchronosation einer Gruppe von Prozessen / Coroutinen.
Es handelt sich um eine selbstauslösende Synchronisation nur unter Verwendung der +await* Operation
Die Gruppengröße ist vorher festgelegt
Jeder Prozess tritt der Barriere und wird blcokiert bis der N-te Prozess beitritt und alle Prozesse freigibt.
Barriere mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Barrier = function (N) return cosync:new( nil, function (waiters) return waiters~=(N-1),waiters==(N-1) end )end
Barriere mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local ba = Barrier(4)for i = 1,3 do local co = coroutine.create(function () -- wait for new input data ba:await() end); coroutine.resume(co)end.. prepare input data for jobs ..ba:await()
Mutex mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Schutz "kritischer" Codeabschnitte (um Datenkonsistenz zu gewährleisten)
Immer nur ein Prozess kann einen Mutex Lock besitzen/halten (Invariante)
Die Operationen await und signal müssen immer paarweise verwendet werden.
Ist der Lock vergeben, führen await Aufrufe zur Prozessblockierung (oder einem yield)
Mutex mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Mutex = function () return cosync:new( function () return 1 end, function (counter,waiters) if counter == 0 then return counter,true else return 0,false end end, function (counter,waiters) if waiters then return 0,1 else return 1,0 end end )end
Mutex mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local mu = Mutex()local data = { ... }for i = 1,3 do local co = coroutine.create(function () mu:await() .. modifify non-atomic shared data structures .. mu:signal() end); coroutine.resume(co)end
Semaphore mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Semaphoren dienen klassisch zur Synchronisation von Produzenten- und Konsumentenprozessen
Der Semaphor besitzt einen Zähler der nicht negativ wird (Invariante)
Die await Operation erniedrigt den Zähler um den Wert 1, es sei denn der Zähler ist Null, dann wird der Aufrufer blockiert
Die signal Operation erhöht den Zählerwert um 1. War er vorher aud Null und gibt es wartende Prozesse, wird einer ausgewählt und freigegebn. Der Zähler bleibt auf Null
Semaphore mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Semaphore = function (init) return cosync:new( function () return init end, function (counter,waiters) if counter > 0 then return counter-1,false else return counter,true end end, function (counter,waiters) if waiters > 0 and counter == 0 then return 0,1 -- +1 consumed by release of waiter else return counter+1,0 end end )end
Semaphore mit Cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local sema = Semaphore(0)for i = 1,3 do local co = coroutine.create(function () .. produce data .. sema:signal() end); coroutine.resume(co)endfor i = 1,3 do sema:await()end
Collector mit cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Der Semaphore muss bei einem Produzenten-Konsumenten System mit N Produzenten N-mal dekrementiert werden um auf die Terminierung aller Arbeitsprozesse zu warten
Den Semaphore kann man einfach in einen Kollektor transformieren der nur noch einen await Aufruf benötigt
Collector mit cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Collector = function (N) return cosync:new( function () return 0 end, function (counter,waiters) if counter == N then return counter,false,true else return counter,true,false end end, function (counter,waiters) counter=counter+1 if counter == N then return counter,true else return counter,false end end )end
Collector mit cosync PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local coll = Colelctor(3)for i = 1,3 do local co = coroutine.create(function () .. produce data .. coll:signal() end); coroutine.resume(co)endlocal status = coll:await()
Cosync und Timeouts PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Bisher wurde angenommen, dass das Synchronisationsereignus auch eintritt, z.B. dass die Barriere ausgelöst wird
Gerade bei verteilter Kommunikation können Nachrcihten verloren gehen
Daher führt man Timeouts als autosynchronisierendes (negatives) Ereignis ein
local o = cosync:new(...)local status = o:await(TimeoutInMillseconds)
Cosync und Timeouts PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Die Wahl des Zeitfensters bis zum Scheitern einer Synchronisation ist schwierig und kritisch.
Es besteht immer die Gefahr dass ein Ereignis nach dem Timeout noch auftreten kann (z.B. der Empfang einer Nachricht)
Ist der Timeout zu klein, steigt die Wahrscheinlichkeit ein Ereignis zu verpassen
Ist der Timeout zu groß gewählt. bremst dass die Datenverarbeitung aus und es können ggfs. andere Ereignisse verpasst werden (Echtzeitfähigkeit leidet)
Cosync und Daten PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
Cosync Objekte dienen zunächst der reinen Coroutinensynchronisation im Kontrollpfad
Die signal Operation kann verwendet werden, um Daten in einer Queue (Liste) zu speichern
await kann die Daten abrufen
local o = cosync:new(nil|counter0,..,..)...o:signal(mydata1)o:signal(mydata2)...local status,data = o:await(TimeoutInMillseconds)local status,counter,data = o:await(TimeoutInMillseconds) -- with counter-- data={mydata1,mydata2}
Cosync Objekte Reinitialisieren PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local o = cosync:new(...)o:reset(counterinit?)
Coroutinen Kommunikation mit CoVNC PD Stefan Bosse - Modul S: Synchronisation und Kommunikation
local vc = covnc:new({ loss?=number, bidir?=boolean})local p = vc:port(id?) -- p.idp:connect(to)local data = p:read()p:send({..},to?)p:receiver(function (data) .. end)