remove acquire() and release() functions and run the threads as deamons for easier cleanups
This commit is contained in:
39
aman/AMAN.py
39
aman/AMAN.py
@@ -30,6 +30,7 @@ class AMAN:
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# default initialization of members
|
# default initialization of members
|
||||||
|
configPath = AMAN.findConfigPath()
|
||||||
self.SystemConfig = None
|
self.SystemConfig = None
|
||||||
self.AircraftPerformance = None
|
self.AircraftPerformance = None
|
||||||
self.Receiver = None
|
self.Receiver = None
|
||||||
@@ -37,15 +38,6 @@ class AMAN:
|
|||||||
self.WebUi = None
|
self.WebUi = None
|
||||||
self.Workers = []
|
self.Workers = []
|
||||||
|
|
||||||
# initialize the random number generator
|
|
||||||
random.seed(time.time())
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
def aquire(self):
|
|
||||||
configPath = AMAN.findConfigPath()
|
|
||||||
|
|
||||||
# read all system relevant configuration files
|
# read all system relevant configuration files
|
||||||
self.SystemConfig = System(os.path.join(configPath, 'System.ini'))
|
self.SystemConfig = System(os.path.join(configPath, 'System.ini'))
|
||||||
print('Parsed System.ini')
|
print('Parsed System.ini')
|
||||||
@@ -58,13 +50,10 @@ class AMAN:
|
|||||||
else:
|
else:
|
||||||
print('Parsed PerformanceData.ini. Extracted ' + str(len(self.AircraftPerformance.Aircrafts)) + ' aircrafts')
|
print('Parsed PerformanceData.ini. Extracted ' + str(len(self.AircraftPerformance.Aircrafts)) + ' aircrafts')
|
||||||
|
|
||||||
self.Weather = Weather()
|
# create the communication syb
|
||||||
self.Weather.acquire(self.SystemConfig.Weather)
|
self.Weather = Weather(self.SystemConfig.Weather)
|
||||||
self.WebUi = WebUI()
|
self.Receiver = Euroscope(configPath, self.SystemConfig.Server, self)
|
||||||
self.WebUi.acquire(self.SystemConfig.Server, self)
|
|
||||||
|
|
||||||
# create the EuroScope receiver
|
|
||||||
self.Receiver = Euroscope()
|
|
||||||
|
|
||||||
# find the airport configurations and create the workers
|
# find the airport configurations and create the workers
|
||||||
airportsPath = os.path.join(os.path.join(configPath, 'airports'), '*.ini')
|
airportsPath = os.path.join(os.path.join(configPath, 'airports'), '*.ini')
|
||||||
@@ -75,31 +64,13 @@ class AMAN:
|
|||||||
airportConfig = Airport(file, icao)
|
airportConfig = Airport(file, icao)
|
||||||
|
|
||||||
# initialize the worker thread
|
# initialize the worker thread
|
||||||
worker = Worker()
|
worker = Worker(icao, airportConfig, self.Weather, self.AircraftPerformance, self.Receiver)
|
||||||
worker.acquire(icao, airportConfig, self.Weather, self.AircraftPerformance, self.WebUi, self.Receiver)
|
|
||||||
self.Workers.append(worker)
|
self.Workers.append(worker)
|
||||||
print('Started worker for ' + icao)
|
print('Started worker for ' + icao)
|
||||||
|
|
||||||
# initialize the receiver
|
|
||||||
self.Receiver.acquire(configPath, self.SystemConfig.Server, self)
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
if None != self.Workers:
|
|
||||||
for worker in self.Workers:
|
|
||||||
worker.release()
|
|
||||||
self.Workers = None
|
|
||||||
|
|
||||||
if None != self.Receiver:
|
|
||||||
self.Receiver.release()
|
|
||||||
self.Receiver = None
|
|
||||||
|
|
||||||
if None != self.Weather:
|
|
||||||
self.Weather.release()
|
|
||||||
self.Weather = None
|
|
||||||
|
|
||||||
if None != self.WebUi:
|
|
||||||
self.WebUi.release()
|
|
||||||
self.WebUi = None
|
|
||||||
|
|
||||||
def updateAircraftReport(self, report : AircraftReport_pb2.AircraftReport):
|
def updateAircraftReport(self, report : AircraftReport_pb2.AircraftReport):
|
||||||
# find the correct worker for the inbound
|
# find the correct worker for the inbound
|
||||||
|
|||||||
@@ -40,33 +40,13 @@ class ReceiverThread(Thread):
|
|||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
def threadId(self):
|
|
||||||
if hasattr(self, '_thread_id'):
|
|
||||||
return self._thread_id
|
|
||||||
for id, thread in _active.items():
|
|
||||||
if thread is self:
|
|
||||||
return id
|
|
||||||
|
|
||||||
def stopThread(self):
|
|
||||||
id = self.threadId()
|
|
||||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
|
|
||||||
if 1 < res:
|
|
||||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
|
|
||||||
|
|
||||||
# @brief Receives and sends messages to EuroScope plugins
|
# @brief Receives and sends messages to EuroScope plugins
|
||||||
class Euroscope:
|
class Euroscope:
|
||||||
def __init__(self):
|
def __init__(self, configPath : str, config : Server, aman):
|
||||||
self.Context = None
|
self.Context = None
|
||||||
self.ReceiverSocket = None
|
self.ReceiverSocket = None
|
||||||
self.ReceiverThread = None
|
self.ReceiverThread = None
|
||||||
self.NotificationSocket = None
|
self.NotificationSocket = None
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
# @brief Initializes the ZMQ socket
|
|
||||||
# @param[in] config The server configuration
|
|
||||||
def acquire(self, configPath : str, config : Server, aman):
|
|
||||||
self.Context = zmq.Context()
|
self.Context = zmq.Context()
|
||||||
|
|
||||||
# find the key directories
|
# find the key directories
|
||||||
@@ -97,6 +77,7 @@ class Euroscope:
|
|||||||
self.ReceiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
|
self.ReceiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
|
||||||
self.ReceiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
|
self.ReceiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
|
||||||
self.ReceiverThread = ReceiverThread(self.ReceiverSocket, aman)
|
self.ReceiverThread = ReceiverThread(self.ReceiverSocket, aman)
|
||||||
|
self.ReceiverThread.setDaemon(True)
|
||||||
self.ReceiverThread.start()
|
self.ReceiverThread.start()
|
||||||
print('Listening to tcp://' + config.Address + ':' + str(config.PortReceiver))
|
print('Listening to tcp://' + config.Address + ':' + str(config.PortReceiver))
|
||||||
|
|
||||||
@@ -108,22 +89,6 @@ class Euroscope:
|
|||||||
self.NotificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
|
self.NotificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
|
||||||
print('Publishing to tcp://' + config.Address + ':' + str(config.PortNotification))
|
print('Publishing to tcp://' + config.Address + ':' + str(config.PortNotification))
|
||||||
|
|
||||||
def release(self):
|
|
||||||
if None != self.ReceiverThread:
|
|
||||||
self.ReceiverThread.stopThread()
|
|
||||||
self.ReceiverThread.join()
|
|
||||||
self.ReceiverThread = None
|
|
||||||
|
|
||||||
if None != self.ReceiverSocket:
|
|
||||||
self.ReceiverSocket.close()
|
|
||||||
self.ReceiverSocket = None
|
|
||||||
|
|
||||||
if None != self.NotificationSocket:
|
|
||||||
self.NotificationSocket.close()
|
|
||||||
self.NotificationSocket = None
|
|
||||||
|
|
||||||
self.Context = None
|
|
||||||
|
|
||||||
def sendSequence(self, airport : str, inbounds, weather):
|
def sendSequence(self, airport : str, inbounds, weather):
|
||||||
if None == self.NotificationSocket:
|
if None == self.NotificationSocket:
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -11,15 +11,9 @@ from aman.com.DwdCrawler import DwdCrawler
|
|||||||
import aman.config.Weather
|
import aman.config.Weather
|
||||||
|
|
||||||
class Weather(Thread):
|
class Weather(Thread):
|
||||||
def __init__(self):
|
def __init__(self, config : aman.config.Weather.Weather):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
|
|
||||||
self.NextUpdate = None
|
|
||||||
self.LastUpdateTried = None
|
|
||||||
self.StopThread = False
|
|
||||||
self.Provider = None
|
|
||||||
|
|
||||||
def acquire(self, config : aman.config.Weather.Weather):
|
|
||||||
self.NextUpdate = dt.utcfromtimestamp(int(time.time()))
|
self.NextUpdate = dt.utcfromtimestamp(int(time.time()))
|
||||||
self.LastUpdateTried = None
|
self.LastUpdateTried = None
|
||||||
self.StopThread = False
|
self.StopThread = False
|
||||||
@@ -31,12 +25,9 @@ class Weather(Thread):
|
|||||||
sys.stderr.write('Invalid or unknown weather-provider defined')
|
sys.stderr.write('Invalid or unknown weather-provider defined')
|
||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
|
self.setDaemon(True)
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
def release(self):
|
|
||||||
self.StopThread = True
|
|
||||||
self.join()
|
|
||||||
|
|
||||||
def currentClock():
|
def currentClock():
|
||||||
clock = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
|
clock = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
|
||||||
return clock
|
return clock
|
||||||
|
|||||||
@@ -17,27 +17,13 @@ from aman.types.Inbound import Inbound
|
|||||||
from aman.types.PerformanceData import PerformanceData
|
from aman.types.PerformanceData import PerformanceData
|
||||||
|
|
||||||
class Worker(Thread):
|
class Worker(Thread):
|
||||||
def __init__(self):
|
def __init__(self, icao : str, configuration : Airport, weather : Weather,
|
||||||
|
performance : PerformanceData, euroscope : Euroscope):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.StopThread = None
|
self.StopThread = None
|
||||||
self.Icao = None
|
|
||||||
self.Configuration = None
|
|
||||||
self.PerformanceData = None
|
|
||||||
self.UpdateLock = None
|
|
||||||
self.ReportQueue = {}
|
|
||||||
self.WeatherModel = None
|
|
||||||
self.RecedingHorizonControl = None
|
|
||||||
self.WebUi = None
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
def acquire(self, icao : str, configuration : Airport, weather : Weather,
|
|
||||||
performance : PerformanceData, webui : WebUI, euroscope : Euroscope):
|
|
||||||
self.StopThread = None
|
|
||||||
self.Icao = icao
|
self.Icao = icao
|
||||||
self.Configuration = configuration
|
self.Configuration = configuration
|
||||||
self.sequencingConfiguration = configuration.DefaultSequencing
|
self.SequencingConfiguration = configuration.DefaultSequencing
|
||||||
self.PerformanceData = performance
|
self.PerformanceData = performance
|
||||||
self.UpdateLock = Lock()
|
self.UpdateLock = Lock()
|
||||||
self.ReportQueue = {}
|
self.ReportQueue = {}
|
||||||
@@ -68,16 +54,13 @@ class Worker(Thread):
|
|||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
self.setDaemon(True)
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
def acquireLock(self):
|
def acquireLock(self):
|
||||||
if None != self.UpdateLock:
|
if None != self.UpdateLock:
|
||||||
self.UpdateLock.acquire()
|
self.UpdateLock.acquire()
|
||||||
|
|
||||||
def release(self):
|
|
||||||
self.StopThread = True
|
|
||||||
self.join()
|
|
||||||
|
|
||||||
def releaseLock(self):
|
def releaseLock(self):
|
||||||
if None != self.UpdateLock:
|
if None != self.UpdateLock:
|
||||||
self.UpdateLock.release()
|
self.UpdateLock.release()
|
||||||
@@ -107,7 +90,7 @@ class Worker(Thread):
|
|||||||
|
|
||||||
if 0 != report.distanceToIAF and '' != report.initialApproachFix:
|
if 0 != report.distanceToIAF and '' != report.initialApproachFix:
|
||||||
inbound = Inbound(report, self.PerformanceData)
|
inbound = Inbound(report, self.PerformanceData)
|
||||||
Node(inbound, inbound.ReportTime, self.WeatherModel, self.Configuration.GngData, self.sequencingConfiguration)
|
Node(inbound, inbound.ReportTime, self.WeatherModel, self.Configuration.GngData, self.SequencingConfiguration)
|
||||||
if None != inbound.InitialArrivalTime:
|
if None != inbound.InitialArrivalTime:
|
||||||
self.RecedingHorizonControl.updateReport(inbound)
|
self.RecedingHorizonControl.updateReport(inbound)
|
||||||
else:
|
else:
|
||||||
@@ -123,13 +106,13 @@ class Worker(Thread):
|
|||||||
# get the last landing aircrafts per runway before the RHC stage to check for constraints
|
# get the last landing aircrafts per runway before the RHC stage to check for constraints
|
||||||
# this is required to handle the overlap between windows
|
# this is required to handle the overlap between windows
|
||||||
preceedingInbounds = {}
|
preceedingInbounds = {}
|
||||||
for runway in self.sequencingConfiguration.ActiveArrivalRunways:
|
for runway in self.SequencingConfiguration.ActiveArrivalRunways:
|
||||||
inbound = self.RecedingHorizonControl.lastFixedInboundOnRunway(runway.Runway.Name)
|
inbound = self.RecedingHorizonControl.lastFixedInboundOnRunway(runway.Runway.Name)
|
||||||
if None != inbound:
|
if None != inbound:
|
||||||
preceedingInbounds[runway.Runway.Name] = inbound
|
preceedingInbounds[runway.Runway.Name] = inbound
|
||||||
|
|
||||||
# configure the ACO run
|
# configure the ACO run
|
||||||
acoConfig = Configuration(constraints = self.sequencingConfiguration, nav = self.Configuration.GngData,
|
acoConfig = Configuration(constraints = self.SequencingConfiguration, nav = self.Configuration.GngData,
|
||||||
earliest = earliestArrivalTime, weather = self.WeatherModel,
|
earliest = earliestArrivalTime, weather = self.WeatherModel,
|
||||||
preceeding = None if 0 == len(preceedingInbounds) else preceedingInbounds,
|
preceeding = None if 0 == len(preceedingInbounds) else preceedingInbounds,
|
||||||
ants = 5 * len(relevantInbounds), generations = 5 * len(relevantInbounds))
|
ants = 5 * len(relevantInbounds), generations = 5 * len(relevantInbounds))
|
||||||
|
|||||||
Reference in New Issue
Block a user