1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- #!/usr/bin/env python
- from datetime import datetime, timedelta
- from aman.sys.aco.Configuration import Configuration
- from aman.sys.aco.Constraints import SpacingConstraints
- from aman.types.Inbound import Inbound
- class RunwayManager:
- def __init__(self, configuration : Configuration):
- self.Spacings = SpacingConstraints()
- self.Configuration = configuration
- # initialize the tracker which inbound arrives at which runway
- self.RunwayInbounds = {}
- if None != configuration.PreceedingInbounds:
- for runway in configuration.PreceedingInbounds:
- self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway]
- for runway in configuration.RunwayConstraints.ActiveArrivalRunways:
- if not runway.Runway.Name in self.RunwayInbounds:
- self.RunwayInbounds[runway.Runway.Name] = None
- def calculateEarliestArrivalTime(self, runway : str, inbound : Inbound, useETA : bool, earliestArrivalTime : datetime):
- constrainedETA = None
- if None != self.RunwayInbounds[runway]:
- # get the WTC based ETA
- if None == self.RunwayInbounds[runway].WTC or None == inbound.WTC:
- spacingWTC = 3
- else:
- spacingWTC = self.Spacings[self.RunwayInbounds[runway].WTC][inbound.WTC]
- # get the runway time spacing
- spacingRunway = self.Configuration.RunwayConstraints.findRunway(runway).Spacing
- constrainedETA = self.RunwayInbounds[runway].PlannedArrivalTime + timedelta(minutes = max(spacingWTC, spacingRunway) / (inbound.PerformanceData.SpeedApproach / 60))
- # calculate the arrival times for the dependent inbounds
- for dependentRunway in self.Configuration.RunwayConstraints.findDependentRunways(runway):
- if None != self.RunwayInbounds[dependentRunway.Runway.Name]:
- # TODO staggered spacing variabel
- candidate = self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime + timedelta(minutes = 3 / (inbound.PerformanceData.SpeedApproach / 60))
- if None == constrainedETA or candidate > constrainedETA:
- constrainedETA = candidate
- # get the arrival time on the runway of the inbound
- if True == useETA:
- arrivalTime = inbound.ArrivalCandidates[runway].EarliestArrivalTime
- else:
- arrivalTime = inbound.ArrivalCandidates[runway].InitialArrivalTime
- if None == constrainedETA:
- return max(arrivalTime, earliestArrivalTime), timedelta(seconds = 0)
- else:
- eta = max(constrainedETA, earliestArrivalTime)
- if eta < arrivalTime:
- return arrivalTime, arrivalTime - eta
- else:
- return eta, timedelta(seconds = 0)
- def selectArrivalRunway(self, inbound : Inbound, useETA : bool, earliestArrivalTime : datetime):
- availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways
- #if 1 < len(availableRunways):
- # TODO filter based on type
- # TODO filter based on airline
- # ensure that at least one runway is available
- # fallback to check if we have available runways
- if 0 == len(availableRunways):
- runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
- return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useETA, earliestArrivalTime)
- # start with the beginning
- selectedRunway = None
- lostTime = None
- eta = None
- # get the runway with the earliest ETA
- for runway in availableRunways:
- candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useETA, earliestArrivalTime)
- if None == eta or eta > candidate:
- selectedRunway = runway.Runway
- lostTime = delta
- eta = candidate
- return selectedRunway, eta, lostTime
|