#!/usr/bin/env python

from datetime import timedelta

from aman.sys.aco.Colony import Colony
from aman.sys.aco.Constraints import SpacingConstraints
from aman.types.Inbound import Inbound

class CostFunction:
    """Estimates runway-specific arrival times and picks the earliest runway.

    The instance tracks, per runway, the inbound that precedes the next
    arrival (``RunwayInbounds``). ``calculateEta`` derives the earliest
    feasible arrival time of an inbound behind that traffic and
    ``selectArrivalRunway`` chooses the runway with the smallest such time.
    """

    # wake turbulence categories accepted by validateWtc
    _VALID_WTC = ('L', 'M', 'H', 'J')

    # spacing used if a WTC is unknown and for arrivals staggered behind a dependent runway
    # NOTE(review): presumably the same distance unit as the SpacingConstraints entries - confirm
    _DEFAULT_SPACING = 3

    def __init__(self, colony : Colony):
        """Store the colony and initialize the runway tracker.

        Args:
            colony: Colony whose ``Configuration`` provides the preceeding
                inbounds and the active arrival runways.
        """
        self.Spacings = SpacingConstraints()
        self.Colony = colony

        # initialize the tracker which inbound arrives at which runway
        configuration = self.Colony.Configuration
        preceeding = configuration.PreceedingInbounds
        self.RunwayInbounds = {runway: preceeding[runway] for runway in preceeding}

        # active runways without preceeding traffic are tracked as None
        for runway in configuration.RunwayConstraints.ActiveArrivalRunways:
            self.RunwayInbounds.setdefault(runway, None)

    @staticmethod
    def validateWtc(inbound : Inbound):
        """Return the upper-cased WTC of the inbound or None if it is unknown.

        Args:
            inbound: Inbound whose ``Report.aircraft.wtc`` is checked.

        Returns:
            One of 'L', 'M', 'H', 'J' or None for every other value.
        """
        wtc = inbound.Report.aircraft.wtc.upper()
        # the former check compared 'H' and 'J' with None and rejected both categories
        if wtc in CostFunction._VALID_WTC:
            return wtc
        return None

    @staticmethod
    def _spacingDelay(spacing, inbound : Inbound):
        """Convert a spacing into a delay based on the inbound's approach speed."""
        # NOTE(review): the quotient spacing / speedApproach is interpreted as seconds;
        # confirm the units of both values
        return timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))

    def calculateEta(self, runway : str, inbound : Inbound):
        """Calculate the earliest arrival time of the inbound on the runway.

        Args:
            runway: Key of the runway in ``RunwayInbounds``.
            inbound: Inbound that is evaluated.

        Returns:
            The inbound's ``EstimatedArrivalTime`` if the runway has no
            preceeding inbound. Otherwise the latest of the WTC-, stagger- and
            runway-spacing constrained times, but never earlier than the
            inbound's ``InitialArrivalTime``.

        Raises:
            KeyError: If the runway or the name of its dependent runway is
                not tracked in ``RunwayInbounds``.
        """
        preceedingInbound = self.RunwayInbounds[runway]
        if preceedingInbound is None:
            return inbound.EstimatedArrivalTime

        preceedingEta = preceedingInbound.EstimatedArrivalTime

        # get the WTC constrained ETA
        wtcPre = CostFunction.validateWtc(preceedingInbound)
        wtcThis = CostFunction.validateWtc(inbound)
        if wtcPre is None or wtcThis is None:
            spacing = CostFunction._DEFAULT_SPACING
        else:
            spacing = self.Spacings[wtcPre][wtcThis]
        wtcETA = preceedingEta + CostFunction._spacingDelay(spacing, inbound)

        # get the staggered time spacing
        runwayConstraints = self.Colony.Configuration.RunwayConstraints
        staggeredETA = wtcETA
        dependentRunway = runwayConstraints.findDependentRunway(runway)
        if dependentRunway is not None:
            dependentInbound = self.RunwayInbounds[dependentRunway.Runway.name]
            # only traffic on the dependent runway that lands after the preceeding inbound is relevant
            if dependentInbound is not None and preceedingEta < dependentInbound.EstimatedArrivalTime:
                delay = CostFunction._spacingDelay(CostFunction._DEFAULT_SPACING, inbound)
                staggeredETA = dependentInbound.EstimatedArrivalTime + delay

        # get the runway time spacing
        spacing = runwayConstraints.findRunway(runway).Spacing
        runwayETA = preceedingEta + CostFunction._spacingDelay(spacing, inbound)

        # get the biggest ETA to define the maximum but ensure that we are not earlier than the ITA
        # TODO model the TTG-concept depending on the distance to the IAF
        candidate = max(wtcETA, staggeredETA, runwayETA)
        return max(candidate, inbound.InitialArrivalTime)

    def selectArrivalRunway(self, inbound : Inbound):
        """Select the active arrival runway with the earliest ETA for the inbound.

        Args:
            inbound: Inbound that needs a runway.

        Returns:
            The result of ``RunwayConstraints.findRunway`` for the runway with
            the smallest ``calculateEta`` value; the first runway wins if
            multiple runways share the smallest value.
        """
        runwayConstraints = self.Colony.Configuration.RunwayConstraints
        availableRunways = list(runwayConstraints.ActiveArrivalRunways)

        #if 1 < len(availableRunways):
        # TODO filter based on type
        # TODO filter based on airline
        # ensure that at least one runway is available

        # fallback to check if we have available runways
        # NOTE(review): without the filters an empty list implies that ActiveArrivalRunways
        # is empty as well and this raises an IndexError; the entry is also returned without
        # findRunway(), unlike the regular path - confirm the intended behaviour
        if not availableRunways:
            return runwayConstraints.ActiveArrivalRunways[0]

        # start with the beginning
        selectedRunway = availableRunways[0]
        eta = self.calculateEta(selectedRunway, inbound)

        # get the runway with the earliest ETA
        for runway in availableRunways[1:]:
            candidate = self.calculateEta(runway, inbound)
            if candidate < eta:
                selectedRunway = runway
                eta = candidate

        return runwayConstraints.findRunway(selectedRunway)