Inbound.py 11 KB


  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.ArrivalWaypoint import ArrivalWaypoint
  10. from aman.types.PerformanceData import PerformanceData
  11. from aman.types.ArrivalRoute import ArrivalRoute
  12. from aman.types.ArrivalData import ArrivalData
  13. from aman.types.Runway import Runway
  14. from aman.types.Waypoint import Waypoint
  15. class Inbound:
  16. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  17. for arrivalRunway in navData.ArrivalRoutes:
  18. if arrivalRunway == runway.Name:
  19. stars = navData.ArrivalRoutes[arrivalRunway]
  20. for star in stars:
  21. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  22. return star
  23. return None
  24. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  25. performanceData : PerformanceData, weatherModel : WeatherModel):
  26. self.Report = report
  27. self.Callsign = report.aircraft.callsign
  28. self.CurrentPosition = report.position
  29. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  30. self.InitialArrivalTime = None
  31. self.EarliestArrivalTime = None
  32. self.PlannedArrivalTime = None
  33. self.EstimatedStarEntryTime = None
  34. self.PlannedRunway = None
  35. self.PlannedStar = None
  36. self.PlannedArrivalRoute = None
  37. self.PlannedTrackmiles = None
  38. self.ArrivalCandidates = {}
  39. self.WTC = None
  40. self.FixedSequence = False
  41. # analyze the WTC
  42. wtc = report.aircraft.wtc.upper()
  43. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  44. self.WTC = wtc
  45. # search performance data -> fallback to A320
  46. if self.Report.aircraft.type in performanceData.Aircrafts:
  47. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  48. if None == self.PerformanceData:
  49. self.PerformanceData = performanceData.Aircrafts['A320']
  50. # calculate the timings for the different arrival runways
  51. for identifier in sequencingConfig.ActiveArrivalRunways:
  52. star = self.findArrivalRoute(identifier.Runway, navData)
  53. if None != star:
  54. flightTime, flightTimeUntilIaf, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  55. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  56. # the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
  57. ratio = min(2.0, max(0.0, self.Report.distanceToIAF / (trackmiles - self.Report.distanceToIAF)))
  58. possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
  59. ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
  60. ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60))
  61. ita = self.ReportTime + flightTime
  62. earliest = ita - ttg
  63. latest = ita + ttl
  64. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
  65. entry = flightTimeUntilIaf, touchdown = flightTime,
  66. ttl = ttl, latest = latest, route = arrivalRoute, trackmiles = trackmiles)
  67. # calculate the first values for later plannings
  68. for candidate in self.ArrivalCandidates:
  69. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  70. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  71. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  72. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  73. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  74. if None != self.PlannedStar:
  75. for runway in navData.Runways[self.Report.destination.upper()]:
  76. if runway.Name == self.PlannedStar.Runway:
  77. self.PlannedRunway = runway
  78. break
  79. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  80. # calculate remaining trackmiles
  81. trackmiles = self.Report.distanceToIAF
  82. start = star.Route[0]
  83. turnIndices = [ -1, -1 ]
  84. constraints = []
  85. for i in range(0, len(star.Route)):
  86. # identified the base turn
  87. if True == star.Route[i].BaseTurn:
  88. turnIndices[0] = i
  89. # identified the final turn
  90. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  91. turnIndices[1] = i
  92. # skip waypoints until the final turn point is found
  93. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  94. continue
  95. trackmiles += start.haversine(star.Route[i]) * 0.539957
  96. # check if a new constraint is defined
  97. altitude = -1
  98. speed = -1
  99. if None != star.Route[i].Altitude:
  100. altitude = star.Route[i].Altitude
  101. if None != star.Route[i].Speed:
  102. speed = star.Route[i].Speed
  103. if -1 != altitude or -1 != speed:
  104. constraints.append([ trackmiles, altitude, speed ])
  105. start = star.Route[i]
  106. # add the remaining distance from the last waypoint to the runway threshold
  107. trackmiles += start.haversine(runway.Start)
  108. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  109. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  110. sys.exit(-1)
  111. # calculate descend profile
  112. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  113. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  114. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  115. distanceToWaypoint = self.Report.distanceToIAF
  116. flightTimeUntilIafSeconds = 0
  117. flightTimeSeconds = 0
  118. nextWaypointIndex = 0
  119. flownDistance = 0.0
  120. arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
  121. while True:
  122. # check if a constraint cleanup is needed and if a speed-update is needed
  123. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  124. if -1 != constraints[0][2]:
  125. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  126. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  127. constraints.pop(0)
  128. # search next altitude constraint
  129. altitudeDistance = 0
  130. nextAltitude = 0
  131. for constraint in constraints:
  132. if -1 != constraint[1]:
  133. altitudeDistance = constraint[0]
  134. nextAltitude = constraint[1]
  135. break
  136. # check if update of altitude and speed is needed on 3° glide
  137. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  138. oldGroundspeed = currentPosition[1]
  139. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  140. newAltitude = currentPosition[0] - descendRate
  141. if 0 > newAltitude:
  142. newAltitude = 0
  143. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  144. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  145. else:
  146. distance = currentPosition[1] / 60 / 6
  147. # update the statistics
  148. distanceToWaypoint -= distance
  149. flownDistance += distance
  150. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  151. if newIAS < currentIAS:
  152. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  153. currentIAS = newIAS
  154. flightTimeSeconds += 10
  155. if flownDistance <= self.Report.distanceToIAF:
  156. flightTimeUntilIafSeconds += 10
  157. if flownDistance >= trackmiles:
  158. break
  159. # check if we follow a new waypoint pair
  160. if 0 >= distanceToWaypoint:
  161. lastWaypointIndex = nextWaypointIndex
  162. nextWaypointIndex += 1
  163. arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
  164. arrivalRoute[-1].ETA = self.ReportTime + arrivalRoute[-1].FlightTime
  165. arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
  166. arrivalRoute[-1].Altitude = currentPosition[0]
  167. arrivalRoute[-1].IndicatedAirspeed = currentIAS
  168. arrivalRoute[-1].GroundSpeed = currentPosition[1]
  169. # check if a skip from base to final turn waypoints is needed
  170. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  171. nextWaypointIndex = turnIndices[1]
  172. # update the statistics
  173. if nextWaypointIndex < len(star.Route):
  174. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  175. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  176. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  177. arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
  178. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles, arrivalRoute