378 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			378 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
import configparser
 | 
						|
from datetime import timedelta
 | 
						|
import glob
 | 
						|
import os
 | 
						|
import sys
 | 
						|
 | 
						|
from aman.config.RHC import RHC
 | 
						|
from aman.config.AirportSequencing import AirportSequencing
 | 
						|
from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
 | 
						|
from aman.formats.SctEseFormat import SctEseFormat
 | 
						|
from aman.types.Waypoint import Waypoint
 | 
						|
 | 
						|
class Airport:
 | 
						|
    def findGngData(data, path):
 | 
						|
        if None == data.get('gngwildcard'):
 | 
						|
            return None, None
 | 
						|
 | 
						|
        # find the newest ESE file
 | 
						|
        files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
 | 
						|
        latestEse = max(files, key=os.path.getctime)
 | 
						|
 | 
						|
        # search for the corresponding SCT file
 | 
						|
        latestSct = os.path.splitext(latestEse)[0] + '.sct'
 | 
						|
 | 
						|
        # check if the files exist
 | 
						|
        if False == os.path.isfile(latestEse) or False == os.path.isfile(latestSct):
 | 
						|
            return None, None
 | 
						|
 | 
						|
        return latestSct, latestEse
 | 
						|
 | 
						|
    def parsePlanning(self, planning):
        """Return the arrival routes named in the ``routes`` entry of ``planning``.

        The entry is a colon-separated list. Empty fragments (an empty value,
        doubled or trailing colons) are dropped, the same way all other
        colon-separated entries of the configuration are handled, so that an
        empty entry results in an empty list instead of ``['']``.

        Returns an empty list if the entry does not exist.
        """
        routes = planning.get('routes')
        if routes is None:
            return []
        return [route for route in routes.split(':') if route]
 | 
						|
 | 
						|
    def parseDefaultSequencingConfiguration(self, icao : str, planning):
 | 
						|
        if None == planning.get('activearrivalrunwaydefault'):
 | 
						|
            sys.stderr.write('No "activearrivalrunwaydefault" entry found!')
 | 
						|
            sys.exit(-1)
 | 
						|
        if None == planning.get('activearrivalmodedefault'):
 | 
						|
            sys.stderr.write('No "activearrivalmodedefault" entry found!')
 | 
						|
            sys.exit(-1)
 | 
						|
        if None == planning.get('arrivalspacingdefault'):
 | 
						|
            sys.stderr.write('No "arrivalspacingdefault" entry found!')
 | 
						|
            sys.exit(-1)
 | 
						|
        if not icao in self.GngData.Runways:
 | 
						|
            sys.stderr.write('Unable to find' + icao + 'in the SCT data!')
 | 
						|
            sys.exit(-1)
 | 
						|
 | 
						|
        # parse the default arrival mode
 | 
						|
        if 'STAGGERED' == planning['activearrivalmodedefault']:
 | 
						|
            staggered = True
 | 
						|
        elif 'IPA' == planning['activearrivalmodedefault']:
 | 
						|
            staggered = False
 | 
						|
        else:
 | 
						|
            sys.stderr.write('Unknown arrival mode in "" found! (STAGGERED or IPA needs to be set)')
 | 
						|
            sys.exit(-1)
 | 
						|
 | 
						|
        # translate the spacing into a map
 | 
						|
        ident = ''
 | 
						|
        spacings = {}
 | 
						|
        spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
 | 
						|
        for i in range(0, len(spacingConfig)):
 | 
						|
            if 0 == i % 2:
 | 
						|
                ident = spacingConfig[i]
 | 
						|
            elif '' != ident:
 | 
						|
                spacings[ident] = int(spacingConfig[i])
 | 
						|
            else:
 | 
						|
                sys.stderr.write('No runway defined in "arrivalspacingdefault"!')
 | 
						|
                sys.exit(-1)
 | 
						|
 | 
						|
        # create the sequencing data per runway
 | 
						|
        self.DefaultSequencing = AirportSequencing(icao)
 | 
						|
        for ident in list(filter(None, planning['activearrivalrunwaydefault'].split(':'))):
 | 
						|
            if not ident in spacings:
 | 
						|
                sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
 | 
						|
                sys.exit(-1)
 | 
						|
 | 
						|
            found = False
 | 
						|
            for runway in self.GngData.Runways[icao]:
 | 
						|
                if ident == runway.Name:
 | 
						|
                    sequence = RunwaySequencing(runway)
 | 
						|
                    sequence.Spacing = spacings[ident]
 | 
						|
                    self.DefaultSequencing.activateRunway(sequence)
 | 
						|
                    found = True
 | 
						|
                    break
 | 
						|
 | 
						|
            if False == found:
 | 
						|
                sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
 | 
						|
                sys.exit(-1)
 | 
						|
 | 
						|
        # create the dependencies, if needed
 | 
						|
        if True == staggered:
 | 
						|
            if None == planning.get('runwaydependenciesdefault'):
 | 
						|
                sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
 | 
						|
                sys.exit(-1)
 | 
						|
 | 
						|
            dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
 | 
						|
            if 0 != len(dependencies) % 2:
 | 
						|
                sys.stderr.write('No valid set of runway dependencies found!')
 | 
						|
                sys.exit(-1)
 | 
						|
 | 
						|
            for i in range(0, len(dependencies), 2):
 | 
						|
                self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
 | 
						|
 | 
						|
    def parseConstraints(self, planning):
 | 
						|
        self.ArrivalRouteConstraints = {}
 | 
						|
 | 
						|
        # check if the IAF sequence constraint is defined
 | 
						|
        if 'iafsequence' in planning:
 | 
						|
            self.IafSpacing = float(planning['iafsequence'])
 | 
						|
        else:
 | 
						|
            self.IafSpacing = 10.0
 | 
						|
 | 
						|
        # parse the arrival constraints
 | 
						|
        for key in planning:
 | 
						|
            if True == key.startswith('constraints'):
 | 
						|
                star = key.replace('constraints', '').upper()
 | 
						|
                if '' != star:
 | 
						|
                    elements = list(filter(None, planning[key].split(':')))
 | 
						|
                    if 3 > len(elements):
 | 
						|
                        sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | 
						|
                        sys.exit(-1)
 | 
						|
 | 
						|
                    waypoints = []
 | 
						|
 | 
						|
                    # values for the waypoint constraints
 | 
						|
                    waypointName = elements[0]
 | 
						|
                    constraints = [-1, -1]
 | 
						|
                    isBaseTurn = False
 | 
						|
                    isFinalTurn = False
 | 
						|
 | 
						|
                    index = 1
 | 
						|
                    while index < len(elements):
 | 
						|
                        if 'A' == elements[index] or 'S' == elements[index]:
 | 
						|
                            if index + 1 == len(elements) or False == elements[index + 1].isnumeric():
 | 
						|
                                sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | 
						|
                                sys.exit(-1)
 | 
						|
 | 
						|
                            if 'A' == elements[index]:
 | 
						|
                                constraints[0] = int(elements[index + 1])
 | 
						|
                            else:
 | 
						|
                                constraints[1] = int(elements[index + 1])
 | 
						|
                            index += 1
 | 
						|
                        elif 'B' == elements[index]:
 | 
						|
                            isBaseTurn = True
 | 
						|
                        elif 'F' == elements[index]:
 | 
						|
                            isFinalTurn = True
 | 
						|
                        else:
 | 
						|
                            if False == isBaseTurn and False == isFinalTurn and -1 == constraints[0] and -1 == constraints[1] and '' == waypointName:
 | 
						|
                                sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | 
						|
                                sys.exit(-1)
 | 
						|
                            if True == isBaseTurn and True == isFinalTurn:
 | 
						|
                                sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
 | 
						|
                                sys.exit(-1)
 | 
						|
 | 
						|
                            waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
 | 
						|
                            if -1 != constraints[0]:
 | 
						|
                                waypoints[-1].Altitude = constraints[0]
 | 
						|
                            if -1 != constraints[1]:
 | 
						|
                                waypoints[-1].Speed = constraints[1]
 | 
						|
 | 
						|
                            # reset temporary data
 | 
						|
                            waypointName = elements[index]
 | 
						|
                            constraints = [-1, -1]
 | 
						|
                            isBaseTurn = False
 | 
						|
                            isFinalTurn = False
 | 
						|
 | 
						|
                        index += 1
 | 
						|
 | 
						|
                    # check if we have to add the last waypoint
 | 
						|
                    if 0 != len(waypoints) and waypointName != waypoints[-1].Name:
 | 
						|
                        waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
 | 
						|
                        if -1 != constraints[0]:
 | 
						|
                            waypoints[-1].Altitude = constraints[0]
 | 
						|
                        if -1 != constraints[1]:
 | 
						|
                            waypoints[-1].Speed = constraints[1]
 | 
						|
 | 
						|
                    # register the arrival route
 | 
						|
                    self.ArrivalRouteConstraints[star] = waypoints
 | 
						|
 | 
						|
    def parseAssignment(assignment : str):
 | 
						|
        elements = list(filter(None, assignment.split(':')))
 | 
						|
        retval = {}
 | 
						|
        type = None
 | 
						|
 | 
						|
        index = 0
 | 
						|
        while index < len(elements):
 | 
						|
            if 0 == index % 2:
 | 
						|
                if 'A' == elements[index]:
 | 
						|
                    type = RunwayAssignmentType.AircraftType
 | 
						|
                elif 'G' == elements[index]:
 | 
						|
                    type = RunwayAssignmentType.GateAssignment
 | 
						|
                else:
 | 
						|
                    sys.stderr.write('Invalid assignment type: ' + elements[index])
 | 
						|
                    sys.exit(-1)
 | 
						|
            else:
 | 
						|
                if None == type:
 | 
						|
                    sys.stderr.write('No assignment type defined')
 | 
						|
                    sys.exit(-1)
 | 
						|
 | 
						|
                if type not in retval:
 | 
						|
                    retval.setdefault(type, [])
 | 
						|
 | 
						|
                retval[type].append(elements[index])
 | 
						|
                type = None
 | 
						|
            index += 1
 | 
						|
 | 
						|
        return retval
 | 
						|
 | 
						|
    def findRunway(self, icao : str, name : str):
 | 
						|
        for runway in self.GngData.Runways[icao]:
 | 
						|
            if name == runway.Name:
 | 
						|
                return runway
 | 
						|
 | 
						|
        sys.stderr.write('Unable to find runway ' + name + ' in the sequencing data for ' + icao)
 | 
						|
        raise Exception()
 | 
						|
 | 
						|
    def updateRunwayAssignment(dictionary, runway, assignments):
 | 
						|
        if runway not in dictionary:
 | 
						|
            dictionary.setdefault(runway, {})
 | 
						|
 | 
						|
        for key in assignments:
 | 
						|
            if key not in dictionary[runway]:
 | 
						|
                dictionary[runway].setdefault(key, assignments[key])
 | 
						|
            else:
 | 
						|
                dictionary[runway][key].extend(assignments[key])
 | 
						|
 | 
						|
    def parseOptimization(self, key : str, line : str):
 | 
						|
        star = key.replace('optimization', '').upper()
 | 
						|
 | 
						|
        # check if the STAR exists
 | 
						|
        found = False
 | 
						|
        for rwy in self.GngData.ArrivalRoutes:
 | 
						|
            for route in self.GngData.ArrivalRoutes[rwy]:
 | 
						|
                if star == route.Name:
 | 
						|
                    found = True
 | 
						|
                    break
 | 
						|
            if True == found:
 | 
						|
                break
 | 
						|
 | 
						|
        if False == found:
 | 
						|
            sys.stderr.write('Unknown star:' + key)
 | 
						|
            raise Exception()
 | 
						|
 | 
						|
        elements = line.split(':')
 | 
						|
        if 2 != len(elements):
 | 
						|
            sys.stderr.write('Invalid optimization parameter for ' + key)
 | 
						|
            raise Exception()
 | 
						|
 | 
						|
        maxTTG = int(elements[0])
 | 
						|
        ttgRatio = float(elements[1])
 | 
						|
 | 
						|
        return star, maxTTG, ttgRatio
 | 
						|
 | 
						|
    def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
 | 
						|
        if star not in dictionary:
 | 
						|
            dictionary.setdefault(star, [])
 | 
						|
        dictionary[star] = [ maxTTG, ttgRatio ]
 | 
						|
 | 
						|
    def parseRunwayAssignment(self, icao : str, planning):
 | 
						|
        self.OptimizationParameters = {}
 | 
						|
        self.RunwayAssignmentsShall = {}
 | 
						|
        self.RunwayAssignmentsShould = {}
 | 
						|
        self.RunwayAssignmentsMay = {}
 | 
						|
        self.MaxDelayMay = timedelta(minutes=10)
 | 
						|
        mayFound = False
 | 
						|
 | 
						|
        for key in planning:
 | 
						|
            if True == key.startswith('shallassign'):
 | 
						|
                runway = self.findRunway(icao, key.replace('shallassign', '').upper())
 | 
						|
                assignments = Airport.parseAssignment(planning[key])
 | 
						|
                Airport.updateRunwayAssignment(self.RunwayAssignmentsShall, runway, assignments)
 | 
						|
            elif True == key.startswith('shouldassign'):
 | 
						|
                runway = self.findRunway(icao, key.replace('shouldassign', '').upper())
 | 
						|
                assignments = Airport.parseAssignment(planning[key])
 | 
						|
                Airport.updateRunwayAssignment(self.RunwayAssignmentsShould, runway, assignments)
 | 
						|
            elif True == key.startswith('mayassign'):
 | 
						|
                runway = self.findRunway(icao, key.replace('mayassign', '').upper())
 | 
						|
                assignments = Airport.parseAssignment(planning[key])
 | 
						|
                Airport.updateRunwayAssignment(self.RunwayAssignmentsMay, runway, assignments)
 | 
						|
                mayFound = True
 | 
						|
            elif True == key.startswith('optimization'):
 | 
						|
                star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
 | 
						|
                Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)
 | 
						|
 | 
						|
 | 
						|
        # find the max delays
 | 
						|
        if True == mayFound:
 | 
						|
            if 'maxdelaymay' not in planning:
 | 
						|
                sys.stderr.write('maxDelaymay needs to be defined')
 | 
						|
                sys.exit(-1)
 | 
						|
            self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))
 | 
						|
 | 
						|
    def parseWebUI(self, webui):
 | 
						|
        self.IafColorization = {}
 | 
						|
 | 
						|
        for key in webui:
 | 
						|
            if 'iafcolorization' == key:
 | 
						|
                elements = list(filter(None, webui[key].split(':')))
 | 
						|
                for i in range(0, len(elements), 4):
 | 
						|
                    self.IafColorization[elements[i]] = [ int(elements[i + 1]), int(elements[i + 2]), int(elements[i + 3]) ]
 | 
						|
 | 
						|
    def __init__(self, filepath : str, icao : str):
 | 
						|
        config = configparser.ConfigParser()
 | 
						|
        config.read(filepath)
 | 
						|
 | 
						|
        dataConfig = None
 | 
						|
        planningConfig = None
 | 
						|
        rhcConfig = None
 | 
						|
        webUiConfig = None
 | 
						|
 | 
						|
        # search the required sections
 | 
						|
        for key in config:
 | 
						|
            if 'DATA' == key:
 | 
						|
                dataConfig = config['DATA']
 | 
						|
            elif 'PLANNING' == key:
 | 
						|
                planningConfig = config['PLANNING']
 | 
						|
            elif 'RHC' == key:
 | 
						|
                rhcConfig = config['RHC']
 | 
						|
            elif 'WEBUI' == key:
 | 
						|
                webUiConfig = config['WEBUI']
 | 
						|
 | 
						|
        # find the GNG-file data
 | 
						|
        sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
 | 
						|
        if None == sctFile or None == eseFile:
 | 
						|
            sys.stderr.write('No GNG-files found')
 | 
						|
            sys.exit(-1)
 | 
						|
 | 
						|
        # parse the planning information
 | 
						|
        if None == planningConfig or False == self.parsePlanning(planningConfig):
 | 
						|
            sys.stderr.write('No planning configuration found')
 | 
						|
            sys.exit(-1)
 | 
						|
        requiredArrivalRoutes = self.parsePlanning(planningConfig)
 | 
						|
        if 0 == len(requiredArrivalRoutes):
 | 
						|
            sys.stderr.write('No valid planning configuration found')
 | 
						|
            sys.exit(-1)
 | 
						|
 | 
						|
        # parse the RHC information
 | 
						|
        if None == rhcConfig:
 | 
						|
            sys.stderr.write('No RHC configuration found')
 | 
						|
            sys.exit(-1)
 | 
						|
        self.RecedingHorizonControl = RHC(rhcConfig)
 | 
						|
 | 
						|
        # check if thw WebUI information is available
 | 
						|
        if None == webUiConfig:
 | 
						|
            sys.stderr.write('No WEBUI configuration found')
 | 
						|
            sys.exit(-1)
 | 
						|
 | 
						|
        # parse the GNG data
 | 
						|
        print('Used GNG-Data: ' + eseFile)
 | 
						|
        self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)
 | 
						|
 | 
						|
        # get the GAFOR id
 | 
						|
        if None == dataConfig.get('gaforid'):
 | 
						|
            sys.stderr.write('No GAFOR-ID found!')
 | 
						|
            sys.exit(-1)
 | 
						|
        self.GaforId = int(dataConfig['gaforid'])
 | 
						|
 | 
						|
        # get the default sequencing data
 | 
						|
        self.parseDefaultSequencingConfiguration(icao, planningConfig)
 | 
						|
        self.parseConstraints(planningConfig)
 | 
						|
        self.parseRunwayAssignment(icao, planningConfig)
 | 
						|
        self.parseWebUI(webUiConfig)
 | 
						|
        self.assignmentUpdate(self.DefaultSequencing)
 | 
						|
 | 
						|
    def assignmentUpdate(self, sequenceConfig : AirportSequencing):
 | 
						|
        # initializes the default sequence data
 | 
						|
        for active in sequenceConfig.ActiveArrivalRunways:
 | 
						|
            if active.Runway in self.RunwayAssignmentsShall:
 | 
						|
                active.ShallAssignments = self.RunwayAssignmentsShall[active.Runway]
 | 
						|
            if active.Runway in self.RunwayAssignmentsShould:
 | 
						|
                active.ShouldAssignments = self.RunwayAssignmentsShould[active.Runway]
 | 
						|
            if active.Runway in self.RunwayAssignmentsMay:
 | 
						|
                active.MayAssignments = self.RunwayAssignmentsMay[active.Runway]
 |