123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- #!/usr/bin/env python
- import configparser
- from datetime import timedelta
- import glob
- import os
- import sys
- from aman.config.RHC import RHC
- from aman.config.AirportSequencing import AirportSequencing
- from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
- from aman.formats.SctEseFormat import SctEseFormat
- from aman.types.Waypoint import Waypoint
class Airport:
    """Airport configuration built from an INI file and the matching SCT/ESE files.

    The constructor reads the ``DATA``, ``PLANNING``, ``RHC`` and ``WEBUI``
    sections of the configuration file and populates these attributes:

    * ``RecedingHorizonControl`` - ``RHC`` instance created from the ``RHC`` section
    * ``GngData`` - ``SctEseFormat`` instance created from the SCT/ESE files
    * ``GaforId`` - integer value of ``gaforid`` in the ``DATA`` section
    * ``DefaultSequencing`` - ``AirportSequencing`` with the default runways
    * ``ArrivalRouteConstraints`` / ``IafSpacing`` - see ``parseConstraints``
    * ``OptimizationParameters``, ``RunwayAssignmentsShall``,
      ``RunwayAssignmentsShould``, ``RunwayAssignmentsMay``, ``MaxDelayMay`` -
      see ``parseRunwayAssignment``
    * ``IafColorization`` - see ``parseWebUI``

    Most configuration errors are written to stderr and terminate the
    process with ``sys.exit(-1)``.
    """

    @staticmethod
    def _abort(message):
        """Write ``message`` to stderr and terminate via ``sys.exit(-1)``."""
        sys.stderr.write(message)
        sys.exit(-1)

    @staticmethod
    def findGngData(data, path):
        """Locate the most recently created ESE file and its sibling SCT file.

        ``data`` is the ``DATA`` configuration section (or ``None`` if the
        section is missing); its ``gngwildcard`` entry is combined with the
        ``.ese`` extension and globbed inside ``path``.

        Returns ``(sctPath, esePath)``, or ``(None, None)`` if the section or
        the wildcard is missing, no file matches, or the SCT file with the
        same base name does not exist.
        """
        if data is None or data.get('gngwildcard') is None:
            return None, None

        # find the newest ESE file
        files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
        if not files:
            # max() would raise on an empty sequence
            return None, None
        latestEse = max(files, key=os.path.getctime)

        # search for the corresponding SCT file
        latestSct = os.path.splitext(latestEse)[0] + '.sct'

        # check if the files exist
        if not os.path.isfile(latestEse) or not os.path.isfile(latestSct):
            return None, None

        return latestSct, latestEse

    def parsePlanning(self, planning):
        """Return the colon-separated ``routes`` entry as a list ([] if missing)."""
        routes = planning.get('routes')
        if routes is None:
            return []
        return routes.split(':')

    def parseDefaultSequencingConfiguration(self, icao : str, planning):
        """Create ``self.DefaultSequencing`` from the default entries of ``planning``.

        Required entries: ``activearrivalrunwaydefault`` (colon-separated
        runway names), ``activearrivalmodedefault`` (``STAGGERED`` or ``IPA``)
        and ``arrivalspacingdefault`` (colon-separated ``runway:spacing``
        pairs). For ``STAGGERED`` the entry ``runwaydependenciesdefault``
        (colon-separated runway pairs) is required as well.

        Reads ``self.GngData.Runways``. Any missing or inconsistent entry
        terminates the process.
        """
        for required in ('activearrivalrunwaydefault', 'activearrivalmodedefault', 'arrivalspacingdefault'):
            if planning.get(required) is None:
                Airport._abort('No "' + required + '" entry found!')
        if icao not in self.GngData.Runways:
            Airport._abort('Unable to find ' + icao + ' in the SCT data!')

        # parse the default arrival mode
        mode = planning['activearrivalmodedefault']
        if mode not in ('STAGGERED', 'IPA'):
            Airport._abort('Unknown arrival mode in "activearrivalmodedefault" found! (STAGGERED or IPA needs to be set)')
        staggered = 'STAGGERED' == mode

        # translate the spacing into a map; a trailing runway without a value is ignored
        spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
        spacings = {}
        for ident, spacing in zip(spacingConfig[0::2], spacingConfig[1::2]):
            spacings[ident] = int(spacing)

        # create the sequencing data per runway
        self.DefaultSequencing = AirportSequencing(icao)
        for ident in filter(None, planning['activearrivalrunwaydefault'].split(':')):
            if ident not in spacings:
                Airport._abort('Unable to find sequencing data for ' + ident + ' of ' + icao)

            runway = next((entry for entry in self.GngData.Runways[icao] if ident == entry.Name), None)
            if runway is None:
                Airport._abort('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')

            sequence = RunwaySequencing(runway)
            sequence.Spacing = spacings[ident]
            self.DefaultSequencing.activateRunway(sequence)

        # create the dependencies, if needed
        if staggered:
            if planning.get('runwaydependenciesdefault') is None:
                Airport._abort('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')

            dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
            if 0 != len(dependencies) % 2:
                Airport._abort('No valid set of runway dependencies found!')

            for i in range(0, len(dependencies), 2):
                self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])

    @staticmethod
    def _createWaypoint(line, name, altitude, speed, base, final):
        """Create the Waypoint of one constraint entry.

        ``altitude`` and ``speed`` use -1 as "not set". A waypoint flagged as
        base turn and final turn at the same time is rejected; ``line`` is
        only used for that error message.
        """
        if base and final:
            Airport._abort('Invalid constraint line: ' + line)

        waypoint = Waypoint(name = name, base = base, final = final)
        if -1 != altitude:
            waypoint.Altitude = altitude
        if -1 != speed:
            waypoint.Speed = speed
        return waypoint

    def parseConstraints(self, planning):
        """Parse ``iafsequence`` and all ``constraints<STAR>`` entries of ``planning``.

        Sets ``self.IafSpacing`` (10.0 if ``iafsequence`` is missing) and
        ``self.ArrivalRouteConstraints``, which maps the upper-cased STAR name
        to a list of waypoints.

        A constraint value is a colon-separated list. Every element that is
        not one of the markers below starts a new waypoint with that name:

        * ``A:<number>`` - sets ``Altitude`` of the current waypoint
        * ``S:<number>`` - sets ``Speed`` of the current waypoint
        * ``B`` - flags the current waypoint as base turn
        * ``F`` - flags the current waypoint as final turn

        Lines with fewer than three elements, a marker without a numeric
        value, or a waypoint flagged as both base and final turn terminate
        the process.
        """
        self.ArrivalRouteConstraints = {}

        # check if the IAF sequence constraint is defined
        if 'iafsequence' in planning:
            self.IafSpacing = float(planning['iafsequence'])
        else:
            self.IafSpacing = 10.0

        # parse the arrival constraints
        for key in planning:
            if not key.startswith('constraints'):
                continue
            star = key.replace('constraints', '').upper()
            if '' == star:
                continue

            line = key + '=' + planning[key]
            elements = list(filter(None, planning[key].split(':')))
            if 3 > len(elements):
                Airport._abort('Invalid constraint line: ' + line)

            waypoints = []

            # values of the waypoint that is currently collected
            waypointName = elements[0]
            altitude, speed = -1, -1
            isBaseTurn = isFinalTurn = False

            index = 1
            while index < len(elements):
                element = elements[index]
                if element in ('A', 'S'):
                    if index + 1 == len(elements) or not elements[index + 1].isnumeric():
                        Airport._abort('Invalid constraint line: ' + line)

                    if 'A' == element:
                        altitude = int(elements[index + 1])
                    else:
                        speed = int(elements[index + 1])
                    # skip the value that was consumed together with the marker
                    index += 1
                elif 'B' == element:
                    isBaseTurn = True
                elif 'F' == element:
                    isFinalTurn = True
                else:
                    # a new waypoint name closes the one collected so far
                    waypoints.append(Airport._createWaypoint(line, waypointName, altitude, speed, isBaseTurn, isFinalTurn))

                    # reset temporary data
                    waypointName = element
                    altitude, speed = -1, -1
                    isBaseTurn = isFinalTurn = False

                index += 1

            # add the waypoint that is still pending; this also covers lines
            # that define only a single waypoint
            if not waypoints or waypointName != waypoints[-1].Name:
                waypoints.append(Airport._createWaypoint(line, waypointName, altitude, speed, isBaseTurn, isFinalTurn))

            # register the arrival route
            self.ArrivalRouteConstraints[star] = waypoints

    @staticmethod
    def parseAssignment(assignment : str):
        """Parse a colon-separated list of ``<type>:<value>`` pairs.

        ``A`` maps to ``RunwayAssignmentType.AircraftType`` and ``G`` to
        ``RunwayAssignmentType.GateAssignment``; any other type terminates the
        process. Returns a dict that maps the assignment type to the list of
        its values in input order. A trailing type without a value is ignored.
        """
        elements = list(filter(None, assignment.split(':')))
        retval = {}
        assignmentType = None

        for index, element in enumerate(elements):
            if 0 == index % 2:
                if 'A' == element:
                    assignmentType = RunwayAssignmentType.AircraftType
                elif 'G' == element:
                    assignmentType = RunwayAssignmentType.GateAssignment
                else:
                    Airport._abort('Invalid assignment type: ' + element)
            else:
                retval.setdefault(assignmentType, []).append(element)
                assignmentType = None

        return retval

    def findRunway(self, icao : str, name : str):
        """Return the entry of ``self.GngData.Runways[icao]`` whose ``Name`` equals ``name``.

        Raises ``Exception`` (after writing the message to stderr) if no such
        runway exists.
        """
        for runway in self.GngData.Runways[icao]:
            if name == runway.Name:
                return runway

        message = 'Unable to find runway ' + name + ' in the sequencing data for ' + icao
        sys.stderr.write(message)
        raise Exception(message)

    @staticmethod
    def updateRunwayAssignment(dictionary, runway, assignments):
        """Merge ``assignments`` into ``dictionary[runway]``.

        ``dictionary`` maps a runway to a dict of assignment type -> value
        list. The values are copied, so the lists inside ``assignments`` are
        never shared with or modified through ``dictionary``.
        """
        runwayAssignments = dictionary.setdefault(runway, {})
        for key in assignments:
            runwayAssignments.setdefault(key, []).extend(assignments[key])

    def parseOptimization(self, key : str, line : str):
        """Parse one ``optimization<STAR>`` entry.

        ``line`` has to be ``<maxTTG>:<ttgRatio>``. Returns the tuple
        ``(star, int(maxTTG), float(ttgRatio))`` with the upper-cased STAR
        name.

        Raises ``Exception`` (after writing the message to stderr) if the STAR
        is not part of ``self.GngData.ArrivalRoutes`` or if ``line`` does not
        consist of exactly two elements.
        """
        star = key.replace('optimization', '').upper()

        # check if the STAR exists
        arrivalRoutes = self.GngData.ArrivalRoutes
        found = any(star == route.Name for rwy in arrivalRoutes for route in arrivalRoutes[rwy])
        if not found:
            message = 'Unknown star:' + key
            sys.stderr.write(message)
            raise Exception(message)

        elements = line.split(':')
        if 2 != len(elements):
            message = 'Invalid optimization parameter for ' + key
            sys.stderr.write(message)
            raise Exception(message)

        maxTTG = int(elements[0])
        ttgRatio = float(elements[1])

        return star, maxTTG, ttgRatio

    @staticmethod
    def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
        """Store ``[maxTTG, ttgRatio]`` for ``star``, replacing an existing entry."""
        dictionary[star] = [ maxTTG, ttgRatio ]

    def parseRunwayAssignment(self, icao : str, planning):
        """Parse the runway assignment and optimization entries of ``planning``.

        Entries starting with ``shallassign``, ``shouldassign`` or
        ``mayassign`` (followed by the runway name) fill
        ``self.RunwayAssignmentsShall`` / ``Should`` / ``May``; entries
        starting with ``optimization`` fill ``self.OptimizationParameters``.

        ``self.MaxDelayMay`` defaults to ten minutes. As soon as a
        ``mayassign`` entry exists, ``maxdelaymay`` (minutes) is required and
        replaces the default; a missing entry terminates the process.
        """
        self.OptimizationParameters = {}
        self.RunwayAssignmentsShall = {}
        self.RunwayAssignmentsShould = {}
        self.RunwayAssignmentsMay = {}
        self.MaxDelayMay = timedelta(minutes=10)
        mayFound = False

        # key prefix -> dictionary that collects the assignments
        targets = (
            ('shallassign', self.RunwayAssignmentsShall),
            ('shouldassign', self.RunwayAssignmentsShould),
            ('mayassign', self.RunwayAssignmentsMay),
        )

        for key in planning:
            if key.startswith('optimization'):
                star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
                Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)
                continue

            for prefix, target in targets:
                if key.startswith(prefix):
                    runway = self.findRunway(icao, key.replace(prefix, '').upper())
                    assignments = Airport.parseAssignment(planning[key])
                    Airport.updateRunwayAssignment(target, runway, assignments)
                    if 'mayassign' == prefix:
                        mayFound = True
                    break

        # find the max delays
        if mayFound:
            if 'maxdelaymay' not in planning:
                Airport._abort('maxDelaymay needs to be defined')
            self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))

    def parseWebUI(self, webui):
        """Parse the ``iafcolorization`` entry of ``webui`` into ``self.IafColorization``.

        The entry is a colon-separated list of ``<name>:<int>:<int>:<int>``
        groups; each name is mapped to the list of its three integers. A
        missing entry results in an empty dict; an element count that is not
        a multiple of four terminates the process.
        """
        self.IafColorization = {}

        if 'iafcolorization' not in webui:
            return

        elements = list(filter(None, webui['iafcolorization'].split(':')))
        if 0 != len(elements) % 4:
            Airport._abort('Invalid "iafcolorization" entry found!')

        for i in range(0, len(elements), 4):
            self.IafColorization[elements[i]] = [ int(elements[i + 1]), int(elements[i + 2]), int(elements[i + 3]) ]

    def __init__(self, filepath : str, icao : str):
        """Load the configuration file ``filepath`` for the airport ``icao``.

        The SCT/ESE files are searched in the directory of ``filepath``. A
        missing section, missing files or an invalid entry terminates the
        process.
        """
        config = configparser.ConfigParser()
        config.read(filepath)

        # search the required sections
        dataConfig = config['DATA'] if config.has_section('DATA') else None
        planningConfig = config['PLANNING'] if config.has_section('PLANNING') else None
        rhcConfig = config['RHC'] if config.has_section('RHC') else None
        webUiConfig = config['WEBUI'] if config.has_section('WEBUI') else None

        # find the GNG-file data
        sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
        if sctFile is None or eseFile is None:
            Airport._abort('No GNG-files found')

        # parse the planning information
        if planningConfig is None:
            Airport._abort('No planning configuration found')
        requiredArrivalRoutes = self.parsePlanning(planningConfig)
        if 0 == len(requiredArrivalRoutes):
            Airport._abort('No valid planning configuration found')

        # parse the RHC information
        if rhcConfig is None:
            Airport._abort('No RHC configuration found')
        self.RecedingHorizonControl = RHC(rhcConfig)

        # check if the WebUI information is available
        if webUiConfig is None:
            Airport._abort('No WEBUI configuration found')

        # parse the GNG data
        print('Used GNG-Data: ' + eseFile)
        self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)

        # get the GAFOR id
        if dataConfig.get('gaforid') is None:
            Airport._abort('No GAFOR-ID found!')
        self.GaforId = int(dataConfig['gaforid'])

        # get the default sequencing data
        self.parseDefaultSequencingConfiguration(icao, planningConfig)
        self.parseConstraints(planningConfig)
        self.parseRunwayAssignment(icao, planningConfig)
        self.parseWebUI(webUiConfig)

        self.assignmentUpdate(self.DefaultSequencing)

    def assignmentUpdate(self, sequenceConfig : 'AirportSequencing'):
        """Copy the configured runway assignments onto the active arrival runways.

        For every entry of ``sequenceConfig.ActiveArrivalRunways`` whose
        ``Runway`` has shall/should/may assignments, the matching dict is
        stored in ``ShallAssignments`` / ``ShouldAssignments`` /
        ``MayAssignments`` of that entry.
        """
        for active in sequenceConfig.ActiveArrivalRunways:
            if active.Runway in self.RunwayAssignmentsShall:
                active.ShallAssignments = self.RunwayAssignmentsShall[active.Runway]
            if active.Runway in self.RunwayAssignmentsShould:
                active.ShouldAssignments = self.RunwayAssignmentsShould[active.Runway]
            if active.Runway in self.RunwayAssignmentsMay:
                active.MayAssignments = self.RunwayAssignmentsMay[active.Runway]
|