Browse Source

sort the received aircraft reports into the corresponding worker thread

Sven Czarnian 3 years ago
parent
commit
9de9b813ba
3 changed files with 28 additions and 8 deletions
  1. 13 1
      aman/AMAN.py
  2. 9 4
      aman/com/Euroscope.py
  3. 6 3
      aman/sys/Worker.py

+ 13 - 1
aman/AMAN.py

@@ -4,6 +4,7 @@ import glob
 import os
 import sys
 
+from aman.com import AircraftReport_pb2
 from aman.com.Euroscope import Euroscope
 from aman.config.AircraftPerformance import AircraftPerformance
 from aman.config.Airport import Airport
@@ -29,6 +30,7 @@ class AMAN:
         self.aircraftPerformance = None
         self.receiver = None
         self.workers = []
+        self.inbounds = {}
 
         configPath = AMAN.findConfigPath()
 
@@ -59,7 +61,7 @@ class AMAN:
             print('Starter worker for ' + icao)
 
         # create the EuroScope receiver
-        self.receiver = Euroscope(configPath, self.systemConfig.Server)
+        self.receiver = Euroscope(configPath, self.systemConfig.Server, self)
 
     def __del__(self):
         if None != self.receiver:
@@ -68,3 +70,13 @@ class AMAN:
 
         for worker in self.workers:
             worker.stop()
+
+    def updateAircraftReport(self, report : AircraftReport_pb2.AircraftReport):
+        # find the correct worker for the inbound
+        for worker in self.workers:
+            if worker.icao == report.destination:
+                print('Updated ' + report.aircraft.callsign + ' for ' + worker.icao)
+                worker.acquire()
+                worker.reportQueue[report.aircraft.callsign] = report
+                worker.release()
+                break

+ 9 - 4
aman/com/Euroscope.py

@@ -11,12 +11,13 @@ import zmq
 import zmq.auth
 
 from aman.com import AircraftReport_pb2
-from aman.config import Server
+from aman.config.Server import Server
 
 class ReceiverThread(threading.Thread):
-    def __init__(self, socket):
+    def __init__(self, socket, aman):
         threading.Thread.__init__(self)
         self.socket = socket
+        self.aman = aman
 
     def run(self):
         while True:
@@ -26,6 +27,10 @@ class ReceiverThread(threading.Thread):
                 # parse the received message
                 report = AircraftReport_pb2.AircraftReport()
                 report.ParseFromString(msg)
+
+                # try to associate the received aircraft to an airport
+                self.aman.updateAircraftReport(report)
+
             except zmq.ZMQError as error:
                 if zmq.EAGAIN == error.errno:
                     time.sleep(0.5)
@@ -50,7 +55,7 @@ class ReceiverThread(threading.Thread):
 class Euroscope:
     # @brief Initializes the ZMQ socket
     # @param[in] config The server configuration
-    def __init__(self, configPath : str, config : Server.Server):
+    def __init__(self, configPath : str, config : Server, aman):
         self.context = zmq.Context()
 
         # find the key directories
@@ -80,7 +85,7 @@ class Euroscope:
         self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
         self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
         self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
-        self.receiverThread = ReceiverThread(self.receiverSocket)
+        self.receiverThread = ReceiverThread(self.receiverSocket, aman)
         self.receiverThread.start()
         print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver))
 

+ 6 - 3
aman/sys/Worker.py

@@ -1,18 +1,20 @@
 #!/usr/bin/env python
 
-import threading
+from threading import Thread, Lock
 import time
 
 from aman.config.Airport import Airport
 
-class Worker(threading.Thread):
+class Worker(Thread):
     def __init__(self, icao : str, configuration : Airport):
-        threading.Thread.__init__(self)
+        Thread.__init__(self)
 
         self.stopThread = None
         self.icao = icao
         self.configuration = configuration
         self.arrivalRoutes = configuration.gngData.arrivalRoutes
+        self.updateLock = Lock()
+        self.reportQueue = {}
 
     def stop(self):
         self.stopThread = True
@@ -26,5 +28,6 @@ class Worker(threading.Thread):
             if 0 != (counter % 60):
                 continue
 
+            # TODO handle the report queue and update internal information
             # TODO execute planning, etc.
             continue