Airport.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #!/usr/bin/env python
  2. import configparser
  3. import glob
  4. import os
  5. import sys
  6. from aman.config.RHC import RHC
  7. from aman.config.AirportSequencing import AirportSequencing
  8. from aman.config.RunwaySequencing import RunwaySequencing
  9. from aman.formats.SctEseFormat import SctEseFormat
  10. from aman.types.Waypoint import Waypoint
  11. class Airport:
  12. def findGngData(data, path):
  13. if None == data.get('gngwildcard'):
  14. return None, None
  15. # find the newest ESE file
  16. files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
  17. latestEse = max(files, key=os.path.getctime)
  18. # search for the corresponding SCT file
  19. latestSct = os.path.splitext(latestEse)[0] + '.sct'
  20. # check if the files exist
  21. if False == os.path.isfile(latestEse) or False == os.path.isfile(latestSct):
  22. return None, None
  23. return latestSct, latestEse
  24. def parsePlanning(self, planning):
  25. if None == planning.get('routes'):
  26. return []
  27. return planning['routes'].split(':')
  28. def parseDefaultSequencingConfiguration(self, icao : str, planning):
  29. if None == planning.get('activearrivalrunwaydefault'):
  30. sys.stderr.write('No "activearrivalrunwaydefault" entry found!')
  31. sys.exit(-1)
  32. if None == planning.get('activearrivalmodedefault'):
  33. sys.stderr.write('No "activearrivalmodedefault" entry found!')
  34. sys.exit(-1)
  35. if None == planning.get('arrivalspacingdefault'):
  36. sys.stderr.write('No "arrivalspacingdefault" entry found!')
  37. sys.exit(-1)
  38. if not icao in self.GngData.Runways:
  39. sys.stderr.write('Unable to find' + icao + 'in the SCT data!')
  40. sys.exit(-1)
  41. # parse the default arrival mode
  42. if 'STAGGERED' == planning['activearrivalmodedefault']:
  43. staggered = True
  44. elif 'IPA' == planning['activearrivalmodedefault']:
  45. staggered = False
  46. else:
  47. sys.stderr.write('Unknown arrival mode in "" found! (STAGGERED or IPA needs to be set)')
  48. sys.exit(-1)
  49. # translate the spacing into a map
  50. ident = ''
  51. spacings = {}
  52. spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
  53. for i in range(0, len(spacingConfig)):
  54. if 0 == i % 2:
  55. ident = spacingConfig[i]
  56. elif '' != ident:
  57. spacings[ident] = int(spacingConfig[i])
  58. else:
  59. sys.stderr.write('No runway defined in "arrivalspacingdefault"!')
  60. sys.exit(-1)
  61. # create the sequencing data per runway
  62. self.DefaultSequencing = AirportSequencing(icao)
  63. for ident in list(filter(None, planning['activearrivalrunwaydefault'].split(':'))):
  64. if not ident in spacings:
  65. sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
  66. sys.exit(-1)
  67. found = False
  68. for runway in self.GngData.Runways[icao]:
  69. if ident == runway.Name:
  70. sequence = RunwaySequencing(runway)
  71. sequence.Spacing = spacings[ident]
  72. self.DefaultSequencing.activateRunway(sequence)
  73. found = True
  74. break
  75. if False == found:
  76. sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
  77. sys.exit(-1)
  78. # create the dependencies, if needed
  79. if True == staggered:
  80. if None == planning.get('runwaydependenciesdefault'):
  81. sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
  82. sys.exit(-1)
  83. dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
  84. if 0 != len(dependencies) % 2:
  85. sys.stderr.write('No valid set of runway dependencies found!')
  86. sys.exit(-1)
  87. for i in range(0, len(dependencies), 2):
  88. self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
  89. def parseConstraints(self, planning):
  90. self.ArrivalRouteConstraints = {}
  91. for key in planning:
  92. if True == key.startswith('constraints'):
  93. star = key.replace('constraints', '').upper()
  94. if '' != star:
  95. elements = list(filter(None, planning[key].split(':')))
  96. if 3 > len(elements):
  97. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  98. sys.exit(-1)
  99. waypoints = []
  100. # values for the waypoint constraints
  101. waypointName = elements[0]
  102. constraints = [-1, -1]
  103. isBaseTurn = False
  104. isFinalTurn = False
  105. index = 1
  106. while index < len(elements):
  107. if 'A' == elements[index] or 'S' == elements[index]:
  108. if index + 1 == len(elements) or False == elements[index + 1].isnumeric():
  109. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  110. sys.exit(-1)
  111. if 'A' == elements[index]:
  112. constraints[0] = int(elements[index + 1])
  113. else:
  114. constraints[1] = int(elements[index + 1])
  115. index += 1
  116. elif 'B' == elements[index]:
  117. isBaseTurn = True
  118. elif 'F' == elements[index]:
  119. isFinalTurn = True
  120. else:
  121. if False == isBaseTurn and False == isFinalTurn and -1 == constraints[0] and -1 == constraints[1] and '' == waypointName:
  122. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  123. sys.exit(-1)
  124. if True == isBaseTurn and True == isFinalTurn:
  125. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  126. sys.exit(-1)
  127. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  128. if -1 != constraints[0]:
  129. waypoints[-1].Altitude = constraints[0]
  130. if -1 != constraints[1]:
  131. waypoints[-1].Speed = constraints[1]
  132. # reset temporary data
  133. waypointName = elements[index]
  134. constraints = [-1, -1]
  135. isBaseTurn = False
  136. isFinalTurn = False
  137. index += 1
  138. # check if we have to add the last waypoint
  139. if 0 != len(waypoints) and waypointName != waypoints[-1].Name:
  140. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  141. if -1 != constraints[0]:
  142. waypoints[-1].Altitude = constraints[0]
  143. if -1 != constraints[1]:
  144. waypoints[-1].Speed = constraints[1]
  145. # register the arrival route
  146. self.ArrivalRouteConstraints[star] = waypoints
  147. def __init__(self, filepath : str, icao : str):
  148. config = configparser.ConfigParser()
  149. config.read(filepath)
  150. dataConfig = None
  151. planningConfig = None
  152. rhcConfig = None
  153. # search the required sections
  154. for key in config:
  155. if 'DATA' == key:
  156. dataConfig = config['DATA']
  157. elif 'PLANNING' == key:
  158. planningConfig = config['PLANNING']
  159. elif 'RHC' == key:
  160. rhcConfig = config['RHC']
  161. # find the GNG-file data
  162. sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
  163. if None == sctFile or None == eseFile:
  164. sys.stderr.write('No GNG-files found')
  165. sys.exit(-1)
  166. # parse the planning information
  167. if None == planningConfig or False == self.parsePlanning(planningConfig):
  168. sys.stderr.write('No planning configuration found')
  169. sys.exit(-1)
  170. requiredArrivalRoutes = self.parsePlanning(planningConfig)
  171. if 0 == len(requiredArrivalRoutes):
  172. sys.stderr.write('No valid planning configuration found')
  173. sys.exit(-1)
  174. # parse the RHC information
  175. if None == rhcConfig:
  176. sys.stderr.write('No RHC configuration found')
  177. sys.exit(-1)
  178. self.RecedingHorizonControl = RHC(rhcConfig)
  179. # parse the GNG data
  180. print('Used GNG-Data: ' + eseFile)
  181. self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)
  182. # get the GAFOR id
  183. if None == dataConfig.get('gaforid'):
  184. sys.stderr.write('No GAFOR-ID found!')
  185. sys.exit(-1)
  186. self.GaforId = int(dataConfig['gaforid'])
  187. # get the default sequencing data
  188. self.parseDefaultSequencingConfiguration(icao, planningConfig)
  189. self.parseConstraints(planningConfig)