Airport.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. #!/usr/bin/env python
  2. import configparser
  3. from datetime import timedelta
  4. import glob
  5. import os
  6. import sys
  7. from aman.config.RHC import RHC
  8. from aman.config.AirportSequencing import AirportSequencing
  9. from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
  10. from aman.formats.SctEseFormat import SctEseFormat
  11. from aman.types.Waypoint import Waypoint
  12. class Airport:
  13. def findGngData(data, path):
  14. if None == data.get('gngwildcard'):
  15. return None, None
  16. # find the newest ESE file
  17. files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
  18. latestEse = max(files, key=os.path.getctime)
  19. # search for the corresponding SCT file
  20. latestSct = os.path.splitext(latestEse)[0] + '.sct'
  21. # check if the files exist
  22. if False == os.path.isfile(latestEse) or False == os.path.isfile(latestSct):
  23. return None, None
  24. return latestSct, latestEse
  25. def parsePlanning(self, planning):
  26. if None == planning.get('routes'):
  27. return []
  28. return planning['routes'].split(':')
  29. def parseDefaultSequencingConfiguration(self, icao : str, planning):
  30. if None == planning.get('activearrivalrunwaydefault'):
  31. sys.stderr.write('No "activearrivalrunwaydefault" entry found!')
  32. sys.exit(-1)
  33. if None == planning.get('activearrivalmodedefault'):
  34. sys.stderr.write('No "activearrivalmodedefault" entry found!')
  35. sys.exit(-1)
  36. if None == planning.get('arrivalspacingdefault'):
  37. sys.stderr.write('No "arrivalspacingdefault" entry found!')
  38. sys.exit(-1)
  39. if not icao in self.GngData.Runways:
  40. sys.stderr.write('Unable to find' + icao + 'in the SCT data!')
  41. sys.exit(-1)
  42. # parse the default arrival mode
  43. if 'STAGGERED' == planning['activearrivalmodedefault']:
  44. staggered = True
  45. elif 'IPA' == planning['activearrivalmodedefault']:
  46. staggered = False
  47. else:
  48. sys.stderr.write('Unknown arrival mode in "" found! (STAGGERED or IPA needs to be set)')
  49. sys.exit(-1)
  50. # translate the spacing into a map
  51. ident = ''
  52. spacings = {}
  53. spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
  54. for i in range(0, len(spacingConfig)):
  55. if 0 == i % 2:
  56. ident = spacingConfig[i]
  57. elif '' != ident:
  58. spacings[ident] = int(spacingConfig[i])
  59. else:
  60. sys.stderr.write('No runway defined in "arrivalspacingdefault"!')
  61. sys.exit(-1)
  62. # create the sequencing data per runway
  63. self.DefaultSequencing = AirportSequencing(icao)
  64. for ident in list(filter(None, planning['activearrivalrunwaydefault'].split(':'))):
  65. if not ident in spacings:
  66. sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
  67. sys.exit(-1)
  68. found = False
  69. for runway in self.GngData.Runways[icao]:
  70. if ident == runway.Name:
  71. sequence = RunwaySequencing(runway)
  72. sequence.Spacing = spacings[ident]
  73. self.DefaultSequencing.activateRunway(sequence)
  74. found = True
  75. break
  76. if False == found:
  77. sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
  78. sys.exit(-1)
  79. # create the dependencies, if needed
  80. if True == staggered:
  81. if None == planning.get('runwaydependenciesdefault'):
  82. sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
  83. sys.exit(-1)
  84. dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
  85. if 0 != len(dependencies) % 2:
  86. sys.stderr.write('No valid set of runway dependencies found!')
  87. sys.exit(-1)
  88. for i in range(0, len(dependencies), 2):
  89. self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
  90. def parseConstraints(self, planning):
  91. self.ArrivalRouteConstraints = {}
  92. # check if the IAF sequence constraint is defined
  93. if 'iafsequence' in planning:
  94. self.IafSpacing = float(planning['iafsequence'])
  95. else:
  96. self.IafSpacing = 10.0
  97. # parse the arrival constraints
  98. for key in planning:
  99. if True == key.startswith('constraints'):
  100. star = key.replace('constraints', '').upper()
  101. if '' != star:
  102. elements = list(filter(None, planning[key].split(':')))
  103. if 3 > len(elements):
  104. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  105. sys.exit(-1)
  106. waypoints = []
  107. # values for the waypoint constraints
  108. waypointName = elements[0]
  109. constraints = [-1, -1]
  110. isBaseTurn = False
  111. isFinalTurn = False
  112. index = 1
  113. while index < len(elements):
  114. if 'A' == elements[index] or 'S' == elements[index]:
  115. if index + 1 == len(elements) or False == elements[index + 1].isnumeric():
  116. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  117. sys.exit(-1)
  118. if 'A' == elements[index]:
  119. constraints[0] = int(elements[index + 1])
  120. else:
  121. constraints[1] = int(elements[index + 1])
  122. index += 1
  123. elif 'B' == elements[index]:
  124. isBaseTurn = True
  125. elif 'F' == elements[index]:
  126. isFinalTurn = True
  127. else:
  128. if False == isBaseTurn and False == isFinalTurn and -1 == constraints[0] and -1 == constraints[1] and '' == waypointName:
  129. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  130. sys.exit(-1)
  131. if True == isBaseTurn and True == isFinalTurn:
  132. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  133. sys.exit(-1)
  134. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  135. if -1 != constraints[0]:
  136. waypoints[-1].Altitude = constraints[0]
  137. if -1 != constraints[1]:
  138. waypoints[-1].Speed = constraints[1]
  139. # reset temporary data
  140. waypointName = elements[index]
  141. constraints = [-1, -1]
  142. isBaseTurn = False
  143. isFinalTurn = False
  144. index += 1
  145. # check if we have to add the last waypoint
  146. if 0 != len(waypoints) and waypointName != waypoints[-1].Name:
  147. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  148. if -1 != constraints[0]:
  149. waypoints[-1].Altitude = constraints[0]
  150. if -1 != constraints[1]:
  151. waypoints[-1].Speed = constraints[1]
  152. # register the arrival route
  153. self.ArrivalRouteConstraints[star] = waypoints
  154. def parseAssignment(assignment : str):
  155. elements = list(filter(None, assignment.split(':')))
  156. retval = {}
  157. type = None
  158. index = 0
  159. while index < len(elements):
  160. if 0 == index % 2:
  161. if 'A' == elements[index]:
  162. type = RunwayAssignmentType.AircraftType
  163. elif 'G' == elements[index]:
  164. type = RunwayAssignmentType.GateAssignment
  165. else:
  166. sys.stderr.write('Invalid assignment type: ' + elements[index])
  167. sys.exit(-1)
  168. else:
  169. if None == type:
  170. sys.stderr.write('No assignment type defined')
  171. sys.exit(-1)
  172. if type not in retval:
  173. retval.setdefault(type, [])
  174. retval[type].append(elements[index])
  175. type = None
  176. index += 1
  177. return retval
  178. def findRunway(self, icao : str, name : str):
  179. for runway in self.GngData.Runways[icao]:
  180. if name == runway.Name:
  181. return runway
  182. sys.stderr.write('Unable to find runway ' + name + ' in the sequencing data for ' + icao)
  183. raise Exception()
  184. def updateRunwayAssignment(dictionary, runway, assignments):
  185. if runway not in dictionary:
  186. dictionary.setdefault(runway, {})
  187. for key in assignments:
  188. if key not in dictionary[runway]:
  189. dictionary[runway].setdefault(key, assignments[key])
  190. else:
  191. dictionary[runway][key].extend(assignments[key])
  192. def parseOptimization(self, key : str, line : str):
  193. star = key.replace('optimization', '').upper()
  194. # check if the STAR exists
  195. found = False
  196. for rwy in self.GngData.ArrivalRoutes:
  197. for route in self.GngData.ArrivalRoutes[rwy]:
  198. if star == route.Name:
  199. found = True
  200. break
  201. if True == found:
  202. break
  203. if False == found:
  204. sys.stderr.write('Unknown star:' + key)
  205. raise Exception()
  206. elements = line.split(':')
  207. if 2 != len(elements):
  208. sys.stderr.write('Invalid optimization parameter for ' + key)
  209. raise Exception()
  210. maxTTG = int(elements[0])
  211. ttgRatio = float(elements[1])
  212. return star, maxTTG, ttgRatio
  213. def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
  214. if star not in dictionary:
  215. dictionary.setdefault(star, [])
  216. dictionary[star] = [ maxTTG, ttgRatio ]
  217. def parseRunwayAssignment(self, icao : str, planning):
  218. self.OptimizationParameters = {}
  219. self.RunwayAssignmentsShall = {}
  220. self.RunwayAssignmentsShould = {}
  221. self.RunwayAssignmentsMay = {}
  222. self.MaxDelayMay = timedelta(minutes=10)
  223. mayFound = False
  224. for key in planning:
  225. if True == key.startswith('shallassign'):
  226. runway = self.findRunway(icao, key.replace('shallassign', '').upper())
  227. assignments = Airport.parseAssignment(planning[key])
  228. Airport.updateRunwayAssignment(self.RunwayAssignmentsShall, runway, assignments)
  229. elif True == key.startswith('shouldassign'):
  230. runway = self.findRunway(icao, key.replace('shouldassign', '').upper())
  231. assignments = Airport.parseAssignment(planning[key])
  232. Airport.updateRunwayAssignment(self.RunwayAssignmentsShould, runway, assignments)
  233. elif True == key.startswith('mayassign'):
  234. runway = self.findRunway(icao, key.replace('mayassign', '').upper())
  235. assignments = Airport.parseAssignment(planning[key])
  236. Airport.updateRunwayAssignment(self.RunwayAssignmentsMay, runway, assignments)
  237. mayFound = True
  238. elif True == key.startswith('optimization'):
  239. star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
  240. Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)
  241. # find the max delays
  242. if True == mayFound:
  243. if 'maxdelaymay' not in planning:
  244. sys.stderr.write('maxDelaymay needs to be defined')
  245. sys.exit(-1)
  246. self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))
  247. def parseWebUI(self, webui):
  248. self.IafColorization = {}
  249. for key in webui:
  250. if 'iafcolorization' == key:
  251. elements = list(filter(None, webui[key].split(':')))
  252. for i in range(0, len(elements), 4):
  253. self.IafColorization[elements[i]] = [ int(elements[i + 1]), int(elements[i + 2]), int(elements[i + 3]) ]
  254. def __init__(self, filepath : str, icao : str):
  255. config = configparser.ConfigParser()
  256. config.read(filepath)
  257. dataConfig = None
  258. planningConfig = None
  259. rhcConfig = None
  260. webUiConfig = None
  261. # search the required sections
  262. for key in config:
  263. if 'DATA' == key:
  264. dataConfig = config['DATA']
  265. elif 'PLANNING' == key:
  266. planningConfig = config['PLANNING']
  267. elif 'RHC' == key:
  268. rhcConfig = config['RHC']
  269. elif 'WEBUI' == key:
  270. webUiConfig = config['WEBUI']
  271. # find the GNG-file data
  272. sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
  273. if None == sctFile or None == eseFile:
  274. sys.stderr.write('No GNG-files found')
  275. sys.exit(-1)
  276. # parse the planning information
  277. if None == planningConfig or False == self.parsePlanning(planningConfig):
  278. sys.stderr.write('No planning configuration found')
  279. sys.exit(-1)
  280. requiredArrivalRoutes = self.parsePlanning(planningConfig)
  281. if 0 == len(requiredArrivalRoutes):
  282. sys.stderr.write('No valid planning configuration found')
  283. sys.exit(-1)
  284. # parse the RHC information
  285. if None == rhcConfig:
  286. sys.stderr.write('No RHC configuration found')
  287. sys.exit(-1)
  288. self.RecedingHorizonControl = RHC(rhcConfig)
  289. # check if thw WebUI information is available
  290. if None == webUiConfig:
  291. sys.stderr.write('No WEBUI configuration found')
  292. sys.exit(-1)
  293. # parse the GNG data
  294. print('Used GNG-Data: ' + eseFile)
  295. self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)
  296. # get the GAFOR id
  297. if None == dataConfig.get('gaforid'):
  298. sys.stderr.write('No GAFOR-ID found!')
  299. sys.exit(-1)
  300. self.GaforId = int(dataConfig['gaforid'])
  301. # get the default sequencing data
  302. self.parseDefaultSequencingConfiguration(icao, planningConfig)
  303. self.parseConstraints(planningConfig)
  304. self.parseRunwayAssignment(icao, planningConfig)
  305. self.parseWebUI(webUiConfig)
  306. self.assignmentUpdate(self.DefaultSequencing)
  307. def assignmentUpdate(self, sequenceConfig : AirportSequencing):
  308. # initializes the default sequence data
  309. for active in sequenceConfig.ActiveArrivalRunways:
  310. if active.Runway in self.RunwayAssignmentsShall:
  311. active.ShallAssignments = self.RunwayAssignmentsShall[active.Runway]
  312. if active.Runway in self.RunwayAssignmentsShould:
  313. active.ShouldAssignments = self.RunwayAssignmentsShould[active.Runway]
  314. if active.Runway in self.RunwayAssignmentsMay:
  315. active.MayAssignments = self.RunwayAssignmentsMay[active.Runway]