Files
aman-sys/aman/sys/RecedingHorizonControl.py

225 lines
10 KiB
Python

#!/usr/bin/env python
import time
from datetime import datetime as dt
from datetime import timedelta
import pytz
from aman.config.Airport import Airport
from aman.config.AirportSequencing import AirportSequencing
from aman.config.RHC import RHC
from aman.sys.aco.Node import Node
from aman.sys.RecedingHorizonWindow import RecedingHorizonWindow
from aman.types.Inbound import Inbound
class RecedingHorizonControl:
    """Sorts inbounds into consecutive time windows and tracks their assignment.

    Attributes:
        Windows: List of RecedingHorizonWindow instances. Every newly created
            window starts at the end time of the previous one.
        AssignedWindow: Maps a callsign to the list
            [ window index, missed update counter ].
        Configuration: The RHC configuration passed to the constructor.
        FreezedIndex: FixedBeforeArrival.seconds / WindowSize as an integer.
            Window indices are compared against it to decide if an inbound's
            sequence is fixed.
    """

    # upper limit of inbounds returned by optimizationRelevantInbounds
    MaxRelevantInbounds = 20

    def __init__(self, config : RHC):
        """Create an empty horizon and derive the freeze index from the configuration."""
        self.Windows = []
        # contains the current index and the missed update counter
        self.AssignedWindow = {}
        self.Configuration = config
        self.FreezedIndex = int(self.Configuration.FixedBeforeArrival.seconds / self.Configuration.WindowSize)

    @staticmethod
    def _arrivalTime(inbound):
        """Sort key: the planned arrival time or, if it is not set, the enroute arrival time."""
        if inbound.PlannedArrivalTime is not None:
            return inbound.PlannedArrivalTime
        return inbound.EnrouteArrivalTime

    @staticmethod
    def _currentUtc():
        """Return the current time as a timezone-aware UTC datetime without sub-second part."""
        # datetime.utcfromtimestamp() is deprecated since Python 3.12
        return dt.fromtimestamp(int(time.time()), tz = pytz.UTC)

    def _appendWindows(self, referenceTime):
        """Append windows until the last one ends after referenceTime and return its index."""
        if 0 != len(self.Windows):
            startTime = self.Windows[-1].EndTime
        else:
            startTime = self._currentUtc()
        timestep = timedelta(seconds = self.Configuration.WindowSize)

        while True:
            self.Windows.append(RecedingHorizonWindow(startTime, startTime + timestep))
            startTime = self.Windows[-1].EndTime
            if startTime > referenceTime:
                return len(self.Windows) - 1

    def insertInWindow(self, inbound : Inbound, usePTA : bool):
        """Insert the inbound into the window that covers its reference time.

        Args:
            inbound: The inbound to insert. FixedSequence is overwritten and
                PlannedArrivalTime is initialized with EnrouteArrivalTime if the
                inbound is fixed and has no planned time yet.
            usePTA: Use PlannedArrivalTime as reference time if True,
                EnrouteArrivalTime otherwise.

        If no existing window covers the reference time, new windows are
        appended until the last one ends after the reference time.
        """
        referenceTime = inbound.PlannedArrivalTime if usePTA else inbound.EnrouteArrivalTime

        # find the correct window
        index = None
        for i, window in enumerate(self.Windows):
            if window.StartTime <= referenceTime < window.EndTime:
                index = i
                break

        # create the new window
        if index is None:
            index = self._appendWindows(referenceTime)

        self.AssignedWindow[inbound.Callsign] = [ index, 0 ]
        # NOTE(review): updateReport, cleanupWindows and optimizationRelevantInbounds
        # treat "index <= FreezedIndex" as fixed, this check uses "<" - confirm the intent
        inbound.FixedSequence = index < self.FreezedIndex
        if inbound.FixedSequence and inbound.PlannedArrivalTime is None:
            inbound.PlannedArrivalTime = inbound.EnrouteArrivalTime

        window = self.Windows[index]
        window.insert(inbound)
        window.Inbounds.sort(key = self._arrivalTime)

    def updateReport(self, inbound : Inbound):
        """Merge a new report into the tracked inbound or start tracking it.

        Unknown callsigns are inserted based on their EnrouteArrivalTime.
        For known callsigns the missed update counter is reset and the report
        data is copied. Fixed inbounds receive no further updates. Inbounds
        without a PlannedStar get the later one of the tracked and the reported
        EnrouteArrivalTime and are moved if that time leaves their window.
        """
        callsign = inbound.Callsign
        assignment = self.AssignedWindow.get(callsign)

        # not tracked yet
        if assignment is None:
            self.insertInWindow(inbound, False)
            return

        index = assignment[0]
        assignment[1] = 0
        window = self.Windows[index]

        plannedInbound = window.inbound(callsign)
        plannedInbound.Report = inbound.Report
        plannedInbound.ReportTime = inbound.ReportTime
        plannedInbound.CurrentPosition = inbound.CurrentPosition
        plannedInbound.RequestedRunway = inbound.RequestedRunway

        # ignore fixed updates
        if plannedInbound.FixedSequence or index <= self.FreezedIndex:
            plannedInbound.FixedSequence = True
            return

        plannedInbound.WTC = inbound.WTC

        # check if we need to update the inbound
        if plannedInbound.PlannedStar is not None:
            return

        # the later one of both times is used
        reference = max(inbound.EnrouteArrivalTime, plannedInbound.EnrouteArrivalTime)
        if reference < window.StartTime or reference >= window.EndTime:
            # the inbound left its window -> track the reported inbound in a new one
            window.remove(callsign)
            self.AssignedWindow.pop(callsign)
            inbound.EnrouteArrivalTime = reference
            self.insertInWindow(inbound, False)
        else:
            plannedInbound.EnrouteArrivalTime = reference
            window.Inbounds.sort(key = self._arrivalTime)

    def resequenceInbound(self, inbound : Inbound):
        """Re-sort a tracked inbound after its PlannedArrivalTime changed.

        The inbound is moved to a different window if the planned time is
        outside of the currently assigned one.

        Raises:
            KeyError: If the callsign is not tracked.
        """
        index = self.AssignedWindow[inbound.Callsign][0]
        window = self.Windows[index]

        if inbound.PlannedArrivalTime < window.StartTime or inbound.PlannedArrivalTime >= window.EndTime:
            window.remove(inbound.Callsign)
            self.AssignedWindow.pop(inbound.Callsign)
            self.insertInWindow(inbound, True)
        else:
            inbound.FixedSequence = index < self.FreezedIndex
            window.Inbounds.sort(key = self._arrivalTime)

    def latestFixedInbounds(self, configuration : Airport, sequenceConfiguration : AirportSequencing):
        """Find the latest planned inbounds per runway and per IAF level.

        Only the windows with an index up to FreezedIndex are checked and
        inbounds without PlannedRunway or PlannedArrivalRoute are skipped.

        Returns:
            (None, None) if no windows exist. Otherwise a tuple of
            - a dict: active arrival runway name -> Node with the latest
              EstimatedTouchdownTime (or None),
            - a dict: IAF name -> { altitude -> Node with the latest
              EstimatedIafTime (or None) }. An inbound is associated with the
              configured altitude that is closest to the altitude of its
              first route point.
        """
        if 0 == len(self.Windows):
            return None, None

        # create the runway tree
        runwayInbounds = { runway.Runway.Name : None for runway in sequenceConfiguration.ActiveArrivalRunways }

        # create the IAF tree
        iafInbounds = {}
        for star in configuration.ArrivalRouteConstraints:
            iaf = configuration.ArrivalRouteConstraints[star][0]
            iafInbounds.setdefault(iaf.Name, {}).setdefault(iaf.Altitude, None)

        # associate the inbounds to the runways and the IAFs
        # (the start index is limited to the last existing window)
        lastFixedIndex = min(self.FreezedIndex, len(self.Windows) - 1)
        for i in range(lastFixedIndex, -1, -1):
            for inbound in self.Windows[i].Inbounds:
                if inbound.PlannedRunway is None or inbound.PlannedArrivalRoute is None:
                    continue

                iaf = inbound.PlannedArrivalRoute[0]
                node = Node(inbound, None, None, None, None)
                node.EstimatedTouchdownTime = inbound.PlannedArrivalTime
                node.EstimatedIafAltitude = iaf.Altitude
                node.EstimatedIafTime = iaf.PTA

                runway = inbound.PlannedRunway.Name
                if runway in runwayInbounds:
                    latest = runwayInbounds[runway]
                    if latest is None or latest.EstimatedTouchdownTime < node.EstimatedTouchdownTime:
                        runwayInbounds[runway] = node

                if iaf.Name in iafInbounds:
                    levels = iafInbounds[iaf.Name]
                    # the first level with the smallest difference wins
                    targetLevel = min(levels, key = lambda level: abs(level - iaf.Altitude))
                    latest = levels[targetLevel]
                    # store the node instance, not the Node class
                    if latest is None or latest.EstimatedIafTime < node.EstimatedIafTime:
                        levels[targetLevel] = node

        return runwayInbounds, iafInbounds

    def optimizationRelevantInbounds(self):
        """Collect the inbounds of the windows behind FreezedIndex.

        Returns:
            (None, None) if no window behind FreezedIndex exists or if these
            windows are empty. Otherwise a tuple of
            - at most MaxRelevantInbounds inbounds, taken in window order and
              sorted by planned (fallback: enroute) arrival time,
            - the StartTime of the first window behind FreezedIndex.
        """
        firstIndex = self.FreezedIndex + 1

        # no new inbounds
        if len(self.Windows) <= firstIndex:
            return None, None

        earliestArrivalTime = self.Windows[firstIndex].StartTime

        # check the overlapping windows and stop as soon as the limit is reached
        inbounds = []
        for window in self.Windows[firstIndex:]:
            remaining = self.MaxRelevantInbounds - len(inbounds)
            if remaining <= 0:
                break
            inbounds.extend(window.Inbounds[:remaining])

        # check if we found relevant inbounds
        if 0 == len(inbounds):
            return None, None

        inbounds.sort(key = self._arrivalTime)
        return inbounds, earliestArrivalTime

    def sequence(self):
        """Return the inbounds of all windows as one list in window order."""
        return [ inbound for window in self.Windows for inbound in window.Inbounds ]

    def cleanupWindows(self):
        """Drop expired windows and inbounds that received no updates.

        Windows at the front that ended before the current UTC time are
        removed together with their inbounds and the window indices of the
        remaining inbounds are corrected. Afterwards every inbound whose
        missed update counter is greater than two is removed and the counter
        of all tracked inbounds is increased.
        """
        currentUtc = self._currentUtc()

        # delete the non-required windows
        expired = 0
        for window in self.Windows:
            if currentUtc <= window.EndTime:
                break
            # cleanup the association table
            for inbound in window.Inbounds:
                self.AssignedWindow.pop(inbound.Callsign, None)
            expired += 1

        # correct the association table
        if 0 != expired:
            del self.Windows[:expired]
            for callsign, assignment in self.AssignedWindow.items():
                assignment[0] -= expired
                if assignment[0] <= self.FreezedIndex:
                    self.Windows[assignment[0]].inbound(callsign).FixedSequence = True

        # delete the non-updated aircrafts and increase the missed-counter for later runs
        outdated = []
        for callsign, assignment in self.AssignedWindow.items():
            if 2 < assignment[1]:
                self.Windows[assignment[0]].remove(callsign)
                outdated.append(callsign)
            assignment[1] += 1

        for callsign in outdated:
            self.AssignedWindow.pop(callsign)