Inbound.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.ArrivalWaypoint import ArrivalWaypoint
  10. from aman.types.PerformanceData import PerformanceData
  11. from aman.types.ArrivalRoute import ArrivalRoute
  12. from aman.types.ArrivalData import ArrivalData
  13. from aman.types.Runway import Runway
  14. from aman.types.Waypoint import Waypoint
  15. class Inbound:
  16. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  17. for arrivalRunway in navData.ArrivalRoutes:
  18. if arrivalRunway == runway.Name:
  19. stars = navData.ArrivalRoutes[arrivalRunway]
  20. for star in stars:
  21. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  22. return star
  23. return None
  24. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  25. performanceData : PerformanceData, weatherModel : WeatherModel):
  26. self.Report = report
  27. self.Callsign = report.aircraft.callsign
  28. self.CurrentPosition = report.position
  29. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  30. self.InitialArrivalTime = None
  31. self.EarliestArrivalTime = None
  32. self.PlannedArrivalTime = None
  33. self.EstimatedStarEntryTime = None
  34. self.PlannedRunway = None
  35. self.PlannedStar = None
  36. self.PlannedArrivalRoute = None
  37. self.PlannedTrackmiles = None
  38. self.ArrivalCandidates = {}
  39. self.WTC = None
  40. self.FixedSequence = False
  41. # analyze the WTC
  42. wtc = report.aircraft.wtc.upper()
  43. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  44. self.WTC = wtc
  45. # search performance data -> fallback to A320
  46. if self.Report.aircraft.type in performanceData.Aircrafts:
  47. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  48. if None == self.PerformanceData:
  49. self.PerformanceData = performanceData.Aircrafts['A320']
  50. # calculate the timings for the different arrival runways
  51. for identifier in sequencingConfig.ActiveArrivalRunways:
  52. star = self.findArrivalRoute(identifier.Runway, navData)
  53. if None != star:
  54. flightTime, flightTimeUntilIaf, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  55. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  56. # the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
  57. ratio = min(2.0, max(0.0, self.Report.distanceToIAF / (trackmiles - self.Report.distanceToIAF)))
  58. possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
  59. timeDelta = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
  60. ita = self.ReportTime + flightTime
  61. earliest = ita - timeDelta
  62. latest = ita + timeDelta
  63. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = timeDelta, star = star, ita = ita, earliest = earliest,
  64. entry = flightTimeUntilIaf, touchdown = flightTime,
  65. ttl = timeDelta, latest = latest, route = arrivalRoute, trackmiles = trackmiles)
  66. # calculate the first values for later plannings
  67. for candidate in self.ArrivalCandidates:
  68. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  69. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  70. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  71. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  72. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  73. if None != self.PlannedStar:
  74. for runway in navData.Runways[self.Report.destination.upper()]:
  75. if runway.Name == self.PlannedStar.Runway:
  76. self.PlannedRunway = runway
  77. break
  78. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  79. # calculate remaining trackmiles
  80. trackmiles = self.Report.distanceToIAF
  81. start = star.Route[0]
  82. turnIndices = [ -1, -1 ]
  83. constraints = []
  84. for i in range(0, len(star.Route)):
  85. # identified the base turn
  86. if True == star.Route[i].BaseTurn:
  87. turnIndices[0] = i
  88. # identified the final turn
  89. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  90. turnIndices[1] = i
  91. # skip waypoints until the final turn point is found
  92. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  93. continue
  94. trackmiles += start.haversine(star.Route[i]) * 0.539957
  95. # check if a new constraint is defined
  96. altitude = -1
  97. speed = -1
  98. if None != star.Route[i].Altitude:
  99. altitude = star.Route[i].Altitude
  100. if None != star.Route[i].Speed:
  101. speed = star.Route[i].Speed
  102. if -1 != altitude or -1 != speed:
  103. constraints.append([ trackmiles, altitude, speed ])
  104. start = star.Route[i]
  105. # add the remaining distance from the last waypoint to the runway threshold
  106. trackmiles += start.haversine(runway.Start)
  107. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  108. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  109. sys.exit(-1)
  110. # calculate descend profile
  111. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  112. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  113. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  114. distanceToWaypoint = self.Report.distanceToIAF
  115. flightTimeUntilIafSeconds = 0
  116. flightTimeSeconds = 0
  117. nextWaypointIndex = 0
  118. flownDistance = 0.0
  119. arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
  120. while True:
  121. # check if a constraint cleanup is needed and if a speed-update is needed
  122. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  123. if -1 != constraints[0][2]:
  124. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  125. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  126. constraints.pop(0)
  127. # search next altitude constraint
  128. altitudeDistance = 0
  129. nextAltitude = 0
  130. for constraint in constraints:
  131. if -1 != constraint[1]:
  132. altitudeDistance = constraint[0]
  133. nextAltitude = constraint[1]
  134. break
  135. # check if update of altitude and speed is needed on 3° glide
  136. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  137. oldGroundspeed = currentPosition[1]
  138. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  139. newAltitude = currentPosition[0] - descendRate
  140. if 0 > newAltitude:
  141. newAltitude = 0
  142. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  143. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  144. else:
  145. distance = currentPosition[1] / 60 / 6
  146. # update the statistics
  147. distanceToWaypoint -= distance
  148. flownDistance += distance
  149. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  150. if newIAS < currentIAS:
  151. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  152. currentIAS = newIAS
  153. flightTimeSeconds += 10
  154. if flownDistance <= self.Report.distanceToIAF:
  155. flightTimeUntilIafSeconds += 10
  156. if flownDistance >= trackmiles:
  157. break
  158. # check if we follow a new waypoint pair
  159. if 0 >= distanceToWaypoint:
  160. lastWaypointIndex = nextWaypointIndex
  161. nextWaypointIndex += 1
  162. arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
  163. arrivalRoute[-1].ETA = self.ReportTime + arrivalRoute[-1].FlightTime
  164. arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
  165. # check if a skip from base to final turn waypoints is needed
  166. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  167. nextWaypointIndex = turnIndices[1]
  168. # update the statistics
  169. if nextWaypointIndex < len(star.Route):
  170. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  171. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  172. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  173. arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
  174. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles, arrivalRoute