#!/usr/bin/env python

import configparser
from datetime import timedelta
import glob
import os
import sys

from aman.config.RHC import RHC
from aman.config.AirportSequencing import AirportSequencing
from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
from aman.formats.SctEseFormat import SctEseFormat
from aman.types.Waypoint import Waypoint


class Airport:
    """Airport configuration read from an INI file and the matching GNG data.

    The constructor reads the DATA, PLANNING, RHC and WEBUI sections of the
    configuration file, loads the newest SCT/ESE file pair and derives the
    default runway sequencing, the arrival route constraints, the runway
    assignments and the WebUI colorization from them.

    Most configuration errors are reported on stderr followed by
    ``sys.exit(-1)``; ``findRunway`` and ``parseOptimization`` raise an
    ``Exception`` instead.
    """

    @staticmethod
    def _abort(message: str):
        """Write ``message`` to stderr and terminate the process with -1."""
        sys.stderr.write(message)
        sys.exit(-1)

    @staticmethod
    def findGngData(data, path):
        """Locate the newest ESE file and the SCT file next to it.

        ``data`` is the DATA section (or None) and ``path`` the directory
        that is searched with the ``gngwildcard`` entry.

        Returns ``(sctPath, esePath)`` or ``(None, None)`` if the wildcard is
        not configured, nothing matches it or one of the files is missing.
        """
        wildcard = data.get('gngwildcard') if data is not None else None
        if wildcard is None:
            return None, None

        # find the newest ESE file; max() would raise on an empty list
        files = glob.glob(os.path.join(path, wildcard + '.ese'))
        if not files:
            return None, None
        latestEse = max(files, key=os.path.getctime)

        # search for the corresponding SCT file
        latestSct = os.path.splitext(latestEse)[0] + '.sct'

        # check if the files exist
        if not os.path.isfile(latestEse) or not os.path.isfile(latestSct):
            return None, None

        return latestSct, latestEse

    def parsePlanning(self, planning):
        """Return the colon-separated ``routes`` entry as a list.

        An empty list is returned if the entry does not exist.
        """
        if planning.get('routes') is None:
            return []
        return planning['routes'].split(':')

    def parseDefaultSequencingConfiguration(self, icao: str, planning):
        """Create ``self.DefaultSequencing`` from the PLANNING section.

        Reads ``activearrivalrunwaydefault``, ``activearrivalmodedefault``,
        ``arrivalspacingdefault`` and, for staggered approaches,
        ``runwaydependenciesdefault``. Requires ``self.GngData`` to be set.
        Exits the process if an entry is missing or inconsistent.
        """
        for entry in ('activearrivalrunwaydefault', 'activearrivalmodedefault', 'arrivalspacingdefault'):
            if planning.get(entry) is None:
                Airport._abort('No "' + entry + '" entry found!')
        if icao not in self.GngData.Runways:
            Airport._abort('Unable to find ' + icao + ' in the SCT data!')

        # parse the default arrival mode
        mode = planning['activearrivalmodedefault']
        if mode == 'STAGGERED':
            staggered = True
        elif mode == 'IPA':
            staggered = False
        else:
            Airport._abort('Unknown arrival mode in "activearrivalmodedefault" found! (STAGGERED or IPA needs to be set)')

        # translate the spacing into a map; the entry alternates between
        # runway identifier and spacing value, a trailing identifier without
        # a value is ignored
        spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
        spacings = {}
        for ident, spacing in zip(spacingConfig[0::2], spacingConfig[1::2]):
            spacings[ident] = int(spacing)

        # create the sequencing data per runway
        self.DefaultSequencing = AirportSequencing(icao)
        for ident in filter(None, planning['activearrivalrunwaydefault'].split(':')):
            if ident not in spacings:
                Airport._abort('Unable to find sequencing data for ' + ident + ' of ' + icao)

            runway = next((rwy for rwy in self.GngData.Runways[icao] if ident == rwy.Name), None)
            if runway is None:
                Airport._abort('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')

            sequence = RunwaySequencing(runway)
            sequence.Spacing = spacings[ident]
            self.DefaultSequencing.activateRunway(sequence)

        # create the dependencies, if needed
        if staggered:
            if planning.get('runwaydependenciesdefault') is None:
                Airport._abort('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')

            dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
            if len(dependencies) % 2 != 0:
                Airport._abort('No valid set of runway dependencies found!')

            for i in range(0, len(dependencies), 2):
                self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])

    @staticmethod
    def _createWaypoint(name, altitude, speed, isBaseTurn, isFinalTurn):
        """Build a Waypoint; -1 marks an altitude or speed that is not set."""
        waypoint = Waypoint(name=name, base=isBaseTurn, final=isFinalTurn)
        if altitude != -1:
            waypoint.Altitude = altitude
        if speed != -1:
            waypoint.Speed = speed
        return waypoint

    @staticmethod
    def _parseConstraintLine(key, line):
        """Translate one ``constraints<STAR>`` value into a list of waypoints.

        The value is a colon-separated list that starts with a waypoint name.
        ``A:<number>`` sets the altitude and ``S:<number>`` the speed of the
        current waypoint, ``B`` marks it as base turn and ``F`` as final
        turn. Every other element starts the next waypoint.
        """
        invalid = 'Invalid constraint line: ' + key + '=' + line

        elements = list(filter(None, line.split(':')))
        if len(elements) < 3:
            Airport._abort(invalid)

        waypoints = []

        # values for the waypoint constraints
        waypointName = elements[0]
        altitude, speed = -1, -1
        isBaseTurn = False
        isFinalTurn = False

        index = 1
        while index < len(elements):
            token = elements[index]
            if token in ('A', 'S'):
                # the constraint needs a numeric value as the next element
                if index + 1 == len(elements) or not elements[index + 1].isnumeric():
                    Airport._abort(invalid)

                if token == 'A':
                    altitude = int(elements[index + 1])
                else:
                    speed = int(elements[index + 1])
                index += 1
            elif token == 'B':
                isBaseTurn = True
            elif token == 'F':
                isFinalTurn = True
            else:
                # a new waypoint starts, so the previous one is complete
                if isBaseTurn and isFinalTurn:
                    Airport._abort(invalid)
                waypoints.append(Airport._createWaypoint(waypointName, altitude, speed, isBaseTurn, isFinalTurn))

                # reset temporary data
                waypointName = token
                altitude, speed = -1, -1
                isBaseTurn = False
                isFinalTurn = False

            index += 1

        # add the last waypoint; this also covers lines that define only a
        # single waypoint, which were silently dropped before
        if not waypoints or waypointName != waypoints[-1].Name:
            waypoints.append(Airport._createWaypoint(waypointName, altitude, speed, isBaseTurn, isFinalTurn))

        return waypoints

    def parseConstraints(self, planning):
        """Read the IAF spacing and the arrival route constraints.

        Sets ``self.IafSpacing`` from ``iafsequence`` (10.0 if the entry is
        missing) and fills ``self.ArrivalRouteConstraints`` with one waypoint
        list per ``constraints<STAR>`` entry, keyed by the upper-case STAR.
        """
        self.ArrivalRouteConstraints = {}

        # check if the IAF sequence constraint is defined
        if 'iafsequence' in planning:
            self.IafSpacing = float(planning['iafsequence'])
        else:
            self.IafSpacing = 10.0

        # parse the arrival constraints
        for key in planning:
            if not key.startswith('constraints'):
                continue

            star = key.replace('constraints', '').upper()
            if star:
                # register the arrival route
                self.ArrivalRouteConstraints[star] = Airport._parseConstraintLine(key, planning[key])

    @staticmethod
    def parseAssignment(assignment: str):
        """Parse a colon-separated list of ``<type>:<value>`` pairs.

        The type is ``A`` (aircraft type) or ``G`` (gate assignment); every
        other type terminates the process. Returns a dictionary that maps the
        RunwayAssignmentType to the list of its values. A trailing type
        without a value is ignored.
        """
        elements = list(filter(None, assignment.split(':')))
        retval = {}

        for index in range(0, len(elements), 2):
            code = elements[index]
            if code == 'A':
                assignmentType = RunwayAssignmentType.AircraftType
            elif code == 'G':
                assignmentType = RunwayAssignmentType.GateAssignment
            else:
                Airport._abort('Invalid assignment type: ' + code)

            if index + 1 < len(elements):
                retval.setdefault(assignmentType, []).append(elements[index + 1])

        return retval

    def findRunway(self, icao: str, name: str):
        """Return the runway called ``name`` of the airport ``icao``.

        Raises an Exception if the runway is not part of ``self.GngData``.
        """
        for runway in self.GngData.Runways[icao]:
            if name == runway.Name:
                return runway

        message = 'Unable to find runway ' + name + ' in the sequencing data for ' + icao
        sys.stderr.write(message)
        raise Exception(message)

    @staticmethod
    def updateRunwayAssignment(dictionary, runway, assignments):
        """Merge ``assignments`` into ``dictionary[runway]``.

        The values are copied, so the lists of ``assignments`` are neither
        stored nor modified.
        """
        perRunway = dictionary.setdefault(runway, {})
        for key, values in assignments.items():
            perRunway.setdefault(key, []).extend(values)

    def parseOptimization(self, key: str, line: str):
        """Parse one ``optimization<STAR>`` entry.

        ``line`` has to be ``<maxTTG>:<ttgRatio>``. Returns the tuple
        ``(star, maxTTG, ttgRatio)`` and raises an Exception if the STAR is
        not part of ``self.GngData.ArrivalRoutes`` or the line does not
        consist of exactly two elements.
        """
        star = key.replace('optimization', '').upper()

        # check if the STAR exists
        found = any(
            star == route.Name
            for routes in self.GngData.ArrivalRoutes.values()
            for route in routes
        )
        if not found:
            sys.stderr.write('Unknown star:' + key)
            raise Exception()

        elements = line.split(':')
        if len(elements) != 2:
            sys.stderr.write('Invalid optimization parameter for ' + key)
            raise Exception()

        return star, int(elements[0]), float(elements[1])

    @staticmethod
    def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
        """Store ``[maxTTG, ttgRatio]`` for ``star``, replacing older values."""
        dictionary[star] = [maxTTG, ttgRatio]

    def parseRunwayAssignment(self, icao: str, planning):
        """Read the runway assignments and the optimization parameters.

        Fills ``self.RunwayAssignmentsShall``, ``self.RunwayAssignmentsShould``
        and ``self.RunwayAssignmentsMay`` from the ``shallassign<RWY>``,
        ``shouldassign<RWY>`` and ``mayassign<RWY>`` entries and
        ``self.OptimizationParameters`` from the ``optimization<STAR>``
        entries. ``self.MaxDelayMay`` defaults to ten minutes; if a may
        assignment exists, ``maxdelaymay`` has to be defined and is used
        instead.
        """
        self.OptimizationParameters = {}
        self.RunwayAssignmentsShall = {}
        self.RunwayAssignmentsShould = {}
        self.RunwayAssignmentsMay = {}
        self.MaxDelayMay = timedelta(minutes=10)
        mayFound = False

        targets = (
            ('shallassign', self.RunwayAssignmentsShall),
            ('shouldassign', self.RunwayAssignmentsShould),
            ('mayassign', self.RunwayAssignmentsMay),
        )

        for key in planning:
            for prefix, target in targets:
                if key.startswith(prefix):
                    runway = self.findRunway(icao, key.replace(prefix, '').upper())
                    assignments = Airport.parseAssignment(planning[key])
                    Airport.updateRunwayAssignment(target, runway, assignments)
                    if target is self.RunwayAssignmentsMay:
                        mayFound = True
                    break
            else:
                if key.startswith('optimization'):
                    star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
                    Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)

        # find the max delays
        if mayFound:
            if 'maxdelaymay' not in planning:
                Airport._abort('maxDelaymay needs to be defined')
            self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))

    def parseWebUI(self, webui):
        """Read ``iafcolorization`` into ``self.IafColorization``.

        The entry is a colon-separated list of groups of four elements: a
        name followed by three integers. Exits the process if the number of
        elements is not a multiple of four.
        """
        self.IafColorization = {}

        if 'iafcolorization' not in webui:
            return

        elements = list(filter(None, webui['iafcolorization'].split(':')))
        if len(elements) % 4 != 0:
            Airport._abort('Invalid "iafcolorization" entry: ' + webui['iafcolorization'])

        for i in range(0, len(elements), 4):
            self.IafColorization[elements[i]] = [int(value) for value in elements[i + 1:i + 4]]

    def __init__(self, filepath: str, icao: str):
        """Load the configuration ``filepath`` for the airport ``icao``.

        The GNG files are searched in the directory of ``filepath``. Exits
        the process if a required section, entry or file is missing.
        """
        config = configparser.ConfigParser()
        config.read(filepath)

        # search the required sections
        dataConfig = config['DATA'] if 'DATA' in config else None
        planningConfig = config['PLANNING'] if 'PLANNING' in config else None
        rhcConfig = config['RHC'] if 'RHC' in config else None
        webUiConfig = config['WEBUI'] if 'WEBUI' in config else None

        # find the GNG-file data
        if dataConfig is None:
            Airport._abort('No DATA configuration found')
        sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
        if sctFile is None or eseFile is None:
            Airport._abort('No GNG-files found')

        # parse the planning information
        if planningConfig is None:
            Airport._abort('No planning configuration found')
        requiredArrivalRoutes = self.parsePlanning(planningConfig)
        if not requiredArrivalRoutes:
            Airport._abort('No valid planning configuration found')

        # parse the RHC information
        if rhcConfig is None:
            Airport._abort('No RHC configuration found')
        self.RecedingHorizonControl = RHC(rhcConfig)

        # check if the WebUI information is available
        if webUiConfig is None:
            Airport._abort('No WEBUI configuration found')

        # parse the GNG data
        print('Used GNG-Data: ' + eseFile)
        self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)

        # get the GAFOR id
        if dataConfig.get('gaforid') is None:
            Airport._abort('No GAFOR-ID found!')
        self.GaforId = int(dataConfig['gaforid'])

        # get the default sequencing data
        self.parseDefaultSequencingConfiguration(icao, planningConfig)
        self.parseConstraints(planningConfig)
        self.parseRunwayAssignment(icao, planningConfig)
        self.parseWebUI(webUiConfig)
        self.assignmentUpdate(self.DefaultSequencing)

    def assignmentUpdate(self, sequenceConfig: AirportSequencing):
        """Copy the configured assignments to the active arrival runways.

        Every entry of ``sequenceConfig.ActiveArrivalRunways`` receives the
        shall, should and may assignments stored for its runway, if any.
        """
        # initializes the default sequence data
        for active in sequenceConfig.ActiveArrivalRunways:
            if active.Runway in self.RunwayAssignmentsShall:
                active.ShallAssignments = self.RunwayAssignmentsShall[active.Runway]
            if active.Runway in self.RunwayAssignmentsShould:
                active.ShouldAssignments = self.RunwayAssignmentsShould[active.Runway]
            if active.Runway in self.RunwayAssignmentsMay:
                active.MayAssignments = self.RunwayAssignmentsMay[active.Runway]