227 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			227 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import configparser
 | |
| import glob
 | |
| import os
 | |
| import sys
 | |
| 
 | |
| from aman.config.RHC import RHC
 | |
| from aman.config.AirportSequencing import AirportSequencing
 | |
| from aman.config.RunwaySequencing import RunwaySequencing
 | |
| from aman.formats.SctEseFormat import SctEseFormat
 | |
| from aman.types.Waypoint import Waypoint
 | |
| 
 | |
| class Airport:
 | |
|     def findGngData(data, path):
 | |
|         if None == data.get('gngwildcard'):
 | |
|             return None, None
 | |
| 
 | |
|         # find the newest ESE file
 | |
|         files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
 | |
|         latestEse = max(files, key=os.path.getctime)
 | |
| 
 | |
|         # search for the corresponding SCT file
 | |
|         latestSct = os.path.splitext(latestEse)[0] + '.sct'
 | |
| 
 | |
|         # check if the files exist
 | |
|         if False == os.path.isfile(latestEse) or False == os.path.isfile(latestSct):
 | |
|             return None, None
 | |
| 
 | |
|         return latestSct, latestEse
 | |
| 
 | |
|     def parsePlanning(self, planning):
 | |
|         if None == planning.get('routes'):
 | |
|             return []
 | |
|         return planning['routes'].split(':')
 | |
| 
 | |
|     def parseDefaultSequencingConfiguration(self, icao : str, planning):
 | |
|         if None == planning.get('activearrivalrunwaydefault'):
 | |
|             sys.stderr.write('No "activearrivalrunwaydefault" entry found!')
 | |
|             sys.exit(-1)
 | |
|         if None == planning.get('activearrivalmodedefault'):
 | |
|             sys.stderr.write('No "activearrivalmodedefault" entry found!')
 | |
|             sys.exit(-1)
 | |
|         if None == planning.get('arrivalspacingdefault'):
 | |
|             sys.stderr.write('No "arrivalspacingdefault" entry found!')
 | |
|             sys.exit(-1)
 | |
|         if not icao in self.GngData.Runways:
 | |
|             sys.stderr.write('Unable to find' + icao + 'in the SCT data!')
 | |
|             sys.exit(-1)
 | |
| 
 | |
|         # parse the default arrival mode
 | |
|         if 'STAGGERED' == planning['activearrivalmodedefault']:
 | |
|             staggered = True
 | |
|         elif 'IPA' == planning['activearrivalmodedefault']:
 | |
|             staggered = False
 | |
|         else:
 | |
|             sys.stderr.write('Unknown arrival mode in "" found! (STAGGERED or IPA needs to be set)')
 | |
|             sys.exit(-1)
 | |
| 
 | |
|         # translate the spacing into a map
 | |
|         ident = ''
 | |
|         spacings = {}
 | |
|         spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
 | |
|         for i in range(0, len(spacingConfig)):
 | |
|             if 0 == i % 2:
 | |
|                 ident = spacingConfig[i]
 | |
|             elif '' != ident:
 | |
|                 spacings[ident] = int(spacingConfig[i])
 | |
|             else:
 | |
|                 sys.stderr.write('No runway defined in "arrivalspacingdefault"!')
 | |
|                 sys.exit(-1)
 | |
| 
 | |
|         # create the sequencing data per runway
 | |
|         self.DefaultSequencing = AirportSequencing(icao)
 | |
|         for ident in list(filter(None, planning['activearrivalrunwaydefault'].split(':'))):
 | |
|             if not ident in spacings:
 | |
|                 sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
 | |
|                 sys.exit(-1)
 | |
| 
 | |
|             found = False
 | |
|             for runway in self.GngData.Runways[icao]:
 | |
|                 if ident == runway.Name:
 | |
|                     sequence = RunwaySequencing(runway)
 | |
|                     sequence.Spacing = spacings[ident]
 | |
|                     self.DefaultSequencing.activateRunway(sequence)
 | |
|                     found = True
 | |
|                     break
 | |
| 
 | |
|             if False == found:
 | |
|                 sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
 | |
|                 sys.exit(-1)
 | |
| 
 | |
|         # create the dependencies, if needed
 | |
|         if True == staggered:
 | |
|             if None == planning.get('runwaydependenciesdefault'):
 | |
|                 sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
 | |
|                 sys.exit(-1)
 | |
| 
 | |
|             dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
 | |
|             if 0 != len(dependencies) % 2:
 | |
|                 sys.stderr.write('No valid set of runway dependencies found!')
 | |
|                 sys.exit(-1)
 | |
| 
 | |
|             for i in range(0, len(dependencies), 2):
 | |
|                 self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
 | |
| 
 | |
|     def parseConstraints(self, planning):
 | |
|         self.ArrivalRouteConstraints = {}
 | |
| 
 | |
|         for key in planning:
 | |
|             if True == key.startswith('constraints'):
 | |
|                 star = key.replace('constraints', '').upper()
 | |
|                 if '' != star:
 | |
|                     elements = list(filter(None, planning[key].split(':')))
 | |
|                     if 3 > len(elements):
 | |
|                         sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | |
|                         sys.exit(-1)
 | |
| 
 | |
|                     waypoints = []
 | |
| 
 | |
|                     # values for the waypoint constraints
 | |
|                     waypointName = elements[0]
 | |
|                     constraints = [-1, -1]
 | |
|                     isBaseTurn = False
 | |
|                     isFinalTurn = False
 | |
| 
 | |
|                     index = 1
 | |
|                     while index < len(elements):
 | |
|                         if 'A' == elements[index] or 'S' == elements[index]:
 | |
|                             if index + 1 == len(elements) or False == elements[index + 1].isnumeric():
 | |
|                                 sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | |
|                                 sys.exit(-1)
 | |
| 
 | |
|                             if 'A' == elements[index]:
 | |
|                                 constraints[0] = int(elements[index + 1])
 | |
|                             else:
 | |
|                                 constraints[1] = int(elements[index + 1])
 | |
|                             index += 1
 | |
|                         elif 'B' == elements[index]:
 | |
|                             isBaseTurn = True
 | |
|                         elif 'F' == elements[index]:
 | |
|                             isFinalTurn = True
 | |
|                         else:
 | |
|                             if False == isBaseTurn and False == isFinalTurn and -1 == constraints[0] and -1 == constraints[1] and '' == waypointName:
 | |
|                                 sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | |
|                                 sys.exit(-1)
 | |
|                             if True == isBaseTurn and True == isFinalTurn:
 | |
|                                 sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | |
|                                 sys.exit(-1)
 | |
| 
 | |
|                             waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
 | |
|                             if -1 != constraints[0]:
 | |
|                                 waypoints[-1].Altitude = constraints[0]
 | |
|                             if -1 != constraints[1]:
 | |
|                                 waypoints[-1].Speed = constraints[1]
 | |
| 
 | |
|                             # reset temporary data
 | |
|                             waypointName = elements[index]
 | |
|                             constraints = [-1, -1]
 | |
|                             isBaseTurn = False
 | |
|                             isFinalTurn = False
 | |
| 
 | |
|                         index += 1
 | |
| 
 | |
|                     # check if we have to add the last waypoint
 | |
|                     if 0 != len(waypoints) and waypointName != waypoints[-1].Name:
 | |
|                         waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
 | |
|                         if -1 != constraints[0]:
 | |
|                             waypoints[-1].Altitude = constraints[0]
 | |
|                         if -1 != constraints[1]:
 | |
|                             waypoints[-1].Speed = constraints[1]
 | |
| 
 | |
|                     # register the arrival route
 | |
|                     self.ArrivalRouteConstraints[star] = waypoints
 | |
| 
 | |
|     def __init__(self, filepath : str, icao : str):
 | |
|         config = configparser.ConfigParser()
 | |
|         config.read(filepath)
 | |
| 
 | |
|         dataConfig = None
 | |
|         planningConfig = None
 | |
|         rhcConfig = None
 | |
| 
 | |
|         # search the required sections
 | |
|         for key in config:
 | |
|             if 'DATA' == key:
 | |
|                 dataConfig = config['DATA']
 | |
|             elif 'PLANNING' == key:
 | |
|                 planningConfig = config['PLANNING']
 | |
|             elif 'RHC' == key:
 | |
|                 rhcConfig = config['RHC']
 | |
| 
 | |
|         # find the GNG-file data
 | |
|         sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
 | |
|         if None == sctFile or None == eseFile:
 | |
|             sys.stderr.write('No GNG-files found')
 | |
|             sys.exit(-1)
 | |
| 
 | |
|         # parse the planning information
 | |
|         if None == planningConfig or False == self.parsePlanning(planningConfig):
 | |
|             sys.stderr.write('No planning configuration found')
 | |
|             sys.exit(-1)
 | |
|         requiredArrivalRoutes = self.parsePlanning(planningConfig)
 | |
|         if 0 == len(requiredArrivalRoutes):
 | |
|             sys.stderr.write('No valid planning configuration found')
 | |
|             sys.exit(-1)
 | |
| 
 | |
|         # parse the RHC information
 | |
|         if None == rhcConfig:
 | |
|             sys.stderr.write('No RHC configuration found')
 | |
|             sys.exit(-1)
 | |
|         self.RecedingHorizonControl = RHC(rhcConfig)
 | |
| 
 | |
|         # parse the GNG data
 | |
|         print('Used GNG-Data: ' + eseFile)
 | |
|         self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)
 | |
| 
 | |
|         # get the GAFOR id
 | |
|         if None == dataConfig.get('gaforid'):
 | |
|             sys.stderr.write('No GAFOR-ID found!')
 | |
|             sys.exit(-1)
 | |
|         self.GaforId = int(dataConfig['gaforid'])
 | |
| 
 | |
|         # get the default sequencing data
 | |
|         self.parseDefaultSequencingConfiguration(icao, planningConfig)
 | |
|         self.parseConstraints(planningConfig)
 |