Files
aman-sys/aman/sys/RecedingHorizonControl.py
2021-11-11 20:21:11 +01:00

181 lines
7.6 KiB
Python

#!/usr/bin/env python
import time
from datetime import datetime as dt
from datetime import timedelta
import pytz
from aman.config.RHC import RHC
from aman.sys.RecedingHorizonWindow import RecedingHorizonWindow
from aman.types.Inbound import Inbound
class RecedingHorizonControl:
    """Sort inbounds into consecutive time windows of a receding horizon.

    ``Windows`` is a list of ``RecedingHorizonWindow`` instances in which every
    window starts where its predecessor ends.  ``AssignedWindow`` maps each
    known callsign to ``[index into Windows, missed-update counter]``.

    Inbounds in windows whose index is below ``FreezedIndex`` are flagged with
    ``FixedSequence`` and their planning data is no longer changed by
    ``updateReport``.
    """

    # cleanupWindows() drops an inbound once its missed-update counter exceeds this value
    _MAX_MISSED_UPDATES = 2

    def __init__(self, config: "RHC"):
        """Create an empty horizon.

        Args:
            config: Configuration that provides ``WindowSize`` (length of one
                window in seconds) and ``FixedBeforeArrival`` (expected to be a
                ``timedelta``).
        """
        self.Windows = []
        # contains the current window index and the missed-update counter per callsign
        self.AssignedWindow = {}
        self.Configuration = config
        # total_seconds() is used because timedelta.seconds silently drops whole days
        self.FreezedIndex = int(
            self.Configuration.FixedBeforeArrival.total_seconds() / self.Configuration.WindowSize
        )

    @staticmethod
    def _currentUtc():
        """Return the current time as an aware UTC datetime, truncated to whole seconds."""
        return dt.fromtimestamp(int(time.time()), tz=pytz.UTC)

    def insertInWindow(self, inbound: "Inbound", usePTA: bool):
        """Assign ``inbound`` to the window that covers its reference time.

        The reference time is ``inbound.PlannedArrivalTime`` if ``usePTA`` is
        set, otherwise ``inbound.EarliestArrivalTime``.  If no existing window
        covers it, new windows are appended until one ends after the reference
        time.  Without ``usePTA`` the planned arrival time is initialised from
        the earliest arrival time.  The missed-update counter of the inbound is
        reset and the receiving window is re-sorted by planned arrival time.

        Args:
            inbound: The inbound to insert.
            usePTA: Select the planned instead of the earliest arrival time as
                the reference.
        """
        referenceTime = inbound.PlannedArrivalTime if usePTA else inbound.EarliestArrivalTime

        # find the correct window
        window = None
        for index, candidate in enumerate(self.Windows):
            if candidate.StartTime <= referenceTime < candidate.EndTime:
                window = candidate
                self.AssignedWindow[inbound.Callsign] = [index, 0]
                if not usePTA:
                    inbound.PlannedArrivalTime = inbound.EarliestArrivalTime
                inbound.FixedSequence = index < self.FreezedIndex
                window.insert(inbound)
                break

        # create the new window
        if window is None:
            # NOTE(review): a reference time before the first existing window also
            # ends up here, so such an inbound is placed into a window appended at
            # the end of the horizon - confirm that this is intended.
            lastWindowTime = self.Windows[-1].EndTime if self.Windows else self._currentUtc()
            timestep = timedelta(seconds=self.Configuration.WindowSize)

            # extend the horizon until a window ends after the reference time
            while True:
                window = RecedingHorizonWindow(lastWindowTime, lastWindowTime + timestep)
                self.Windows.append(window)
                if window.EndTime > referenceTime:
                    break
                lastWindowTime = window.EndTime

            index = len(self.Windows) - 1
            self.AssignedWindow[inbound.Callsign] = [index, 0]
            if not usePTA:
                # the new window may start after the earliest arrival time
                inbound.PlannedArrivalTime = max(window.StartTime, inbound.EarliestArrivalTime)
            # compare the index of the window (not the window count) with the frozen range
            inbound.FixedSequence = index < self.FreezedIndex
            window.insert(inbound)

        window.Inbounds.sort(key=lambda x: x.PlannedArrivalTime)

    def updateReport(self, inbound: "Inbound"):
        """Merge a new report of ``inbound`` into the horizon.

        Unknown callsigns are inserted based on their earliest arrival time.
        For known callsigns the missed-update counter is reset and the current
        position is always taken over.  All other data is only updated if the
        inbound is neither fixed nor inside a frozen window.  If the reported
        earliest arrival time is later than the planned arrival time, the
        inbound is either moved to the matching window or updated in place.

        Args:
            inbound: The freshly reported inbound.
        """
        assignment = self.AssignedWindow.get(inbound.Callsign)
        if assignment is None:
            self.insertInWindow(inbound, False)
            return

        index = assignment[0]
        assignment[1] = 0
        window = self.Windows[index]
        plannedInbound = window.inbound(inbound.Callsign)
        plannedInbound.CurrentPosition = inbound.CurrentPosition

        # ignore fixed updates
        if plannedInbound.FixedSequence or index < self.FreezedIndex:
            return

        plannedInbound.ArrivalCandidates = inbound.ArrivalCandidates
        plannedInbound.WTC = inbound.WTC

        # the planning is only touched if the inbound cannot make its planned time anymore
        if not plannedInbound.PlannedArrivalTime < inbound.EarliestArrivalTime:
            return

        earliest = inbound.EarliestArrivalTime
        if earliest < window.StartTime or earliest >= window.EndTime:
            # the inbound left its window: drop the old entry and insert the new report
            window.remove(inbound.Callsign)
            self.AssignedWindow.pop(inbound.Callsign)
            self.insertInWindow(inbound, False)
        else:
            plannedInbound.PlannedStar = inbound.PlannedStar
            plannedInbound.PlannedRunway = inbound.PlannedRunway
            plannedInbound.InitialArrivalTime = inbound.InitialArrivalTime
            # the planned time follows the earliest time as long as both were identical
            if plannedInbound.PlannedArrivalTime == plannedInbound.EarliestArrivalTime:
                plannedInbound.PlannedArrivalTime = earliest
                # keep the window ordered by planned arrival time
                window.Inbounds.sort(key=lambda x: x.PlannedArrivalTime)
            plannedInbound.EarliestArrivalTime = earliest

    def resequenceInbound(self, inbound: "Inbound"):
        """Re-sort ``inbound`` after its planned arrival time was changed.

        The inbound is moved to another window if the planned arrival time is
        outside of its current window; otherwise its ``FixedSequence`` flag is
        refreshed and the current window is re-sorted.

        Args:
            inbound: An inbound that is already assigned to a window.

        Raises:
            KeyError: If the callsign of ``inbound`` is not assigned to a window.
        """
        index = self.AssignedWindow[inbound.Callsign][0]
        window = self.Windows[index]

        if window.StartTime <= inbound.PlannedArrivalTime < window.EndTime:
            inbound.FixedSequence = index < self.FreezedIndex
            window.Inbounds.sort(key=lambda x: x.PlannedArrivalTime)
        else:
            window.remove(inbound.Callsign)
            self.AssignedWindow.pop(inbound.Callsign)
            self.insertInWindow(inbound, True)

    def lastFixedInboundOnRunway(self, runway: str):
        """Return the latest inbound of the frozen windows planned for ``runway``.

        Args:
            runway: Name of the runway, compared with ``PlannedRunway.Name``.

        Returns:
            The matching inbound with the latest position in the window order
            or ``None`` if no inbound is planned for the runway.
        """
        # no inbounds available
        if not self.Windows:
            return None

        # NOTE(review): the window with index FreezedIndex is part of the search
        # although only windows below that index are marked as fixed - confirm.
        # The start index is clamped to the last existing window.
        startIndex = min(self.FreezedIndex, len(self.Windows) - 1)

        # search from the back to the front to find the last inbound
        for index in range(startIndex, -1, -1):
            # the inbounds of a window are sorted by time, so walk them backwards as well
            for inbound in reversed(self.Windows[index].Inbounds):
                plannedRunway = inbound.PlannedRunway
                if plannedRunway is not None and runway == plannedRunway.Name:
                    return inbound

        # no inbound found
        return None

    def optimizationRelevantInbounds(self):
        """Collect the inbounds that are handed over to the optimization.

        Returns:
            ``(inbounds, earliestArrivalTime)`` where ``inbounds`` is sorted by
            ``InitialArrivalTime`` and ``earliestArrivalTime`` is the start time
            of the first window, or ``(None, None)`` if the horizon does not
            reach beyond the frozen windows or does not contain any inbound.
        """
        # no new inbounds
        if len(self.Windows) <= self.FreezedIndex:
            return None, None

        # NOTE(review): all windows are taken into account. An earlier variant
        # restricted the range to range(FreezedIndex + 1,
        # min(len(Windows), FreezedIndex + 1 + Configuration.WindowOverlap)).
        inbounds = []
        for window in self.Windows:
            inbounds.extend(window.Inbounds)

        # check if we found relevant inbounds
        if not inbounds:
            return None, None

        inbounds.sort(key=lambda x: x.InitialArrivalTime)
        return inbounds, self.Windows[0].StartTime

    def sequence(self):
        """Return the inbounds of all windows as one flat list in window order."""
        inbounds = []
        for window in self.Windows:
            inbounds.extend(window.Inbounds)
        return inbounds

    def cleanupWindows(self):
        """Drop expired windows and inbounds that are not reported anymore.

        Windows that ended before the current UTC time are removed together
        with the associations of their inbounds.  The window indices of the
        remaining associations are corrected and inbounds that moved into the
        frozen range are flagged with ``FixedSequence``.  Afterwards every
        inbound whose missed-update counter exceeds the limit is removed and
        the counters of all inbounds are increased for later runs.
        """
        currentUtc = self._currentUtc()

        # find the non-required windows and cleanup the association table
        expired = 0
        while expired < len(self.Windows) and currentUtc > self.Windows[expired].EndTime:
            for inbound in self.Windows[expired].Inbounds:
                # tolerate inbounds without an association instead of aborting the cleanup
                self.AssignedWindow.pop(inbound.Callsign, None)
            expired += 1

        # delete the windows and correct the association table
        if 0 != expired:
            del self.Windows[:expired]
            for callsign, assignment in self.AssignedWindow.items():
                assignment[0] -= expired
                if assignment[0] < self.FreezedIndex:
                    self.Windows[assignment[0]].inbound(callsign).FixedSequence = True

        # delete the non-updated aircraft and increase the missed-counter for later runs
        outdated = []
        for callsign, assignment in self.AssignedWindow.items():
            if self._MAX_MISSED_UPDATES < assignment[1]:
                self.Windows[assignment[0]].remove(callsign)
                outdated.append(callsign)
            assignment[1] += 1

        # removal is deferred because the dictionary must not change while iterating it
        for callsign in outdated:
            self.AssignedWindow.pop(callsign)