Inbound.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.PerformanceData import PerformanceData
  10. from aman.types.Waypoint import Waypoint
  11. class Inbound:
  12. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  13. performanceData : PerformanceData, weatherModel : WeatherModel):
  14. self.Report = report
  15. self.CurrentPosition = report.position
  16. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  17. # search performance data -> fallback to A320
  18. if self.Report.aircraft.type in performanceData.Aircrafts:
  19. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  20. if None == self.PerformanceData:
  21. self.PerformanceData = performanceData.Aircrafts['A320']
  22. self.findArrivalRunway(sequencingConfig)
  23. self.findArrivalRoute(navData)
  24. flightTime, flightTimeUntilIaf, trackmiles = self.secondsUntilTouchdown(weatherModel)
  25. # calculate the maximum time to gain (assumption: 10% speed increase by acceleration and shortcuts)
  26. avgSpeed = self.Report.distanceToIAF / (float(flightTimeUntilIaf.seconds) / 3600.0)
  27. self.MaximumTimeToGain = flightTimeUntilIaf - timedelta(minutes = (self.Report.distanceToIAF / (avgSpeed * 1.1)) * 60)
  28. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  29. self.MaximumTimeToGain += flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  30. # calculate the different arrival times
  31. self.InitialArrivalTime = self.ReportTime + flightTime
  32. self.EarliestArrivalTime = self.InitialArrivalTime - self.MaximumTimeToGain
  33. self.EstimatedArrivalTime = self.InitialArrivalTime
  34. self.EstimatedStarEntryTime = None
  35. def findArrivalRunway(self, sequencingConfig : AirportSequencing):
  36. self.PlannedRunway = None
  37. # find the nearest runway for an initial guess
  38. distance = 100000.0
  39. currentPosition = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude)
  40. for runway in sequencingConfig.ActiveArrivalRunways:
  41. candidateDistance = runway.Runway.Start.haversine(currentPosition)
  42. if distance > candidateDistance:
  43. self.PlannedRunway = runway
  44. distance = candidateDistance
  45. def findArrivalRoute(self, navData : SctEseFormat):
  46. self.PlannedStar = None
  47. if None == self.PlannedRunway:
  48. return
  49. for arrivalRunway in navData.ArrivalRoutes:
  50. if arrivalRunway == self.PlannedRunway.Runway.Name:
  51. stars = navData.ArrivalRoutes[arrivalRunway]
  52. for star in stars:
  53. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  54. self.PlannedStar = star
  55. return
  56. def secondsUntilTouchdown(self, weather : WeatherModel):
  57. if None == self.PlannedRunway or None == self.PlannedStar:
  58. return timedelta(seconds = 0)
  59. # calculate remaining trackmiles
  60. trackmiles = self.Report.distanceToIAF
  61. start = self.PlannedStar.Route[0]
  62. turnIndices = [ -1, -1 ]
  63. constraints = []
  64. for i in range(0, len(self.PlannedStar.Route)):
  65. # identified the base turn
  66. if True == self.PlannedStar.Route[i].BaseTurn:
  67. turnIndices[0] = i
  68. # identified the final turn
  69. elif -1 != turnIndices[0] and True == self.PlannedStar.Route[i].FinalTurn:
  70. turnIndices[1] = i
  71. # skip waypoints until the final turn point is found
  72. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  73. continue
  74. trackmiles += start.haversine(self.PlannedStar.Route[i]) * 0.539957
  75. # check if a new constraint is defined
  76. altitude = -1
  77. speed = -1
  78. if None != self.PlannedStar.Route[i].Altitude:
  79. altitude = self.PlannedStar.Route[i].Altitude
  80. if None != self.PlannedStar.Route[i].Speed:
  81. speed = self.PlannedStar.Route[i].Speed
  82. if -1 != altitude or -1 != speed:
  83. constraints.append([ trackmiles, altitude, speed ])
  84. start = self.PlannedStar.Route[i]
  85. # add the remaining distance from the last waypoint to the runway threshold
  86. trackmiles += start.haversine(self.PlannedRunway.Runway.Start)
  87. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  88. sys.stderr.write('Invalid constraint definition found for ' + self.PlannedStar.Name)
  89. sys.exit(-1)
  90. # calculate descend profile
  91. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(self.PlannedStar.Route[0])
  92. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  93. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  94. distanceToWaypoint = self.Report.distanceToIAF
  95. flightTimeUntilIafSeconds = 0
  96. flightTimeSeconds = 0
  97. nextWaypointIndex = 0
  98. flownDistance = 0.0
  99. while True:
  100. # check if a constraint cleanup is needed and if a speed-update is needed
  101. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  102. if -1 != constraints[0][2]:
  103. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  104. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  105. constraints.pop(0)
  106. # search next altitude constraint
  107. altitudeDistance = 0
  108. nextAltitude = 0
  109. for constraint in constraints:
  110. if -1 != constraint[1]:
  111. altitudeDistance = constraint[0]
  112. nextAltitude = constraint[1]
  113. break
  114. # check if update of altitude and speed is needed on 3° glide
  115. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  116. oldGroundspeed = currentPosition[1]
  117. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  118. newAltitude = currentPosition[0] - descendRate
  119. if 0 > newAltitude:
  120. newAltitude = 0
  121. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  122. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  123. else:
  124. distance = currentPosition[1] / 60 / 6
  125. # update the statistics
  126. distanceToWaypoint -= distance
  127. flownDistance += distance
  128. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  129. if newIAS < currentIAS:
  130. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  131. currentIAS = newIAS
  132. flightTimeSeconds += 10
  133. if flownDistance <= self.Report.distanceToIAF:
  134. flightTimeUntilIafSeconds += 10
  135. if flownDistance >= trackmiles:
  136. break
  137. # check if we follow a new waypoint pair
  138. if 0 >= distanceToWaypoint:
  139. lastWaypointIndex = nextWaypointIndex
  140. nextWaypointIndex += 1
  141. # check if a skip from base to final turn waypoints is needed
  142. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  143. nextWaypointIndex = turnIndices[1]
  144. # update the statistics
  145. if nextWaypointIndex < len(self.PlannedStar.Route):
  146. distanceToWaypoint = self.PlannedStar.Route[lastWaypointIndex].haversine(self.PlannedStar.Route[nextWaypointIndex]) * 0.539957
  147. currentHeading = self.PlannedStar.Route[lastWaypointIndex].bearing(self.PlannedStar.Route[nextWaypointIndex])
  148. currentPosition[1] = min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1])
  149. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles