From 9de9b813ba3ea21c24e1e9a9088511544212b804 Mon Sep 17 00:00:00 2001 From: Sven Czarnian Date: Fri, 3 Sep 2021 23:35:17 +0200 Subject: [PATCH] sort the received aircraft reports into the corresponding worker thread --- aman/AMAN.py | 14 +++++++++++++- aman/com/Euroscope.py | 13 +++++++++---- aman/sys/Worker.py | 9 ++++++--- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/aman/AMAN.py b/aman/AMAN.py index fb84e93..0375668 100644 --- a/aman/AMAN.py +++ b/aman/AMAN.py @@ -4,6 +4,7 @@ import glob import os import sys +from aman.com import AircraftReport_pb2 from aman.com.Euroscope import Euroscope from aman.config.AircraftPerformance import AircraftPerformance from aman.config.Airport import Airport @@ -29,6 +30,7 @@ class AMAN: self.aircraftPerformance = None self.receiver = None self.workers = [] + self.inbounds = {} configPath = AMAN.findConfigPath() @@ -59,7 +61,7 @@ class AMAN: print('Starter worker for ' + icao) # create the EuroScope receiver - self.receiver = Euroscope(configPath, self.systemConfig.Server) + self.receiver = Euroscope(configPath, self.systemConfig.Server, self) def __del__(self): if None != self.receiver: @@ -68,3 +70,13 @@ class AMAN: for worker in self.workers: worker.stop() + + def updateAircraftReport(self, report : AircraftReport_pb2.AircraftReport): + # find the correct worker for the inbound + for worker in self.workers: + if worker.icao == report.destination: + print('Updated ' + report.aircraft.callsign + ' for ' + worker.icao) + worker.acquire() + worker.reportQueue[report.aircraft.callsign] = report + worker.release() + break \ No newline at end of file diff --git a/aman/com/Euroscope.py b/aman/com/Euroscope.py index 5c124d2..585ff4f 100644 --- a/aman/com/Euroscope.py +++ b/aman/com/Euroscope.py @@ -11,12 +11,13 @@ import zmq import zmq.auth from aman.com import AircraftReport_pb2 -from aman.config import Server +from aman.config.Server import Server class ReceiverThread(threading.Thread): - def __init__(self, socket): + def __init__(self, socket, aman): threading.Thread.__init__(self) self.socket = socket + self.aman = aman def run(self): while True: @@ -26,6 +27,10 @@ class ReceiverThread(threading.Thread): # parse the received message report = AircraftReport_pb2.AircraftReport() report.ParseFromString(msg) + + # try to associate the received aircraft to an airport + self.aman.updateAircraftReport(report) + except zmq.ZMQError as error: if zmq.EAGAIN == error.errno: time.sleep(0.5) @@ -50,7 +55,7 @@ class ReceiverThread(threading.Thread): class Euroscope: # @brief Initializes the ZMQ socket # @param[in] config The server configuration - def __init__(self, configPath : str, config : Server.Server): + def __init__(self, configPath : str, config : Server, aman): self.context = zmq.Context() # find the key directories @@ -80,7 +85,7 @@ class Euroscope: self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True) self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver)) self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'') - self.receiverThread = ReceiverThread(self.receiverSocket) + self.receiverThread = ReceiverThread(self.receiverSocket, aman) self.receiverThread.start() print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver)) diff --git a/aman/sys/Worker.py b/aman/sys/Worker.py index 78130ab..f213366 100644 --- a/aman/sys/Worker.py +++ b/aman/sys/Worker.py @@ -1,18 +1,20 @@ #!/usr/bin/env python -import threading +from threading import Thread, Lock import time from aman.config.Airport import Airport -class Worker(threading.Thread): +class Worker(Thread): def __init__(self, icao : str, configuration : Airport): - threading.Thread.__init__(self) + Thread.__init__(self) self.stopThread = None self.icao = icao self.configuration = configuration self.arrivalRoutes = configuration.gngData.arrivalRoutes + self.updateLock = Lock() + self.reportQueue = {} def stop(self): self.stopThread = True @@ -26,5 +28,6 @@ class Worker(threading.Thread): if 0 != (counter % 60): continue + # TODO handle the report queue and update internal information # TODO execute planning, etc. continue