123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- #!/usr/bin/env python
- from datetime import timedelta
- from aman.sys.aco.Colony import Colony
- from aman.sys.aco.Constraints import SpacingConstraints
- from aman.types.Inbound import Inbound
- class CostFunction:
- def __init__(self, colony : Colony):
- self.Spacings = SpacingConstraints()
- self.Colony = colony
- # initialize the tracker which inbound arrives at which runway
- self.RunwayInbounds = {}
- for runway in self.Colony.Configuration.PreceedingInbounds:
- self.RunwayInbounds[runway] = self.Colony.Configuration.PreceedingInbounds[runway]
- for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
- if not runway in self.RunwayInbounds:
- self.RunwayInbounds[runway] = None
- return
- def validateWtc(inbound : Inbound):
- wtc = inbound.Report.aircraft.wtc.upper()
- if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None:
- return wtc
- else:
- return None
- def calculateEta(self, runway : str, inbound : Inbound):
- if None == self.RunwayInbounds[runway]:
- return inbound.EstimatedArrivalTime
- preceedingInbound = self.RunwayInbounds[runway]
- # get the WTC constrained ETA
- wtcPre = CostFunction.validateWtc(preceedingInbound)
- wtcThis = CostFunction.validateWtc(inbound)
- if None == wtcPre or None == wtcThis:
- spacing = 3
- else:
- spacing = self.Spacings[wtcPre][wtcThis]
- delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
- wtcETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
- # get the staggered time spacing
- dependentRunway = self.Colony.Configuration.RunwayConstraints.findDependentRunway(runway)
- if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.name]:
- if preceedingInbound.EstimatedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime:
- delay = timedelta(seconds = round(3 / inbound.PerformanceData.speedApproach + 0.5))
- staggeredETA = self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime + delay
- else:
- staggeredETA = wtcETA
- else:
- staggeredETA = wtcETA
- # get the runway time spacing
- spacing = self.Colony.Configuration.RunwayConstraints.findRunway(runway).Spacing
- delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
- runwayETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
- # get the biggest ETA to define the maximum but ensure that we are not earlier than the ITA
- # TODO model the TTG-concept depending on the distance to the IAF
- candidate = max(max(wtcETA, staggeredETA), runwayETA)
- if candidate < inbound.InitialArrivalTime:
- return inbound.InitialArrivalTime
- else:
- return candidate
- def selectArrivalRunway(self, inbound : Inbound):
- availableRunways = []
- for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
- availableRunways.append(runway)
- #if 1 < len(availableRunways):
- # TODO filter based on type
- # TODO filter based on airline
- # ensure that at least one runway is available
- # fallback to check if we have available runways
- if 0 == len(availableRunways):
- return self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
- # start with the beginning
- selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(availableRunways[0])
- eta = self.calculateEta(availableRunways[0], inbound)
- # get the runway with the earliest ETA
- for runway in availableRunways:
- candidate = self.calculateEta(runway, inbound)
- if eta > candidate:
- selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(runway)
- eta = candidate
- return selectedRunway
|