1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- #!/usr/bin/env python
- from datetime import datetime, timedelta
- from aman.sys.aco.Configuration import Configuration
- from aman.sys.aco.Constraints import SpacingConstraints
- from aman.types.Inbound import Inbound
class RunwayManager:
    """Track the last planned inbound per runway and derive arrival times.

    ``RunwayInbounds`` maps a runway name to the inbound that was planned
    last on that runway (or ``None`` if there is none). Based on that
    tracker the manager computes the earliest arrival time of a new inbound
    on a runway and selects the runway with the earliest arrival time.
    """

    # Spacing used when a wake turbulence category is unknown and towards
    # inbounds on dependent runways.
    # NOTE(review): presumably nautical miles, i.e. the same unit as the
    # entries of SpacingConstraints and the runway spacing -- confirm.
    _DefaultSpacing = 3

    def __init__(self, configuration: Configuration):
        """Create the spacing table and initialize the per-runway tracker.

        Args:
            configuration: Provides ``PreceedingInbounds`` (mapping of runway
                name to inbound, may be ``None``) and
                ``RunwayConstraints.ActiveArrivalRunways``.
        """
        self.Spacings = SpacingConstraints()
        self.Configuration = configuration

        # initialize the tracker which inbound arrives at which runway
        self.RunwayInbounds = {}
        if configuration.PreceedingInbounds is not None:
            for runway in configuration.PreceedingInbounds:
                self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway]

        # every active arrival runway needs an entry, even without a
        # preceding inbound; existing entries must not be overwritten
        for runway in configuration.RunwayConstraints.ActiveArrivalRunways:
            self.RunwayInbounds.setdefault(runway.Runway.Name, None)

    @staticmethod
    def validateWtc(inbound: Inbound):
        """Return the upper-cased WTC of the inbound or ``None`` if unknown.

        Only 'L', 'M', 'H' and 'J' are accepted as valid categories.
        """
        wtc = inbound.Report.aircraft.wtc.upper()
        return wtc if wtc in ('L', 'M', 'H', 'J') else None

    @staticmethod
    def _spacingDelay(spacing, inbound: Inbound):
        """Convert a spacing into a time delay using the approach speed."""
        # SpeedApproach / 60 is the speed per minute, so the quotient is
        # the number of minutes needed to cover the spacing
        return timedelta(minutes=spacing / (inbound.PerformanceData.SpeedApproach / 60))

    def calculateEarliestArrivalTime(self, runway: str, inbound: Inbound, useITA: bool, earliestArrivalTime: datetime):
        """Calculate the earliest arrival time of an inbound on a runway.

        Args:
            runway: Name of the runway to evaluate.
            inbound: The inbound that needs to be planned.
            useITA: Selects ``EarliestArrivalTime`` of the inbound's arrival
                candidate if true, ``InitialArrivalTime`` otherwise.
            earliestArrivalTime: Currently not evaluated; kept for interface
                compatibility.

        Returns:
            A tuple ``(arrival time, time delta)``. If the inbound's own
            arrival time is later than all spacing constraints, the delta is
            the gap between the constraint-based time and the own arrival
            time; otherwise the delta is zero.
        """
        constraintTimes = []

        preceding = self.RunwayInbounds[runway]
        if preceding is not None:
            # get the WTC based ETA
            wtcPre = RunwayManager.validateWtc(preceding)
            wtcThis = RunwayManager.validateWtc(inbound)
            if wtcPre is None or wtcThis is None:
                spacing = self._DefaultSpacing
            else:
                spacing = self.Spacings[wtcPre][wtcThis]
            constraintTimes.append(preceding.PlannedArrivalTime + self._spacingDelay(spacing, inbound))

            # get the runway time spacing
            spacing = self.Configuration.RunwayConstraints.findRunway(runway).Spacing
            constraintTimes.append(preceding.PlannedArrivalTime + self._spacingDelay(spacing, inbound))

        # calculate the arrival times for the dependent inbounds
        for dependentRunway in self.Configuration.RunwayConstraints.findDependentRunways(runway):
            dependentInbound = self.RunwayInbounds[dependentRunway.Runway.Name]
            if dependentInbound is not None:
                delay = self._spacingDelay(self._DefaultSpacing, inbound)
                constraintTimes.append(dependentInbound.PlannedArrivalTime + delay)

        # get the arrival time on the runway of the inbound
        if useITA:
            arrivalTime = inbound.ArrivalCandidates[runway].EarliestArrivalTime
        else:
            arrivalTime = inbound.ArrivalCandidates[runway].InitialArrivalTime

        # no constraint at all: the inbound can arrive at its own time
        if not constraintTimes:
            return arrivalTime, timedelta(seconds=0)

        # the most restrictive constraint defines the ETA
        eta = max(constraintTimes)
        if eta < arrivalTime:
            return arrivalTime, arrivalTime - eta
        return eta, timedelta(seconds=0)

    def selectArrivalRunway(self, inbound: Inbound, useITA: bool, earliestArrivalTime: datetime):
        """Select the active arrival runway with the earliest arrival time.

        Args:
            inbound: The inbound that needs to be planned.
            useITA: Forwarded to ``calculateEarliestArrivalTime``.
            earliestArrivalTime: Forwarded to ``calculateEarliestArrivalTime``.

        Returns:
            A tuple ``(runway, eta, lost time)`` where ``runway`` is the
            ``Runway`` attribute of the selected runway constraint. On equal
            arrival times the runway listed first wins.

        Raises:
            IndexError: If no active arrival runway is configured.
        """
        availableRunways = list(self.Configuration.RunwayConstraints.ActiveArrivalRunways)

        # TODO filter based on type
        # TODO filter based on airline

        # ensure that at least one runway is available: fall back to the
        # first active runway and evaluate it like every other candidate so
        # that the return value has the same shape on all paths
        if not availableRunways:
            availableRunways = [self.Configuration.RunwayConstraints.ActiveArrivalRunways[0]]

        # start with the beginning
        selectedRunway = None
        lostTime = None
        eta = None

        # get the runway with the earliest ETA
        for runway in availableRunways:
            candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useITA, earliestArrivalTime)
            if eta is None or eta > candidate:
                selectedRunway = runway.Runway
                lostTime = delta
                eta = candidate

        return selectedRunway, eta, lostTime
|