Files
aman-sys/aman/config/Airport.py
2021-12-21 10:42:16 +01:00

360 lines
15 KiB
Python

#!/usr/bin/env python
import configparser
from datetime import timedelta
import glob
import os
import sys
from aman.config.RHC import RHC
from aman.config.AirportSequencing import AirportSequencing
from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
from aman.formats.SctEseFormat import SctEseFormat
from aman.types.Waypoint import Waypoint
class Airport:
def findGngData(data, path):
    """Locate the newest GNG ESE file and the SCT file that belongs to it.

    The function is invoked through the class (``Airport.findGngData(...)``)
    and therefore takes no ``self`` parameter.

    Args:
        data: Mapping of the DATA configuration section. The glob pattern
            (without file extension) is read from its ``gngwildcard`` entry.
            ``None`` is handled like a mapping without that entry.
        path: Directory that is searched for the files.

    Returns:
        ``(sctPath, esePath)`` on success. ``(None, None)`` if the wildcard
        is not configured, if no ESE file matches the wildcard or if one of
        the two files does not exist.
    """
    wildcard = None if data is None else data.get('gngwildcard')
    if wildcard is None:
        return None, None

    # max() raises ValueError on an empty sequence, so stop here if the
    # wildcard does not match any ESE file
    eseFiles = glob.glob(os.path.join(path, wildcard + '.ese'))
    if not eseFiles:
        return None, None

    # pick the newest ESE file (os.path.getctime is the comparison key)
    latestEse = max(eseFiles, key=os.path.getctime)
    # the corresponding SCT file shares the base name of the ESE file
    latestSct = os.path.splitext(latestEse)[0] + '.sct'

    # both files are required
    if not (os.path.isfile(latestEse) and os.path.isfile(latestSct)):
        return None, None
    return latestSct, latestEse
def parsePlanning(self, planning):
    """Return the arrival routes listed in the planning section.

    Args:
        planning: Mapping of the PLANNING configuration section.

    Returns:
        The colon-separated ``routes`` entry split into a list, or an empty
        list if the entry does not exist.
    """
    routes = planning.get('routes')
    return [] if routes is None else routes.split(':')
def parseDefaultSequencingConfiguration(self, icao : str, planning):
    """Build the default arrival sequencing from the planning section.

    Reads the entries ``activearrivalrunwaydefault``,
    ``activearrivalmodedefault`` and ``arrivalspacingdefault`` and, for the
    ``STAGGERED`` mode, ``runwaydependenciesdefault``. The result is stored
    in ``self.DefaultSequencing``. ``self.GngData`` has to be set before
    this method is called.

    Every configuration error is reported on stderr and terminates the
    process via ``sys.exit(-1)``.

    Args:
        icao: Key of the airport in ``self.GngData.Runways``.
        planning: Mapping of the PLANNING configuration section.
    """
    requiredKeys = (
        'activearrivalrunwaydefault',
        'activearrivalmodedefault',
        'arrivalspacingdefault',
    )
    for required in requiredKeys:
        if planning.get(required) is None:
            sys.stderr.write('No "' + required + '" entry found!')
            sys.exit(-1)
    if icao not in self.GngData.Runways:
        sys.stderr.write('Unable to find ' + icao + ' in the SCT data!')
        sys.exit(-1)

    # parse the default arrival mode
    mode = planning['activearrivalmodedefault']
    if mode not in ('STAGGERED', 'IPA'):
        sys.stderr.write('Unknown arrival mode in "activearrivalmodedefault" found! (STAGGERED or IPA needs to be set)')
        sys.exit(-1)
    staggered = 'STAGGERED' == mode

    # translate the spacing into a map; the entry alternates between the
    # runway identifier and its spacing, a trailing identifier without a
    # value is ignored
    tokens = list(filter(None, planning['arrivalspacingdefault'].split(':')))
    spacings = {ident: int(value) for ident, value in zip(tokens[0::2], tokens[1::2])}

    # create the sequencing data per runway
    self.DefaultSequencing = AirportSequencing(icao)
    for ident in filter(None, planning['activearrivalrunwaydefault'].split(':')):
        if ident not in spacings:
            sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
            sys.exit(-1)

        for runway in self.GngData.Runways[icao]:
            if ident == runway.Name:
                sequence = RunwaySequencing(runway)
                sequence.Spacing = spacings[ident]
                self.DefaultSequencing.activateRunway(sequence)
                break
        else:
            # the loop finished without a matching runway
            sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
            sys.exit(-1)

    # create the dependencies, if needed
    if staggered:
        if planning.get('runwaydependenciesdefault') is None:
            sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
            sys.exit(-1)

        dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
        if 0 != len(dependencies) % 2:
            sys.stderr.write('No valid set of runway dependencies found!')
            sys.exit(-1)

        # the entry is a flat list of runway pairs
        for i in range(0, len(dependencies), 2):
            self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
def parseConstraints(self, planning):
    """Parse the IAF spacing and the waypoint constraints per arrival route.

    Sets ``self.IafSpacing`` from the ``iafsequence`` entry (10.0 if the
    entry is missing) and fills ``self.ArrivalRouteConstraints`` with one
    waypoint list per ``constraints<STAR>`` entry, keyed by the upper-case
    STAR name.

    A constraint entry is a colon-separated token list. It starts with a
    waypoint name that is followed by the modifiers of that waypoint:
    ``A:<number>`` is stored as ``Altitude``, ``S:<number>`` is stored as
    ``Speed``, ``B`` marks a base turn and ``F`` marks a final turn. Every
    other token starts the next waypoint.

    Invalid entries are reported on stderr and terminate the process via
    ``sys.exit(-1)``.

    Args:
        planning: Mapping of the PLANNING configuration section.
    """
    def rejectLine(key):
        # report the offending entry and terminate
        sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
        sys.exit(-1)

    def closeWaypoint(key, waypoints, name, altitude, speed, isBaseTurn, isFinalTurn):
        # a waypoint cannot be a base turn and a final turn at the same time
        if isBaseTurn and isFinalTurn:
            rejectLine(key)
        waypoint = Waypoint(name = name, base = isBaseTurn, final = isFinalTurn)
        if altitude is not None:
            waypoint.Altitude = altitude
        if speed is not None:
            waypoint.Speed = speed
        waypoints.append(waypoint)

    self.ArrivalRouteConstraints = {}

    # check if the IAF sequence constraint is defined
    if 'iafsequence' in planning:
        self.IafSpacing = float(planning['iafsequence'])
    else:
        self.IafSpacing = 10.0

    # parse the arrival constraints
    for key in planning:
        if not key.startswith('constraints'):
            continue
        star = key.replace('constraints', '').upper()
        if '' == star:
            continue

        elements = list(filter(None, planning[key].split(':')))
        if len(elements) < 3:
            rejectLine(key)

        waypoints = []
        # values of the waypoint that is currently collected
        waypointName = elements[0]
        altitude = None
        speed = None
        isBaseTurn = False
        isFinalTurn = False

        index = 1
        while index < len(elements):
            token = elements[index]
            if token in ('A', 'S'):
                # the marker needs to be followed by a number; isdecimal()
                # accepts only strings that int() is able to convert
                if index + 1 == len(elements) or not elements[index + 1].isdecimal():
                    rejectLine(key)
                if 'A' == token:
                    altitude = int(elements[index + 1])
                else:
                    speed = int(elements[index + 1])
                # skip the consumed number
                index += 1
            elif 'B' == token:
                isBaseTurn = True
            elif 'F' == token:
                isFinalTurn = True
            else:
                # a new waypoint starts, store the collected one
                closeWaypoint(key, waypoints, waypointName, altitude, speed, isBaseTurn, isFinalTurn)
                # reset temporary data
                waypointName = token
                altitude = None
                speed = None
                isBaseTurn = False
                isFinalTurn = False
            index += 1

        # the loop stores a waypoint only when the next one starts, so the
        # last one is still pending; this includes entries that consist of
        # a single waypoint (a repeated name of the previous waypoint is
        # still skipped)
        if not waypoints or waypointName != waypoints[-1].Name:
            closeWaypoint(key, waypoints, waypointName, altitude, speed, isBaseTurn, isFinalTurn)

        # register the arrival route
        self.ArrivalRouteConstraints[star] = waypoints
def parseAssignment(assignment : str):
    """Translate an assignment entry into a map of assignment types.

    The entry is a colon-separated list that alternates between a type
    marker (``A`` for ``RunwayAssignmentType.AircraftType``, ``G`` for
    ``RunwayAssignmentType.GateAssignment``) and the value for that type.
    Empty tokens are ignored. An unknown marker is reported on stderr and
    terminates the process via ``sys.exit(-1)``.

    The function is invoked through the class and takes no ``self``.

    Args:
        assignment: The raw configuration value.

    Returns:
        Dictionary that maps the assignment type to the list of its values
        in the order of their appearance.
    """
    tokens = [token for token in assignment.split(':') if token]
    result = {}
    pendingKind = None

    for position, token in enumerate(tokens):
        if position % 2 == 0:
            # even positions carry the type marker
            if token == 'A':
                pendingKind = RunwayAssignmentType.AircraftType
            elif token == 'G':
                pendingKind = RunwayAssignmentType.GateAssignment
            else:
                sys.stderr.write('Invalid assignment type: ' + token)
                sys.exit(-1)
            continue

        # odd positions carry the value of the preceding marker
        if pendingKind is None:
            sys.stderr.write('No assignment type defined')
            sys.exit(-1)
        result.setdefault(pendingKind, []).append(token)
        pendingKind = None

    return result
def findRunway(self, icao : str, name : str):
    """Return the runway called ``name`` from the GNG data of ``icao``.

    Args:
        icao: Key of the airport in ``self.GngData.Runways``.
        name: Runway name that is compared with the ``Name`` attribute.

    Returns:
        The first runway object with a matching name.

    Raises:
        Exception: If no runway matches; a message is written to stderr
            before the exception is raised.
    """
    candidates = self.GngData.Runways[icao]
    match = next((candidate for candidate in candidates if name == candidate.Name), None)
    if match is not None:
        return match

    sys.stderr.write('Unable to find runway ' + name + ' in the sequencing data for ' + icao)
    raise Exception()
def updateRunwayAssignment(dictionary, runway, assignments):
    """Merge the assignments of one configuration entry into a runway map.

    The function is invoked through the class and takes no ``self``.

    Args:
        dictionary: Map from runway to a map of assignment lists; it is
            updated in place.
        runway: Key of the runway whose assignments are extended.
        assignments: Map from assignment key to a list of values. The map
            and its lists are not modified.
    """
    perRunway = dictionary.setdefault(runway, {})
    for key, values in assignments.items():
        # always extend a list that is owned by the dictionary; storing
        # the passed list itself would let later updates modify the
        # caller's data
        perRunway.setdefault(key, []).extend(values)
def parseOptimization(self, key : str, line : str):
    """Parse one ``optimization<STAR>`` entry of the planning section.

    Args:
        key: Configuration key; the STAR name is the key without the text
            ``optimization``, converted to upper case.
        line: Configuration value of the form ``<maxTTG>:<ttgRatio>``.

    Returns:
        Tuple ``(star, maxTTG, ttgRatio)`` with ``maxTTG`` as ``int`` and
        ``ttgRatio`` as ``float``.

    Raises:
        Exception: If the STAR is not part of ``self.GngData.ArrivalRoutes``
            or if the value does not consist of exactly two parts; a
            message is written to stderr first.
    """
    star = key.replace('optimization', '').upper()

    # check if the STAR exists for any runway
    routesPerRunway = self.GngData.ArrivalRoutes
    known = any(
        star == route.Name
        for rwy in routesPerRunway
        for route in routesPerRunway[rwy]
    )
    if not known:
        sys.stderr.write('Unknown star:' + key)
        raise Exception()

    parts = line.split(':')
    if len(parts) != 2:
        sys.stderr.write('Invalid optimization parameter for ' + key)
        raise Exception()

    return star, int(parts[0]), float(parts[1])
def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
    """Store the optimization parameters of a STAR.

    The function is invoked through the class and takes no ``self``.

    Args:
        dictionary: Map from STAR name to ``[maxTTG, ttgRatio]``; it is
            updated in place and an existing entry is replaced.
        star: Name of the STAR.
        maxTTG: First element of the stored list.
        ttgRatio: Second element of the stored list.
    """
    parameters = [maxTTG, ttgRatio]
    dictionary[star] = parameters
def parseRunwayAssignment(self, icao : str, planning):
    """Parse the runway assignments and the optimization parameters.

    Resets and fills ``self.OptimizationParameters``,
    ``self.RunwayAssignmentsShall``, ``self.RunwayAssignmentsShould`` and
    ``self.RunwayAssignmentsMay`` from the keys that start with
    ``optimization``, ``shallassign``, ``shouldassign`` and ``mayassign``.
    ``self.MaxDelayMay`` is ten minutes unless at least one ``mayassign``
    entry exists; in that case ``maxdelaymay`` (in minutes) is mandatory
    and a missing entry terminates the process via ``sys.exit(-1)``.

    Args:
        icao: Key of the airport that is used for the runway lookup.
        planning: Mapping of the PLANNING configuration section.
    """
    self.OptimizationParameters = {}
    self.RunwayAssignmentsShall = {}
    self.RunwayAssignmentsShould = {}
    self.RunwayAssignmentsMay = {}
    self.MaxDelayMay = timedelta(minutes=10)

    # key prefix and the map that collects the entries of that prefix
    assignmentTables = (
        ('shallassign', self.RunwayAssignmentsShall),
        ('shouldassign', self.RunwayAssignmentsShould),
        ('mayassign', self.RunwayAssignmentsMay),
    )

    sawMayAssignment = False
    for key in planning:
        matched = next((entry for entry in assignmentTables if key.startswith(entry[0])), None)
        if matched is not None:
            prefix, table = matched
            # the remainder of the key is the runway name
            runway = self.findRunway(icao, key.replace(prefix, '').upper())
            assignments = Airport.parseAssignment(planning[key])
            Airport.updateRunwayAssignment(table, runway, assignments)
            if 'mayassign' == prefix:
                sawMayAssignment = True
        elif key.startswith('optimization'):
            star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
            Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)

    # the max delay is only read if may-assignments exist
    if not sawMayAssignment:
        return
    if 'maxdelaymay' not in planning:
        sys.stderr.write('maxDelaymay needs to be defined')
        sys.exit(-1)
    self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))
def __init__(self, filepath : str, icao : str):
    """Load the configuration of one airport from an INI file.

    The file needs the sections ``DATA``, ``PLANNING`` and ``RHC``. The GNG
    files (SCT and ESE) are searched in the directory of ``filepath``.
    On success the instance provides ``RecedingHorizonControl``,
    ``GngData`` and ``GaforId`` as well as the attributes that are set by
    ``parseDefaultSequencingConfiguration``, ``parseConstraints`` and
    ``parseRunwayAssignment``.

    Every configuration error is reported on stderr and terminates the
    process via ``sys.exit(-1)``.

    Args:
        filepath: Path of the configuration file.
        icao: ICAO code of the airport that is described by the file.
    """
    config = configparser.ConfigParser()
    # read() ignores files that cannot be opened; the section checks below
    # report that case as a missing configuration
    config.read(filepath)

    # search the required sections
    dataConfig = config['DATA'] if 'DATA' in config else None
    planningConfig = config['PLANNING'] if 'PLANNING' in config else None
    rhcConfig = config['RHC'] if 'RHC' in config else None

    # the DATA section is needed for the GNG files and the GAFOR id
    if dataConfig is None:
        sys.stderr.write('No DATA configuration found')
        sys.exit(-1)

    # find the GNG-file data
    sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
    if sctFile is None or eseFile is None:
        sys.stderr.write('No GNG-files found')
        sys.exit(-1)

    # parse the planning information
    if planningConfig is None:
        sys.stderr.write('No planning configuration found')
        sys.exit(-1)
    requiredArrivalRoutes = self.parsePlanning(planningConfig)
    if 0 == len(requiredArrivalRoutes):
        sys.stderr.write('No valid planning configuration found')
        sys.exit(-1)

    # parse the RHC information
    if rhcConfig is None:
        sys.stderr.write('No RHC configuration found')
        sys.exit(-1)
    self.RecedingHorizonControl = RHC(rhcConfig)

    # parse the GNG data
    print('Used GNG-Data: ' + eseFile)
    self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)

    # get the GAFOR id
    if dataConfig.get('gaforid') is None:
        sys.stderr.write('No GAFOR-ID found!')
        sys.exit(-1)
    self.GaforId = int(dataConfig['gaforid'])

    # get the default sequencing data; the assignment update needs the
    # results of the three parse calls
    self.parseDefaultSequencingConfiguration(icao, planningConfig)
    self.parseConstraints(planningConfig)
    self.parseRunwayAssignment(icao, planningConfig)
    self.assignmentUpdate(self.DefaultSequencing)
def assignmentUpdate(self, sequenceConfig : AirportSequencing):
    """Attach the configured runway assignments to the active runways.

    For every entry of ``sequenceConfig.ActiveArrivalRunways`` whose
    ``Runway`` is a key of ``self.RunwayAssignmentsShall``,
    ``self.RunwayAssignmentsShould`` or ``self.RunwayAssignmentsMay`` the
    matching map is assigned to ``ShallAssignments``, ``ShouldAssignments``
    or ``MayAssignments`` of that entry. Other entries stay untouched.

    Args:
        sequenceConfig: Sequencing configuration that is updated in place.
    """
    # attribute of the active runway and the map that provides its value
    sources = (
        ('ShallAssignments', self.RunwayAssignmentsShall),
        ('ShouldAssignments', self.RunwayAssignmentsShould),
        ('MayAssignments', self.RunwayAssignmentsMay),
    )
    for active in sequenceConfig.ActiveArrivalRunways:
        for attribute, table in sources:
            if active.Runway in table:
                setattr(active, attribute, table[active.Runway])