#!/usr/bin/env python from datetime import datetime, timedelta from aman.sys.aco.Configuration import Configuration from aman.sys.aco.Constraints import SpacingConstraints from aman.types.Inbound import Inbound class RunwayManager: def __init__(self, configuration : Configuration): self.Spacings = SpacingConstraints() self.Configuration = configuration # initialize the tracker which inbound arrives at which runway self.RunwayInbounds = {} if None != configuration.PreceedingInbounds: for runway in configuration.PreceedingInbounds: self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway] for runway in configuration.RunwayConstraints.ActiveArrivalRunways: if not runway.Runway.Name in self.RunwayInbounds: self.RunwayInbounds[runway.Runway.Name] = None def validateWtc(inbound : Inbound): wtc = inbound.Report.aircraft.wtc.upper() if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc: return wtc else: return None def calculateEarliestArrivalTime(self, runway : str, inbound : Inbound, useITA : bool, earliestArrivalTime : datetime): earliestArrivalTimes = [] if None != self.RunwayInbounds[runway]: # get the WTC based ETA wtcPre = RunwayManager.validateWtc(self.RunwayInbounds[runway]) wtcThis = RunwayManager.validateWtc(inbound) if None == wtcPre or None == wtcThis: spacing = 3 else: spacing = self.Spacings[wtcPre][wtcThis] delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60)) earliestArrivalTimes.append(self.RunwayInbounds[runway].PlannedArrivalTime + delay) # get the runway time spacing spacing = self.Configuration.RunwayConstraints.findRunway(runway).Spacing delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60)) earliestArrivalTimes.append(self.RunwayInbounds[runway].PlannedArrivalTime + delay) # calculate the arrival times for the dependent inbounds for dependentRunway in self.Configuration.RunwayConstraints.findDependentRunways(runway): if None != self.RunwayInbounds[dependentRunway.Runway.Name]: delay = timedelta(minutes = 3 / (inbound.PerformanceData.SpeedApproach / 60)) earliestArrivalTimes.append(self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime + delay) # get the arrival time on the runway of the inbound if True == useITA: arrivalTime = inbound.ArrivalCandidates[runway].EarliestArrivalTime else: arrivalTime = inbound.ArrivalCandidates[runway].InitialArrivalTime if 0 == len(earliestArrivalTimes): return arrivalTime, timedelta(seconds = 0) else: eta = max(earliestArrivalTimes) if eta < arrivalTime: return arrivalTime, arrivalTime - eta else: return eta, timedelta(seconds = 0) def selectArrivalRunway(self, inbound : Inbound, useITA : bool, earliestArrivalTime : datetime): availableRunways = [] for runway in self.Configuration.RunwayConstraints.ActiveArrivalRunways: availableRunways.append(runway) #if 1 < len(availableRunways): # TODO filter based on type # TODO filter based on airline # ensure that at least one runway is available # fallback to check if we have available runways if 0 == len(availableRunways): runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0] return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useITA, earliestArrivalTime) # start with the beginning selectedRunway = None lostTime = None eta = None # get the runway with the earliest ETA for runway in availableRunways: candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useITA, earliestArrivalTime) if None == eta or eta > candidate: selectedRunway = runway.Runway lostTime = delta eta = candidate return selectedRunway, eta, lostTime