From a7541925c7b6f2dc23834013be074568915ee0fa Mon Sep 17 00:00:00 2001 From: Sven Czarnian Date: Thu, 14 Oct 2021 14:13:42 +0200 Subject: [PATCH] rename the cost function. it manages more the runways --- aman/sys/aco/CostFunction.py | 96 -------------------------------- aman/sys/aco/RunwayManager.py | 101 ++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 96 deletions(-) delete mode 100644 aman/sys/aco/CostFunction.py create mode 100644 aman/sys/aco/RunwayManager.py diff --git a/aman/sys/aco/CostFunction.py b/aman/sys/aco/CostFunction.py deleted file mode 100644 index 9a88c8c..0000000 --- a/aman/sys/aco/CostFunction.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python - -from datetime import timedelta - -from aman.sys.aco.Colony import Colony -from aman.sys.aco.Constraints import SpacingConstraints -from aman.types.Inbound import Inbound - -class CostFunction: - def __init__(self, colony : Colony): - self.Spacings = SpacingConstraints() - self.Colony = colony - - # initialize the tracker which inbound arrives at which runway - self.RunwayInbounds = {} - for runway in self.Colony.Configuration.PreceedingInbounds: - self.RunwayInbounds[runway] = self.Colony.Configuration.PreceedingInbounds[runway] - for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways: - if not runway in self.RunwayInbounds: - self.RunwayInbounds[runway] = None - - return - - def validateWtc(inbound : Inbound): - wtc = inbound.Report.aircraft.wtc.upper() - if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None: - return wtc - else: - return None - - def calculateEta(self, runway : str, inbound : Inbound): - if None == self.RunwayInbounds[runway]: - return inbound.EstimatedArrivalTime - - preceedingInbound = self.RunwayInbounds[runway] - - # get the WTC constrained ETA - wtcPre = CostFunction.validateWtc(preceedingInbound) - wtcThis = CostFunction.validateWtc(inbound) - if None == wtcPre or None == wtcThis: - spacing = 3 - else: - spacing = self.Spacings[wtcPre][wtcThis] - delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5)) - wtcETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay - - # get the staggered time spacing - dependentRunway = self.Colony.Configuration.RunwayConstraints.findDependentRunway(runway) - if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.name]: - if preceedingInbound.EstimatedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime: - delay = timedelta(seconds = round(3 / inbound.PerformanceData.speedApproach + 0.5)) - staggeredETA = self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime + delay - else: - staggeredETA = wtcETA - else: - staggeredETA = wtcETA - - # get the runway time spacing - spacing = self.Colony.Configuration.RunwayConstraints.findRunway(runway).Spacing - delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5)) - runwayETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay - - # get the biggest ETA to define the maximum but ensure that we are not earlier than the ITA - # TODO model the TTG-concept depending on the distance to the IAF - candidate = max(max(wtcETA, staggeredETA), runwayETA) - if candidate < inbound.InitialArrivalTime: - return inbound.InitialArrivalTime - else: - return candidate - - def selectArrivalRunway(self, inbound : Inbound): - availableRunways = [] - for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways: - availableRunways.append(runway) - - #if 1 < len(availableRunways): - # TODO filter based on type - # TODO filter based on airline - # ensure that at least one runway is available - - # fallback to check if we have available runways - if 0 == len(availableRunways): - return self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways[0] - - # start with the beginning - selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(availableRunways[0]) - eta = self.calculateEta(availableRunways[0], inbound) - - # get the runway with the earliest ETA - for runway in availableRunways: - candidate = self.calculateEta(runway, inbound) - if eta > candidate: - selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(runway) - eta = candidate - - return selectedRunway diff --git a/aman/sys/aco/RunwayManager.py b/aman/sys/aco/RunwayManager.py new file mode 100644 index 0000000..3c65e5b --- /dev/null +++ b/aman/sys/aco/RunwayManager.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +from datetime import timedelta + +from aman.sys.aco.Configuration import Configuration +from aman.sys.aco.Constraints import SpacingConstraints +from aman.types.Inbound import Inbound + +class RunwayManager: + def __init__(self, configuration : Configuration): + self.Spacings = SpacingConstraints() + self.Configuration = configuration + + # initialize the tracker which inbound arrives at which runway + self.RunwayInbounds = {} + if None != configuration.PreceedingInbounds: + for runway in configuration.PreceedingInbounds: + self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway] + for runway in configuration.RunwayConstraints.ActiveArrivalRunways: + if not runway.Runway.Name in self.RunwayInbounds: + self.RunwayInbounds[runway.Runway.Name] = None + + def validateWtc(inbound : Inbound): + wtc = inbound.Report.aircraft.wtc.upper() + if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None: + return wtc + else: + return None + + def calculateEarliestArrivalTime(self, runway : str, inbound : Inbound): + if None == self.RunwayInbounds[runway]: + return inbound.ArrivalCandidates[runway].EarliestArrivalTime + + preceedingInbound = self.RunwayInbounds[runway] + + # get the WTC constrained ETA + wtcPre = RunwayManager.validateWtc(preceedingInbound) + wtcThis = RunwayManager.validateWtc(inbound) + if None == wtcPre or None == wtcThis: + spacing = 3 + else: + spacing = self.Spacings[wtcPre][wtcThis] + delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60)) + wtcEarliestArrivalTime = preceedingInbound.PlannedArrivalTime + delay + + # get the staggered time spacing + dependentRunway = self.Configuration.RunwayConstraints.findDependentRunway(runway) + if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.Name]: + # check if the one on the same runway lands before the one on the parallel runway + if preceedingInbound.PlannedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime: + delay = timedelta(minutes = 3 / (inbound.PerformanceData.SpeedApproach / 60)) + staggeredEarliestArrivalTime = self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime + delay + # landing on the same runway + else: + staggeredEarliestArrivalTime = wtcEarliestArrivalTime + # no neighbor or no dependent runway (IPA) + else: + staggeredEarliestArrivalTime = wtcEarliestArrivalTime + + # get the runway time spacing + spacing = self.Configuration.RunwayConstraints.findRunway(runway).Spacing + delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60)) + runwayEarliestArrivalTime = preceedingInbound.PlannedArrivalTime + delay + + # the candidate with the latest ETA is used -> ensure all safety and procedure constraints + candidate = max(max(wtcEarliestArrivalTime, staggeredEarliestArrivalTime), runwayEarliestArrivalTime) + + # check if inbound comes later than the latest possible ETA + if candidate < inbound.ArrivalCandidates[runway].EarliestArrivalTime: + return inbound.ArrivalCandidates[runway].EarliestArrivalTime + # candidate fits into earliest arrival time or is later than this + else: + return candidate + + def selectArrivalRunway(self, inbound : Inbound): + availableRunways = [] + for runway in self.Configuration.RunwayConstraints.ActiveArrivalRunways: + availableRunways.append(runway) + + #if 1 < len(availableRunways): + # TODO filter based on type + # TODO filter based on airline + # ensure that at least one runway is available + + # fallback to check if we have available runways + if 0 == len(availableRunways): + runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0] + return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound) + + # start with the beginning + selectedRunway = None + eta = None + + # get the runway with the earliest ETA + for runway in availableRunways: + candidate = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound) + if None == eta or eta > candidate: + selectedRunway = runway.Runway + eta = candidate + + return selectedRunway, eta