#!/usr/bin/env python

from datetime import timedelta

from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Constraints import SpacingConstraints
from aman.types.Inbound import Inbound


class RunwayManager:
    """Derive earliest arrival times and runway choices for inbounds.

    The manager keeps ``RunwayInbounds``, a dict keyed by runway name whose
    value is the inbound preceding on that runway, or ``None`` if there is
    none. The entries are seeded from ``configuration.PreceedingInbounds``
    and completed with every active arrival runway.
    """

    # wake turbulence categories accepted by validateWtc
    _VALID_WTCS = ('L', 'M', 'H', 'J')
    # spacing used if one of the two wake turbulence categories is unknown
    # NOTE(review): presumably nautical miles - confirm against SpacingConstraints
    _FALLBACK_WTC_SPACING = 3
    # spacing applied behind an inbound on the dependent runway
    # NOTE(review): presumably nautical miles - confirm
    _STAGGERED_SPACING = 3

    def __init__(self, configuration: Configuration):
        """Create the manager and initialize the per-runway inbound tracker.

        Args:
            configuration: Provides ``PreceedingInbounds`` (may be ``None``)
                and ``RunwayConstraints`` with the active arrival runways.
        """
        self.Spacings = SpacingConstraints()
        self.Configuration = configuration

        # initialize the tracker which inbound arrives at which runway
        self.RunwayInbounds = {}
        if configuration.PreceedingInbounds is not None:
            for runway in configuration.PreceedingInbounds:
                self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway]

        # every active arrival runway gets an entry, even without a predecessor
        for runway in configuration.RunwayConstraints.ActiveArrivalRunways:
            self.RunwayInbounds.setdefault(runway.Runway.Name, None)

    @staticmethod
    def validateWtc(inbound: Inbound):
        """Return the upper-cased wake turbulence category of the inbound.

        Args:
            inbound: Inbound whose ``Report.aircraft.wtc`` is evaluated.

        Returns:
            One of ``'L'``, ``'M'``, ``'H'`` or ``'J'``, or ``None`` if the
            reported category is none of these.
        """
        wtc = inbound.Report.aircraft.wtc.upper()
        return wtc if wtc in RunwayManager._VALID_WTCS else None

    @staticmethod
    def _spacingToDelay(spacing, inbound: Inbound):
        """Convert a spacing into the time the inbound needs to cover it.

        The approach speed is divided by 60 before the spacing is divided by
        it and the result is used as minutes.
        NOTE(review): this is consistent with a spacing in nautical miles and
        a speed in knots - confirm the units of PerformanceData.SpeedApproach.
        """
        speedPerMinute = inbound.PerformanceData.SpeedApproach / 60
        return timedelta(minutes=spacing / speedPerMinute)

    def calculateEarliestArrivalTime(self, runway: str, inbound: Inbound):
        """Calculate the earliest arrival time of the inbound on the runway.

        Three constraints are evaluated against the tracked inbounds: the
        wake turbulence spacing and the runway spacing behind the preceding
        inbound on the same runway, and the staggered spacing behind the
        inbound on the dependent runway. The latest of these times is used,
        but never a time before the inbound's own earliest arrival time.

        Args:
            runway: Name of the runway, used as key of ``RunwayInbounds`` and
                ``inbound.ArrivalCandidates``.
            inbound: Inbound that needs to be planned.

        Returns:
            The inbound's own ``EarliestArrivalTime`` for this runway if no
            inbound precedes on it, otherwise the later of that time and the
            latest constraint time.
        """
        preceedingInbound = self.RunwayInbounds[runway]
        if preceedingInbound is None:
            return inbound.ArrivalCandidates[runway].EarliestArrivalTime

        # get the WTC constrained ETA
        wtcPre = RunwayManager.validateWtc(preceedingInbound)
        wtcThis = RunwayManager.validateWtc(inbound)
        if wtcPre is None or wtcThis is None:
            spacing = RunwayManager._FALLBACK_WTC_SPACING
        else:
            spacing = self.Spacings[wtcPre][wtcThis]
        wtcEarliestArrivalTime = (
            preceedingInbound.PlannedArrivalTime
            + RunwayManager._spacingToDelay(spacing, inbound)
        )

        # get the staggered time spacing
        # default: no neighbor, no dependent runway (IPA) or landing on the same runway
        staggeredEarliestArrivalTime = wtcEarliestArrivalTime
        dependentRunway = self.Configuration.RunwayConstraints.findDependentRunway(runway)
        if dependentRunway is not None:
            dependentInbound = self.RunwayInbounds[dependentRunway.Runway.Name]
            # check if the one on the same runway lands before the one on the parallel runway
            if (
                dependentInbound is not None
                and preceedingInbound.PlannedArrivalTime < dependentInbound.PlannedArrivalTime
            ):
                staggeredEarliestArrivalTime = (
                    dependentInbound.PlannedArrivalTime
                    + RunwayManager._spacingToDelay(RunwayManager._STAGGERED_SPACING, inbound)
                )

        # get the runway time spacing
        spacing = self.Configuration.RunwayConstraints.findRunway(runway).Spacing
        runwayEarliestArrivalTime = (
            preceedingInbound.PlannedArrivalTime
            + RunwayManager._spacingToDelay(spacing, inbound)
        )

        # the candidate with the latest ETA is used -> ensure all safety and procedure constraints
        candidate = max(
            wtcEarliestArrivalTime,
            staggeredEarliestArrivalTime,
            runwayEarliestArrivalTime,
        )

        # never plan the inbound before its own earliest possible arrival time
        return max(candidate, inbound.ArrivalCandidates[runway].EarliestArrivalTime)

    def selectArrivalRunway(self, inbound: Inbound):
        """Select the active arrival runway with the earliest ETA for the inbound.

        Args:
            inbound: Inbound that needs to be planned.

        Returns:
            A tuple of the selected entry's ``Runway`` attribute and the
            earliest arrival time calculated for it. The first runway with
            the earliest time wins.

        Raises:
            Whatever indexing ``ActiveArrivalRunways[0]`` raises if no arrival
            runway is active (``IndexError`` for a list).
        """
        activeRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways
        availableRunways = list(activeRunways)

        # TODO filter based on type
        # TODO filter based on airline

        # ensure that at least one runway is available
        # fallback to the first active runway if all runways are filtered out
        if not availableRunways:
            runway = activeRunways[0]
            return runway.Runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound)

        # start with the beginning
        selectedRunway = None
        eta = None

        # get the runway with the earliest ETA
        for runway in availableRunways:
            candidate = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound)
            if eta is None or eta > candidate:
                selectedRunway = runway.Runway
                eta = candidate

        return selectedRunway, eta