#!/usr/bin/env python import copy from datetime import datetime, timedelta from aman.config.RunwaySequencing import RunwayAssignmentType from aman.sys.aco.Configuration import Configuration from aman.sys.aco.Constraints import SpacingConstraints from aman.sys.aco.Node import Node class RunwayManager: def __init__(self, configuration : Configuration): self.Spacings = SpacingConstraints() self.Configuration = configuration self.RunwayInbounds = copy.deepcopy(configuration.PreceedingRunwayInbounds) self.IafInbounds = copy.deepcopy(configuration.PreceedingIafInbounds) def calculateEarliestArrivalTime(self, runway : str, node : Node, earliestArrivalTime : datetime): constrainedETA = None if None != self.RunwayInbounds[runway]: # get the WTC based ETA if None == self.RunwayInbounds[runway].Inbound.WTC or None == node.Inbound.WTC: spacingWTC = 3 else: if self.RunwayInbounds[runway].Inbound.WTC not in self.Spacings.WtcSpacing: spacingWTC = 3 elif node.Inbound.WTC not in self.Spacings.WtcSpacing[self.RunwayInbounds[runway].Inbound.WTC]: spacingWTC = self.Spacings.WtcSpacing[self.RunwayInbounds[runway].Inbound.WTC]['L'] else: spacingWTC = self.Spacings.WtcSpacing[self.RunwayInbounds[runway].Inbound.WTC][node.Inbound.WTC] # get the runway time spacing spacingRunway = self.Configuration.RunwayConstraints.findRunway(runway).Spacing constrainedETA = self.RunwayInbounds[runway].Inbound.PlannedArrivalTime + timedelta(minutes = max(spacingWTC, spacingRunway) / (node.Inbound.PerformanceData.SpeedApproach / 60)) # calculate the arrival times for the dependent inbounds for dependentRunway in self.Configuration.RunwayConstraints.findDependentRunways(runway): if None != self.RunwayInbounds[dependentRunway.Runway.Name]: candidate = self.RunwayInbounds[dependentRunway.Runway.Name].Inbound.PlannedArrivalTime + timedelta(minutes = 3 / (node.Inbound.PerformanceData.SpeedApproach / 60)) if None == constrainedETA or candidate > constrainedETA: constrainedETA = candidate if None == constrainedETA: eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, earliestArrivalTime) else: eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, max(constrainedETA, earliestArrivalTime)) return eta, eta - node.ArrivalCandidates[runway].InitialArrivalTime def selectShallShouldMayArrivalRunway(self, node : Node, runways, earliestArrivalTime : datetime): candidate = None delay = None for runway in runways: eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if None == delay: delay = eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime candidate = runway elif delay > (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime): delay = eta- node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime candidate = runway return candidate def executeShallShouldMayAssignment(self, node : Node, earliestArrivalTime : datetime): shallRunways = [] shouldRunways = [] mayRunways = [] expectedRunway = None for runway in self.Configuration.RunwayConstraints.ActiveArrivalRunways: # test the shall assignments if RunwayAssignmentType.AircraftType in runway.ShallAssignments: if node.Inbound.Report.aircraft.type in runway.ShallAssignments[RunwayAssignmentType.AircraftType]: shallRunways.append(runway) expectedRunway = runway.Runway.Name if RunwayAssignmentType.GateAssignment in runway.ShallAssignments: if node.Inbound.Report.plannedGate in runway.ShallAssignments[RunwayAssignmentType.GateAssignment]: shallRunways.append(runway) expectedRunway = runway.Runway.Name # test the should assignments if RunwayAssignmentType.AircraftType in runway.ShouldAssignments: if node.Inbound.Report.aircraft.type in runway.ShouldAssignments[RunwayAssignmentType.AircraftType]: shouldRunways.append(runway) expectedRunway = runway.Runway.Name if RunwayAssignmentType.GateAssignment in runway.ShouldAssignments: if node.Inbound.Report.plannedGate in runway.ShouldAssignments[RunwayAssignmentType.GateAssignment]: shouldRunways.append(runway) expectedRunway = runway.Runway.Name # test the may assignments if RunwayAssignmentType.AircraftType in runway.MayAssignments: if node.Inbound.Report.aircraft.type in runway.MayAssignments[RunwayAssignmentType.AircraftType]: eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay: mayRunways.append(runway) expectedRunway = runway.Runway.Name if RunwayAssignmentType.GateAssignment in runway.MayAssignments: if node.Inbound.Report.plannedGate in runway.MayAssignments[RunwayAssignmentType.GateAssignment]: eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay: mayRunways.append(runway) expectedRunway = runway.Runway.Name runway = self.selectShallShouldMayArrivalRunway(node, shallRunways, earliestArrivalTime) if None != runway: return 'shall', expectedRunway, [ runway ] runway = self.selectShallShouldMayArrivalRunway(node, shouldRunways, earliestArrivalTime) if None != runway: return 'should', expectedRunway, [ runway ] runway = self.selectShallShouldMayArrivalRunway(node, mayRunways, earliestArrivalTime) if None != runway: return 'may', expectedRunway, [ runway ] return 'other', None, self.Configuration.RunwayConstraints.ActiveArrivalRunways def selectArrivalRunway(self, node : Node, earliestArrivalTime : datetime): availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways if 0 == len(availableRunways): return None, None, None, None, None expectedRunway = None if True == self.Configuration.RunwayConstraints.UseShallShouldMay and None == node.Inbound.RequestedRunway: type, expectedRunway, availableRunways = self.executeShallShouldMayAssignment(node, earliestArrivalTime) elif None != node.Inbound.RequestedRunway: for runway in availableRunways: if node.Inbound.RequestedRunway == runway.Runway.Name: availableRunways = [ runway ] type = 'other' break if 0 == len(availableRunways): runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0] eta, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) return 'other', None, runway, eta, delta # start with the beginning selectedRunway = None lostTime = None eta = None # get the runway with the earliest ETA for runway in availableRunways: candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if None == eta or eta > candidate: selectedRunway = runway.Runway lostTime = delta eta = candidate # find the corresponding IAF iaf = node.ArrivalCandidates[selectedRunway.Name].ArrivalRoute[0].Waypoint.Name if iaf in self.IafInbounds: delta = 100000.0 targetLevel = None # find the planned level for level in self.IafInbounds[iaf]: difference = abs(level - node.ArrivalCandidates[selectedRunway.Name].ArrivalRoute[0].Altitude) if difference < delta: delta = difference targetLevel = level if targetLevel in self.IafInbounds[iaf]: # check if we have to lose time to ensure the IAF spacing # the function assumes that model allows only TTG during flight to IAF if None != self.IafInbounds[iaf][targetLevel]: if None != self.IafInbounds[iaf][targetLevel].Inbound.PlannedArrivalRoute: # ETA at IAF of preceeding traffic plannedDelta = self.IafInbounds[iaf][targetLevel].Inbound.PlannedArrivalTime - self.IafInbounds[iaf][targetLevel].Inbound.EnrouteArrivalTime iafETAPreceeding = self.IafInbounds[iaf][targetLevel].Inbound.PlannedArrivalRoute[0].ETA + plannedDelta # ETA at IAF of current inbound plannedDelta = eta - node.Inbound.EnrouteArrivalTime iafETACurrent = node.ArrivalCandidates[selectedRunway.Name].ArrivalRoute[0].ETA # required time delte to ensure IAF spacing timeSpacing = timedelta(hours = self.Configuration.AirportConfiguration.IafSpacing / node.ArrivalCandidates[selectedRunway.Name].ArrivalRoute[0].GroundSpeed) # we are too close to preceeding traffic currentTimeSpacing = iafETACurrent - iafETAPreceeding if timeSpacing > currentTimeSpacing: eta = eta + (timeSpacing - currentTimeSpacing) lostTime += (timeSpacing - currentTimeSpacing) return type, expectedRunway, selectedRunway, eta, lostTime def registerNode(self, node : Node, runway : str): self.RunwayInbounds[runway] = node # find the corresponding IAF iaf = node.ArrivalCandidates[runway].ArrivalRoute[0].Waypoint.Name if iaf in self.IafInbounds: delta = 100000.0 targetLevel = None # find the planned level for level in self.IafInbounds[iaf]: difference = abs(level - node.ArrivalCandidates[runway].ArrivalRoute[0].Altitude) if difference < delta: delta = difference targetLevel = level if targetLevel in self.IafInbounds[iaf]: self.IafInbounds[iaf][targetLevel] = node