Airport.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. #!/usr/bin/env python
  2. import configparser
  3. from datetime import timedelta
  4. import glob
  5. import os
  6. import sys
  7. from aman.config.RHC import RHC
  8. from aman.config.AirportSequencing import AirportSequencing
  9. from aman.config.RunwaySequencing import RunwaySequencing, RunwayAssignmentType
  10. from aman.formats.SctEseFormat import SctEseFormat
  11. from aman.types.Waypoint import Waypoint
  12. class Airport:
  13. def findGngData(data, path):
  14. if None == data.get('gngwildcard'):
  15. return None, None
  16. # find the newest ESE file
  17. files = glob.glob(os.path.join(path, data['gngwildcard'] + '.ese'))
  18. latestEse = max(files, key=os.path.getctime)
  19. # search for the corresponding SCT file
  20. latestSct = os.path.splitext(latestEse)[0] + '.sct'
  21. # check if the files exist
  22. if False == os.path.isfile(latestEse) or False == os.path.isfile(latestSct):
  23. return None, None
  24. return latestSct, latestEse
  25. def parsePlanning(self, planning):
  26. if None == planning.get('routes'):
  27. return []
  28. return planning['routes'].split(':')
  29. def parseDefaultSequencingConfiguration(self, icao : str, planning):
  30. if None == planning.get('activearrivalrunwaydefault'):
  31. sys.stderr.write('No "activearrivalrunwaydefault" entry found!')
  32. sys.exit(-1)
  33. if None == planning.get('activearrivalmodedefault'):
  34. sys.stderr.write('No "activearrivalmodedefault" entry found!')
  35. sys.exit(-1)
  36. if None == planning.get('arrivalspacingdefault'):
  37. sys.stderr.write('No "arrivalspacingdefault" entry found!')
  38. sys.exit(-1)
  39. if not icao in self.GngData.Runways:
  40. sys.stderr.write('Unable to find' + icao + 'in the SCT data!')
  41. sys.exit(-1)
  42. # parse the default arrival mode
  43. if 'STAGGERED' == planning['activearrivalmodedefault']:
  44. staggered = True
  45. elif 'IPA' == planning['activearrivalmodedefault']:
  46. staggered = False
  47. else:
  48. sys.stderr.write('Unknown arrival mode in "" found! (STAGGERED or IPA needs to be set)')
  49. sys.exit(-1)
  50. # translate the spacing into a map
  51. ident = ''
  52. spacings = {}
  53. spacingConfig = list(filter(None, planning['arrivalspacingdefault'].split(':')))
  54. for i in range(0, len(spacingConfig)):
  55. if 0 == i % 2:
  56. ident = spacingConfig[i]
  57. elif '' != ident:
  58. spacings[ident] = int(spacingConfig[i])
  59. else:
  60. sys.stderr.write('No runway defined in "arrivalspacingdefault"!')
  61. sys.exit(-1)
  62. # create the sequencing data per runway
  63. self.DefaultSequencing = AirportSequencing(icao)
  64. for ident in list(filter(None, planning['activearrivalrunwaydefault'].split(':'))):
  65. if not ident in spacings:
  66. sys.stderr.write('Unable to find sequencing data for ' + ident + ' of ' + icao)
  67. sys.exit(-1)
  68. found = False
  69. for runway in self.GngData.Runways[icao]:
  70. if ident == runway.Name:
  71. sequence = RunwaySequencing(runway)
  72. sequence.Spacing = spacings[ident]
  73. self.DefaultSequencing.activateRunway(sequence)
  74. found = True
  75. break
  76. if False == found:
  77. sys.stderr.write('Unable to find the runway for ' + ident + ' of ' + icao + ' in SCT data!')
  78. sys.exit(-1)
  79. # create the dependencies, if needed
  80. if True == staggered:
  81. if None == planning.get('runwaydependenciesdefault'):
  82. sys.stderr.write('Unable to find the runway dependencies for staggered approaches of ' + icao + '!')
  83. sys.exit(-1)
  84. dependencies = list(filter(None, planning['runwaydependenciesdefault'].split(':')))
  85. if 0 != len(dependencies) % 2:
  86. sys.stderr.write('No valid set of runway dependencies found!')
  87. sys.exit(-1)
  88. for i in range(0, len(dependencies), 2):
  89. self.DefaultSequencing.addDependency(dependencies[i], dependencies[i + 1])
  90. def parseConstraints(self, planning):
  91. self.ArrivalRouteConstraints = {}
  92. for key in planning:
  93. if True == key.startswith('constraints'):
  94. star = key.replace('constraints', '').upper()
  95. if '' != star:
  96. elements = list(filter(None, planning[key].split(':')))
  97. if 3 > len(elements):
  98. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  99. sys.exit(-1)
  100. waypoints = []
  101. # values for the waypoint constraints
  102. waypointName = elements[0]
  103. constraints = [-1, -1]
  104. isBaseTurn = False
  105. isFinalTurn = False
  106. index = 1
  107. while index < len(elements):
  108. if 'A' == elements[index] or 'S' == elements[index]:
  109. if index + 1 == len(elements) or False == elements[index + 1].isnumeric():
  110. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  111. sys.exit(-1)
  112. if 'A' == elements[index]:
  113. constraints[0] = int(elements[index + 1])
  114. else:
  115. constraints[1] = int(elements[index + 1])
  116. index += 1
  117. elif 'B' == elements[index]:
  118. isBaseTurn = True
  119. elif 'F' == elements[index]:
  120. isFinalTurn = True
  121. else:
  122. if False == isBaseTurn and False == isFinalTurn and -1 == constraints[0] and -1 == constraints[1] and '' == waypointName:
  123. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  124. sys.exit(-1)
  125. if True == isBaseTurn and True == isFinalTurn:
  126. sys.stderr.write('Invalid constraint line: ' + key + '=' + planning[key])
  127. sys.exit(-1)
  128. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  129. if -1 != constraints[0]:
  130. waypoints[-1].Altitude = constraints[0]
  131. if -1 != constraints[1]:
  132. waypoints[-1].Speed = constraints[1]
  133. # reset temporary data
  134. waypointName = elements[index]
  135. constraints = [-1, -1]
  136. isBaseTurn = False
  137. isFinalTurn = False
  138. index += 1
  139. # check if we have to add the last waypoint
  140. if 0 != len(waypoints) and waypointName != waypoints[-1].Name:
  141. waypoints.append(Waypoint(name = waypointName, base = isBaseTurn, final = isFinalTurn))
  142. if -1 != constraints[0]:
  143. waypoints[-1].Altitude = constraints[0]
  144. if -1 != constraints[1]:
  145. waypoints[-1].Speed = constraints[1]
  146. # register the arrival route
  147. self.ArrivalRouteConstraints[star] = waypoints
  148. def parseAssignment(assignment : str):
  149. elements = list(filter(None, assignment.split(':')))
  150. retval = {}
  151. type = None
  152. index = 0
  153. while index < len(elements):
  154. if 0 == index % 2:
  155. if 'A' == elements[index]:
  156. type = RunwayAssignmentType.AircraftType
  157. elif 'G' == elements[index]:
  158. type = RunwayAssignmentType.GateAssignment
  159. else:
  160. sys.stderr.write('Invalid assignment type: ' + elements[index])
  161. sys.exit(-1)
  162. else:
  163. if None == type:
  164. sys.stderr.write('No assignment type defined')
  165. sys.exit(-1)
  166. if type not in retval:
  167. retval.setdefault(type, [])
  168. retval[type].append(elements[index])
  169. type = None
  170. index += 1
  171. return retval
  172. def findRunway(self, icao : str, name : str):
  173. for runway in self.GngData.Runways[icao]:
  174. if name == runway.Name:
  175. return runway
  176. sys.stderr.write('Unable to find runway ' + name + ' in the sequencing data for ' + icao)
  177. raise Exception()
  178. def updateRunwayAssignment(dictionary, runway, assignments):
  179. if runway not in dictionary:
  180. dictionary.setdefault(runway, {})
  181. for key in assignments:
  182. if key not in dictionary[runway]:
  183. dictionary[runway].setdefault(key, assignments[key])
  184. else:
  185. dictionary[runway][key].extend(assignments[key])
  186. def parseOptimization(self, key : str, line : str):
  187. star = key.replace('optimization', '').upper()
  188. # check if the STAR exists
  189. found = False
  190. for rwy in self.GngData.ArrivalRoutes:
  191. for route in self.GngData.ArrivalRoutes[rwy]:
  192. if star == route.Name:
  193. found = True
  194. break
  195. if True == found:
  196. break
  197. if False == found:
  198. sys.stderr.write('Unknown star:' + key)
  199. raise Exception()
  200. elements = line.split(':')
  201. if 2 != len(elements):
  202. sys.stderr.write('Invalid optimization parameter for ' + key)
  203. raise Exception()
  204. maxTTG = int(elements[0])
  205. ttgRatio = float(elements[1])
  206. return star, maxTTG, ttgRatio
  207. def updateOptimizationParameters(dictionary, star, maxTTG, ttgRatio):
  208. if star not in dictionary:
  209. dictionary.setdefault(star, [])
  210. dictionary[star] = [ maxTTG, ttgRatio ]
  211. def parseRunwayAssignment(self, icao : str, planning):
  212. self.OptimizationParameters = {}
  213. self.RunwayAssignmentsShall = {}
  214. self.RunwayAssignmentsShould = {}
  215. self.RunwayAssignmentsMay = {}
  216. self.MaxDelayMay = timedelta(minutes=10)
  217. mayFound = False
  218. for key in planning:
  219. if True == key.startswith('shallassign'):
  220. runway = self.findRunway(icao, key.replace('shallassign', '').upper())
  221. assignments = Airport.parseAssignment(planning[key])
  222. Airport.updateRunwayAssignment(self.RunwayAssignmentsShall, runway, assignments)
  223. elif True == key.startswith('shouldassign'):
  224. runway = self.findRunway(icao, key.replace('shouldassign', '').upper())
  225. assignments = Airport.parseAssignment(planning[key])
  226. Airport.updateRunwayAssignment(self.RunwayAssignmentsShould, runway, assignments)
  227. elif True == key.startswith('mayassign'):
  228. runway = self.findRunway(icao, key.replace('mayassign', '').upper())
  229. assignments = Airport.parseAssignment(planning[key])
  230. Airport.updateRunwayAssignment(self.RunwayAssignmentsMay, runway, assignments)
  231. mayFound = True
  232. elif True == key.startswith('optimization'):
  233. star, maxTTG, ttgRatio = self.parseOptimization(key, planning[key])
  234. Airport.updateOptimizationParameters(self.OptimizationParameters, star, maxTTG, ttgRatio)
  235. # find the max delays
  236. if True == mayFound:
  237. if 'maxdelaymay' not in planning:
  238. sys.stderr.write('maxDelaymay needs to be defined')
  239. sys.exit(-1)
  240. self.MaxDelayMay = timedelta(minutes=int(planning['maxdelaymay']))
  241. def __init__(self, filepath : str, icao : str):
  242. config = configparser.ConfigParser()
  243. config.read(filepath)
  244. dataConfig = None
  245. planningConfig = None
  246. rhcConfig = None
  247. # search the required sections
  248. for key in config:
  249. if 'DATA' == key:
  250. dataConfig = config['DATA']
  251. elif 'PLANNING' == key:
  252. planningConfig = config['PLANNING']
  253. elif 'RHC' == key:
  254. rhcConfig = config['RHC']
  255. # find the GNG-file data
  256. sctFile, eseFile = Airport.findGngData(dataConfig, os.path.dirname(filepath))
  257. if None == sctFile or None == eseFile:
  258. sys.stderr.write('No GNG-files found')
  259. sys.exit(-1)
  260. # parse the planning information
  261. if None == planningConfig or False == self.parsePlanning(planningConfig):
  262. sys.stderr.write('No planning configuration found')
  263. sys.exit(-1)
  264. requiredArrivalRoutes = self.parsePlanning(planningConfig)
  265. if 0 == len(requiredArrivalRoutes):
  266. sys.stderr.write('No valid planning configuration found')
  267. sys.exit(-1)
  268. # parse the RHC information
  269. if None == rhcConfig:
  270. sys.stderr.write('No RHC configuration found')
  271. sys.exit(-1)
  272. self.RecedingHorizonControl = RHC(rhcConfig)
  273. # parse the GNG data
  274. print('Used GNG-Data: ' + eseFile)
  275. self.GngData = SctEseFormat(sctFile, eseFile, icao, requiredArrivalRoutes)
  276. # get the GAFOR id
  277. if None == dataConfig.get('gaforid'):
  278. sys.stderr.write('No GAFOR-ID found!')
  279. sys.exit(-1)
  280. self.GaforId = int(dataConfig['gaforid'])
  281. # get the default sequencing data
  282. self.parseDefaultSequencingConfiguration(icao, planningConfig)
  283. self.parseConstraints(planningConfig)
  284. self.parseRunwayAssignment(icao, planningConfig)
  285. self.assignmentUpdate(self.DefaultSequencing)
  286. def assignmentUpdate(self, sequenceConfig : AirportSequencing):
  287. # initializes the default sequence data
  288. for active in sequenceConfig.ActiveArrivalRunways:
  289. if active.Runway in self.RunwayAssignmentsShall:
  290. active.ShallAssignments = self.RunwayAssignmentsShall[active.Runway]
  291. if active.Runway in self.RunwayAssignmentsShould:
  292. active.ShouldAssignments = self.RunwayAssignmentsShould[active.Runway]
  293. if active.Runway in self.RunwayAssignmentsMay:
  294. active.MayAssignments = self.RunwayAssignmentsMay[active.Runway]