adapt the code to split up predictions form the inbounds

This commit is contained in:
Sven Czarnian
2021-11-13 22:55:04 +01:00
parent eba9e2deab
commit 8b34f622a3
9 changed files with 280 additions and 267 deletions

View File

@@ -21,7 +21,7 @@ class RecedingHorizonControl:
def insertInWindow(self, inbound : Inbound, usePTA : bool):
if False == usePTA:
referenceTime = inbound.EarliestArrivalTime
referenceTime = inbound.InitialArrivalTime
else:
referenceTime = inbound.PlannedArrivalTime
@@ -32,9 +32,9 @@ class RecedingHorizonControl:
# find the correct window
if window.StartTime <= referenceTime and window.EndTime > referenceTime:
self.AssignedWindow[inbound.Callsign] = [ i, 0 ]
if False == usePTA:
inbound.PlannedArrivalTime = inbound.EarliestArrivalTime
inbound.FixedSequence = i < self.FreezedIndex
if True == inbound.FixedSequence and None == inbound.PlannedArrivalTime:
inbound.PlannedArrivalTime = inbound.InitialArrivalTime
window.insert(inbound)
inserted = True
break
@@ -51,16 +51,16 @@ class RecedingHorizonControl:
self.Windows.append(RecedingHorizonWindow(lastWindowTime, lastWindowTime + timestep))
if self.Windows[-1].EndTime > referenceTime:
window = self.Windows[-1]
window.insert(inbound)
self.AssignedWindow[inbound.Callsign] = [ len(self.Windows) - 1, 0 ]
window.insert(inbound)
if False == usePTA:
inbound.PlannedArrivalTime = max(window.StartTime, inbound.EarliestArrivalTime)
inbound.FixedSequence = len(self.Windows) < self.FreezedIndex
if True == inbound.FixedSequence and None == inbound.PlannedArrivalTime:
inbound.PlannedArrivalTime = inbound.InitialArrivalTime
break
lastWindowTime = self.Windows[-1].EndTime
window.Inbounds.sort(key = lambda x: x.PlannedArrivalTime)
window.Inbounds.sort(key = lambda x: x.PlannedArrivalTime if None != x.PlannedArrivalTime else x.InitialArrivalTime)
def updateReport(self, inbound : Inbound):
# check if we need to update
@@ -73,22 +73,16 @@ class RecedingHorizonControl:
# ingore fixed updates
if True == plannedInbound.FixedSequence or index < self.FreezedIndex:
return
plannedInbound.ArrivalCandidates = inbound.ArrivalCandidates
plannedInbound.WTC = inbound.WTC
# check if we need to update the inbound
if plannedInbound.PlannedArrivalTime < inbound.EarliestArrivalTime:
if inbound.EarliestArrivalTime < self.Windows[index].StartTime or inbound.EarliestArrivalTime >= self.Windows[index].EndTime:
if None == plannedInbound.PlannedStar:
if inbound.InitialArrivalTime < self.Windows[index].StartTime or inbound.InitialArrivalTime >= self.Windows[index].EndTime:
self.Windows[index].remove(inbound.Callsign)
self.AssignedWindow.pop(inbound.Callsign)
self.updateReport(inbound)
else:
plannedInbound.PlannedStar = inbound.PlannedStar
plannedInbound.PlannedRunway = inbound.PlannedRunway
plannedInbound.InitialArrivalTime = inbound.InitialArrivalTime
if plannedInbound.PlannedArrivalTime == plannedInbound.EarliestArrivalTime:
plannedInbound.PlannedArrivalTime = inbound.EarliestArrivalTime
plannedInbound.EarliestArrivalTime = inbound.EarliestArrivalTime
else:
self.insertInWindow(inbound, False)
@@ -100,7 +94,7 @@ class RecedingHorizonControl:
self.insertInWindow(inbound, True)
else:
inbound.FixedSequence = index < self.FreezedIndex
self.Windows[index].Inbounds.sort(key = lambda x: x.PlannedArrivalTime)
self.Windows[index].Inbounds.sort(key = lambda x: x.PlannedArrivalTime if None != x.PlannedArrivalTime else x.InitialArrivalTime)
def lastFixedInboundOnRunway(self, runway : str):
# no inbounds available

View File

@@ -10,6 +10,7 @@ from aman.com.WebUI import WebUI
from aman.config.Airport import Airport
from aman.sys.aco.Colony import Colony
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Node import Node
from aman.sys.WeatherModel import WeatherModel
from aman.sys.RecedingHorizonControl import RecedingHorizonControl
from aman.types.Inbound import Inbound
@@ -100,8 +101,9 @@ class Worker(Thread):
report = self.ReportQueue[callsign]
if 0 != report.distanceToIAF and '' != report.initialApproachFix:
inbound = Inbound(report, self.sequencingConfiguration, self.Configuration.GngData, self.PerformanceData, self.WeatherModel)
if None != inbound.PlannedRunway and None != inbound.PlannedStar:
inbound = Inbound(report, self.PerformanceData)
Node(inbound, inbound.ReportTime, self.WeatherModel, self.Configuration.GngData, self.sequencingConfiguration)
if None != inbound.InitialArrivalTime:
self.RecedingHorizonControl.updateReport(inbound)
else:
print('Unable to find all data of ' + report.aircraft.callsign)
@@ -122,13 +124,13 @@ class Worker(Thread):
preceedingInbounds[runway.Runway.Name] = inbound
# configure the ACO run
acoConfig = Configuration(constraints = self.sequencingConfiguration, inbounds = relevantInbounds,
acoConfig = Configuration(constraints = self.sequencingConfiguration, nav = self.Configuration.GngData,
earliest = earliestArrivalTime, weather = self.WeatherModel,
preceeding = None if 0 == len(preceedingInbounds) else preceedingInbounds,
ants = 5 * len(relevantInbounds), generations = 5 * len(relevantInbounds))
# perform the ACO run
aco = Colony(acoConfig)
aco = Colony(relevantInbounds, acoConfig)
aco.optimize()
if None != aco.Result:
for inbound in aco.Result:

View File

@@ -9,26 +9,27 @@ import itertools
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
from aman.sys.aco.Node import Node
# This class implements a single ant of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Ant:
def __init__(self, pheromoneTable : np.array, configuration : Configuration):
def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes):
self.Configuration = configuration
self.Nodes = nodes
self.RunwayManager = RunwayManager(self.Configuration)
self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
self.InboundScore = np.zeros([ len(self.Configuration.Inbounds), 1 ])
self.InboundSelected = [ False ] * len(self.Nodes)
self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
self.PheromoneMatrix = pheromoneTable
self.SequenceDelay = timedelta(seconds = 0)
self.Sequence = None
def qualifyDelay(delay, inbound, runway):
def qualifyDelay(delay, node, runway):
if 0.0 > delay.total_seconds():
delay = timedelta(seconds = 0)
# calculate the heuristic scaling to punish increased delays for single inbounds
scaledDelay = delay.total_seconds() / inbound.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
scaledDelay = delay.total_seconds() / node.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01)
# Implements function (5), but adapted to the following logic:
@@ -37,8 +38,8 @@ class Ant:
# - Calculates a ratio between the inbound delay and the unused runway time
# - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time
def heuristicInformation(self, preceeding : int, current : int):
rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Nodes[current], True, self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Nodes[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
if 0.0 > inboundDelay.total_seconds():
inboundDelay = timedelta(seconds = 0)
@@ -49,7 +50,7 @@ class Ant:
fraction /= 60.0
# calculate the heuristic scaling to punish increased delays for single inbounds
weight = Ant.qualifyDelay(inboundDelay, self.Configuration.Inbounds[current], rwy)
weight = Ant.qualifyDelay(inboundDelay, self.Nodes[current], rwy)
return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
@@ -86,18 +87,19 @@ class Ant:
return None
def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime):
def associateInbound(self, node : Node, earliestArrivalTime : datetime):
# prepare the statistics
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, True, self.Configuration.EarliestArrivalTime)
eta = max(earliestArrivalTime, eta)
inbound.PlannedRunway = rwy
inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
inbound.PlannedArrivalTime = eta
inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
self.RunwayManager.RunwayInbounds[rwy.Name] = inbound
node.Inbound.PlannedRunway = rwy
node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
node.Inbound.PlannedArrivalTime = eta
node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
self.RunwayManager.RunwayInbounds[rwy.Name] = node
delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
if 0.0 < delay.total_seconds():
return delay, rwy
else:
@@ -108,8 +110,8 @@ class Ant:
# select the first inbound
self.InboundSelected[first] = True
delay, rwy = self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
self.InboundScore[0] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[first], rwy)
delay, rwy = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
self.InboundScore[0] = Ant.qualifyDelay(delay, self.Nodes[first], rwy)
self.Sequence.append(first)
self.SequenceDelay += delay
@@ -119,9 +121,9 @@ class Ant:
break
self.InboundSelected[index] = True
delay, rwy = self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
delay, rwy = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
self.SequenceDelay += delay
self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[index], rwy)
self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Nodes[index], rwy)
self.Sequence.append(index)
# update the local pheromone
@@ -130,7 +132,7 @@ class Ant:
self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
# validate that nothing went wrong
if len(self.Sequence) != len(self.Configuration.Inbounds):
if len(self.Sequence) != len(self.Nodes):
self.SequenceDelay = None
self.SequenceScore = None
self.Sequence = None

View File

@@ -1,48 +1,57 @@
#!/usr/bin/env python
from datetime import datetime as dt
from datetime import datetime, timedelta
import numpy as np
import pytz
import random
import sys
import pytz
import time
from aman.sys.aco.Ant import Ant
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Node import Node
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony:
def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime, useITA : bool):
rwy, eta, _ = rwyManager.selectArrivalRunway(node, useITA, earliestArrivalTime)
eta = max(earliestArrivalTime, eta)
inbound.PlannedRunway = rwy
inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
inbound.PlannedArrivalRoute = inbound.ArrivalCandidates[rwy.Name].ArrivalRoute
inbound.PlannedArrivalTime = eta
inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
inbound.PlannedTrackmiles = inbound.ArrivalCandidates[rwy.Name].Trackmiles
rwyManager.RunwayInbounds[rwy.Name] = inbound
node.Inbound.PlannedRunway = rwy
node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
node.Inbound.PlannedArrivalTime = eta
node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
rwyManager.RunwayInbounds[rwy.Name] = node
def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
overallDelay = timedelta(seconds = 0)
# assume that the inbounds are sorted in FCFS order
for inbound in inbounds:
Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
# assume that the nodes are sorted in FCFS order
for node in nodes:
Colony.associateInbound(rwyManager, node, earliestArrivalTime, False)
overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
return overallDelay
def __init__(self, configuration : Configuration):
def __init__(self, inbounds, configuration : Configuration):
self.Configuration = configuration
self.ResultDelay = None
self.Result = None
self.Nodes = []
# create the new planning instances
currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
for inbound in inbounds:
self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.NavData, self.Configuration.RunwayConstraints))
rwyManager = RunwayManager(self.Configuration)
delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
self.FcfsDelay = delay
# run the optimization in every cycle to ensure optimal spacings based on TTG
@@ -50,8 +59,8 @@ class Colony:
delay = timedelta(seconds = 1.0)
# initial value for the optimization
self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
def optimize(self):
# FCFS is the best solution
@@ -64,12 +73,12 @@ class Colony:
# run the optimization loops
for _ in range(0, self.Configuration.ExplorationRuns):
# select the first inbound
index = random.randint(1, len(self.Configuration.Inbounds)) - 1
index = random.randint(1, len(self.Nodes)) - 1
candidates = []
for _ in range(0, self.Configuration.AntCount):
# let the ant find a solution
ant = Ant(self.PheromoneMatrix, self.Configuration)
ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
ant.findSolution(index)
# fallback to check if findSolution was successful
@@ -111,8 +120,8 @@ class Colony:
# finalize the sequence
rwyManager = RunwayManager(self.Configuration)
for i in range(0, len(bestSequence[1])):
self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)
self.Result.append(self.Nodes[bestSequence[1][i]].Inbound)
Colony.associateInbound(rwyManager, self.Nodes[bestSequence[1][i]], self.Configuration.EarliestArrivalTime, True)
# the idea behind the TTL/TTG per waypoint is that the TTL and TTG needs to be achieved in the
# first 2/3 of the estimated trackmiles and assign it with a linear function to the waypoints

View File

@@ -9,9 +9,9 @@ class Configuration:
# the AMAN specific information
self.RunwayConstraints = kwargs.get('constraints', None)
self.PreceedingInbounds = kwargs.get('preceeding', None)
self.Inbounds = kwargs.get('inbounds', None)
self.EarliestArrivalTime = kwargs.get('earliest', None)
self.WeatherModel = kwargs.get('weather', None)
self.NavData = kwargs.get('nav', None)
# the ACO specific information
self.AntCount = kwargs.get('ants', 20)

193
aman/sys/aco/Node.py Normal file
View File

@@ -0,0 +1,193 @@
#!/usr/bin/env python
import math
import sys
from datetime import datetime, timedelta
from aman.config.AirportSequencing import AirportSequencing
from aman.formats.SctEseFormat import SctEseFormat
from aman.sys.WeatherModel import WeatherModel
from aman.types.ArrivalData import ArrivalData
from aman.types.ArrivalRoute import ArrivalRoute
from aman.types.ArrivalWaypoint import ArrivalWaypoint
from aman.types.Runway import Runway
from aman.types.Inbound import Inbound
from aman.types.Waypoint import Waypoint
class Node:
def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat):
for arrivalRunway in navData.ArrivalRoutes:
if arrivalRunway == runway.Name:
stars = navData.ArrivalRoutes[arrivalRunway]
for star in stars:
if 0 != len(star.Route) and iaf == star.Iaf.Name:
return star
return None
def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
# calculate remaining trackmiles
trackmiles = self.PredictedDistanceToIAF
start = star.Route[0]
turnIndices = [ -1, -1 ]
constraints = []
for i in range(0, len(star.Route)):
# identified the base turn
if True == star.Route[i].BaseTurn:
turnIndices[0] = i
# identified the final turn
elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
turnIndices[1] = i
# skip waypoints until the final turn point is found
elif -1 != turnIndices[0] and -1 == turnIndices[1]:
continue
trackmiles += start.haversine(star.Route[i]) * 0.539957
# check if a new constraint is defined
altitude = -1
speed = -1
if None != star.Route[i].Altitude:
altitude = star.Route[i].Altitude
if None != star.Route[i].Speed:
speed = star.Route[i].Speed
if -1 != altitude or -1 != speed:
constraints.append([ trackmiles, altitude, speed ])
start = star.Route[i]
# add the remaining distance from the last waypoint to the runway threshold
trackmiles += start.haversine(runway.Start)
if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
sys.stderr.write('Invalid constraint definition found for ' + star.Name)
sys.exit(-1)
# calculate descend profile
currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0])
currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles)
currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ]
distanceToWaypoint = self.PredictedDistanceToIAF
flightTimeSeconds = 0
nextWaypointIndex = 0
flownDistance = 0.0
arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
while True:
# check if a constraint cleanup is needed and if a speed-update is needed
if 0 != len(constraints) and flownDistance >= constraints[0][0]:
if -1 != constraints[0][2]:
currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance))
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
constraints.pop(0)
# search next altitude constraint
altitudeDistance = 0
nextAltitude = 0
for constraint in constraints:
if -1 != constraint[1]:
altitudeDistance = constraint[0]
nextAltitude = constraint[1]
break
# check if update of altitude and speed is needed on 3° glide
if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
oldGroundspeed = currentPosition[1]
descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
newAltitude = currentPosition[0] - descendRate
if 0 > newAltitude:
newAltitude = 0
currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
else:
distance = currentPosition[1] / 60 / 6
# update the statistics
distanceToWaypoint -= distance
flownDistance += distance
newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
if newIAS < currentIAS:
currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
currentIAS = newIAS
flightTimeSeconds += 10
if flownDistance >= trackmiles:
break
# check if we follow a new waypoint pair
if 0 >= distanceToWaypoint:
lastWaypointIndex = nextWaypointIndex
nextWaypointIndex += 1
arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
arrivalRoute[-1].ETA = self.Inbound.ReportTime + arrivalRoute[-1].FlightTime
arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
arrivalRoute[-1].Altitude = currentPosition[0]
arrivalRoute[-1].IndicatedAirspeed = currentIAS
arrivalRoute[-1].GroundSpeed = currentPosition[1]
# check if a skip from base to final turn waypoints is needed
if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
nextWaypointIndex = turnIndices[1]
# update the statistics
if nextWaypointIndex < len(star.Route):
distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute
def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel,
navData : SctEseFormat, sequencingConfig : AirportSequencing):
self.PredictedDistanceToIAF = inbound.Report.distanceToIAF
self.ArrivalCandidates = {}
self.Inbound = inbound
# predict the distance to IAF
timePrediction = (referenceTime - inbound.ReportTime).total_seconds()
if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways):
# calculate current motion information
course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading)
tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude)
gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s
distance = gs * timePrediction * 0.000539957 # distance back to nm
# calculate the bearing between the current position and the IAF
star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, navData)
if None != star:
bearing = tempWaypoint.bearing(star.Route[0])
else:
bearing = inbound.Report.dynamics.heading
# calculate the distance based on the flown distance and update the predicted distance
self.PredictedDistanceToIAF -= math.cos(math.radians(bearing - course)) * distance
if 0.0 > self.PredictedDistanceToIAF:
self.PredictedDistanceToIAF = 0.0
# calculate the timings for the different arrival runways
for identifier in sequencingConfig.ActiveArrivalRunways:
star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, navData)
if None != star:
flightTime, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
# the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
ratio = min(2.0, max(0.0, self.PredictedDistanceToIAF / (trackmiles - self.PredictedDistanceToIAF)))
possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60))
ita = self.Inbound.ReportTime + flightTime
earliest = ita - ttg
latest = ita + ttl
self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
ttl = ttl, latest = latest, route = arrivalRoute,
trackmiles = trackmiles)
if None == self.Inbound.InitialArrivalTime:
self.Inbound.InitialArrivalTime = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime

View File

@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Constraints import SpacingConstraints
from aman.types.Inbound import Inbound
from aman.sys.aco.Node import Node
class RunwayManager:
def __init__(self, configuration : Configuration):
@@ -20,33 +20,33 @@ class RunwayManager:
if not runway.Runway.Name in self.RunwayInbounds:
self.RunwayInbounds[runway.Runway.Name] = None
def calculateEarliestArrivalTime(self, runway : str, inbound : Inbound, useETA : bool, earliestArrivalTime : datetime):
def calculateEarliestArrivalTime(self, runway : str, node : Node, useETA : bool, earliestArrivalTime : datetime):
constrainedETA = None
if None != self.RunwayInbounds[runway]:
# get the WTC based ETA
if None == self.RunwayInbounds[runway].WTC or None == inbound.WTC:
if None == self.RunwayInbounds[runway].Inbound.WTC or None == node.Inbound.WTC:
spacingWTC = 3
else:
spacingWTC = self.Spacings[self.RunwayInbounds[runway].WTC][inbound.WTC]
spacingWTC = self.Spacings[self.RunwayInbounds[runway].Inbound.WTC][node.Inbound.WTC]
# get the runway time spacing
spacingRunway = self.Configuration.RunwayConstraints.findRunway(runway).Spacing
constrainedETA = self.RunwayInbounds[runway].PlannedArrivalTime + timedelta(minutes = max(spacingWTC, spacingRunway) / (inbound.PerformanceData.SpeedApproach / 60))
constrainedETA = self.RunwayInbounds[runway].Inbound.PlannedArrivalTime + timedelta(minutes = max(spacingWTC, spacingRunway) / (node.Inbound.PerformanceData.SpeedApproach / 60))
# calculate the arrival times for the dependent inbounds
for dependentRunway in self.Configuration.RunwayConstraints.findDependentRunways(runway):
if None != self.RunwayInbounds[dependentRunway.Runway.Name]:
# TODO staggered spacing variabel
candidate = self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime + timedelta(minutes = 3 / (inbound.PerformanceData.SpeedApproach / 60))
candidate = self.RunwayInbounds[dependentRunway.Runway.Name].Inbound.PlannedArrivalTime + timedelta(minutes = 3 / (node.Inbound.PerformanceData.SpeedApproach / 60))
if None == constrainedETA or candidate > constrainedETA:
constrainedETA = candidate
# get the arrival time on the runway of the inbound
if True == useETA:
arrivalTime = inbound.ArrivalCandidates[runway].EarliestArrivalTime
arrivalTime = node.ArrivalCandidates[runway].EarliestArrivalTime
else:
arrivalTime = inbound.ArrivalCandidates[runway].InitialArrivalTime
arrivalTime = node.ArrivalCandidates[runway].InitialArrivalTime
if None == constrainedETA:
return max(arrivalTime, earliestArrivalTime), timedelta(seconds = 0)
@@ -57,7 +57,7 @@ class RunwayManager:
else:
return eta, timedelta(seconds = 0)
def selectArrivalRunway(self, inbound : Inbound, useETA : bool, earliestArrivalTime : datetime):
def selectArrivalRunway(self, node : Node, useETA : bool, earliestArrivalTime : datetime):
availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways
#if 1 < len(availableRunways):
@@ -68,7 +68,7 @@ class RunwayManager:
# fallback to check if we have available runways
if 0 == len(availableRunways):
runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useETA, earliestArrivalTime)
return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
# start with the beginning
selectedRunway = None
@@ -77,7 +77,7 @@ class RunwayManager:
# get the runway with the earliest ETA
for runway in availableRunways:
candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound, useETA, earliestArrivalTime)
candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
if None == eta or eta > candidate:
selectedRunway = runway.Runway
lostTime = delta

View File

@@ -9,8 +9,6 @@ class ArrivalData:
self.Star = None
self.MaximumTimeToGain = None
self.MaximumTimeToLose = None
self.FlightTimeUntilIaf = None
self.FlightTimeUntilTouchdown = None
self.InitialArrivalTime = None
self.EarliestArrivalTime = None
self.LatestArrivalTime = None
@@ -52,16 +50,6 @@ class ArrivalData:
self.LatestArrivalTime = value
else:
raise Exception('Invalid type for latest')
elif 'touchdown' == key:
if True == isinstance(value, timedelta):
self.FlightTimeUntilTouchdown = value
else:
raise Exception('Invalid type for touchdown')
elif 'entry' == key:
if True == isinstance(value, timedelta):
self.FlightTimeUntilIaf = value
else:
raise Exception('Invalid type for entry')
elif 'route' == key:
self.ArrivalRoute = value
elif 'trackmiles' == key:

View File

@@ -1,48 +1,27 @@
#!/usr/bin/env python
import pytz
import sys
from datetime import datetime, timedelta
from datetime import datetime
from aman.com import AircraftReport_pb2
from aman.config.AirportSequencing import AirportSequencing
from aman.formats.SctEseFormat import SctEseFormat
from aman.sys.WeatherModel import WeatherModel
from aman.types.ArrivalWaypoint import ArrivalWaypoint
from aman.types.PerformanceData import PerformanceData
from aman.types.ArrivalRoute import ArrivalRoute
from aman.types.ArrivalData import ArrivalData
from aman.types.Runway import Runway
from aman.types.Waypoint import Waypoint
class Inbound:
def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
for arrivalRunway in navData.ArrivalRoutes:
if arrivalRunway == runway.Name:
stars = navData.ArrivalRoutes[arrivalRunway]
for star in stars:
if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
return star
return None
def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
performanceData : PerformanceData, weatherModel : WeatherModel):
def __init__(self, report : AircraftReport_pb2.AircraftReport, performanceData : PerformanceData):
self.Report = report
self.Callsign = report.aircraft.callsign
self.CurrentPosition = report.position
self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
self.InitialArrivalTime = None
self.EarliestArrivalTime = None
self.PlannedArrivalTime = None
self.EstimatedStarEntryTime = None
self.PlannedRunway = None
self.PlannedStar = None
self.PlannedArrivalRoute = None
self.PlannedTrackmiles = None
self.ArrivalCandidates = {}
self.WTC = None
self.FixedSequence = False
self.WTC = None
# analyze the WTC
wtc = report.aircraft.wtc.upper()
@@ -54,157 +33,3 @@ class Inbound:
self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
if None == self.PerformanceData:
self.PerformanceData = performanceData.Aircrafts['A320']
# calculate the timings for the different arrival runways
for identifier in sequencingConfig.ActiveArrivalRunways:
star = self.findArrivalRoute(identifier.Runway, navData)
if None != star:
flightTime, flightTimeUntilIaf, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
# the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
ratio = min(2.0, max(0.0, self.Report.distanceToIAF / (trackmiles - self.Report.distanceToIAF)))
possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60))
ita = self.ReportTime + flightTime
earliest = ita - ttg
latest = ita + ttl
self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
entry = flightTimeUntilIaf, touchdown = flightTime,
ttl = ttl, latest = latest, route = arrivalRoute, trackmiles = trackmiles)
# calculate the first values for later plannings
for candidate in self.ArrivalCandidates:
if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
self.PlannedStar = self.ArrivalCandidates[candidate].Star
if None != self.PlannedStar:
for runway in navData.Runways[self.Report.destination.upper()]:
if runway.Name == self.PlannedStar.Runway:
self.PlannedRunway = runway
break
def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
# calculate remaining trackmiles
trackmiles = self.Report.distanceToIAF
start = star.Route[0]
turnIndices = [ -1, -1 ]
constraints = []
for i in range(0, len(star.Route)):
# identified the base turn
if True == star.Route[i].BaseTurn:
turnIndices[0] = i
# identified the final turn
elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
turnIndices[1] = i
# skip waypoints until the final turn point is found
elif -1 != turnIndices[0] and -1 == turnIndices[1]:
continue
trackmiles += start.haversine(star.Route[i]) * 0.539957
# check if a new constraint is defined
altitude = -1
speed = -1
if None != star.Route[i].Altitude:
altitude = star.Route[i].Altitude
if None != star.Route[i].Speed:
speed = star.Route[i].Speed
if -1 != altitude or -1 != speed:
constraints.append([ trackmiles, altitude, speed ])
start = star.Route[i]
# add the remaining distance from the last waypoint to the runway threshold
trackmiles += start.haversine(runway.Start)
if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
sys.stderr.write('Invalid constraint definition found for ' + star.Name)
sys.exit(-1)
# calculate descend profile
currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
distanceToWaypoint = self.Report.distanceToIAF
flightTimeUntilIafSeconds = 0
flightTimeSeconds = 0
nextWaypointIndex = 0
flownDistance = 0.0
arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
while True:
# check if a constraint cleanup is needed and if a speed-update is needed
if 0 != len(constraints) and flownDistance >= constraints[0][0]:
if -1 != constraints[0][2]:
currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
constraints.pop(0)
# search next altitude constraint
altitudeDistance = 0
nextAltitude = 0
for constraint in constraints:
if -1 != constraint[1]:
altitudeDistance = constraint[0]
nextAltitude = constraint[1]
break
# check if update of altitude and speed is needed on 3° glide
if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
oldGroundspeed = currentPosition[1]
descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
newAltitude = currentPosition[0] - descendRate
if 0 > newAltitude:
newAltitude = 0
currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
else:
distance = currentPosition[1] / 60 / 6
# update the statistics
distanceToWaypoint -= distance
flownDistance += distance
newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
if newIAS < currentIAS:
currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
currentIAS = newIAS
flightTimeSeconds += 10
if flownDistance <= self.Report.distanceToIAF:
flightTimeUntilIafSeconds += 10
if flownDistance >= trackmiles:
break
# check if we follow a new waypoint pair
if 0 >= distanceToWaypoint:
lastWaypointIndex = nextWaypointIndex
nextWaypointIndex += 1
arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
arrivalRoute[-1].ETA = self.ReportTime + arrivalRoute[-1].FlightTime
arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
arrivalRoute[-1].Altitude = currentPosition[0]
arrivalRoute[-1].IndicatedAirspeed = currentIAS
arrivalRoute[-1].GroundSpeed = currentPosition[1]
# check if a skip from base to final turn waypoints is needed
if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
nextWaypointIndex = turnIndices[1]
# update the statistics
if nextWaypointIndex < len(star.Route):
distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles, arrivalRoute