Inbound.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.PerformanceData import PerformanceData
  10. from aman.types.Waypoint import Waypoint
  11. class Inbound:
  12. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  13. performanceData : PerformanceData, weatherModel : WeatherModel):
  14. self.Report = report
  15. self.CurrentPosition = report.position
  16. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  17. # search performance data -> fallback to A320
  18. if self.Report.aircraft.type in performanceData.Aircrafts:
  19. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  20. if None == self.PerformanceData:
  21. self.PerformanceData = performanceData.Aircrafts['A320']
  22. self.findArrivalRunway(sequencingConfig)
  23. self.findArrivalRoute(navData)
  24. duration = self.secondsUntilTouchdown(weatherModel)
  25. self.InitialArrivalTime = self.ReportTime + duration
  26. self.EstimatedArrivalTime = self.InitialArrivalTime
  27. self.EstimatedStarEntryTime = None
  28. def findArrivalRunway(self, sequencingConfig : AirportSequencing):
  29. self.PlannedRunway = None
  30. # find the nearest runway for an initial guess
  31. distance = 100000.0
  32. currentPosition = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude)
  33. for runway in sequencingConfig.ActiveArrivalRunways:
  34. candidateDistance = runway.Runway.Start.haversine(currentPosition)
  35. if distance > candidateDistance:
  36. self.PlannedRunway = runway
  37. distance = candidateDistance
  38. def findArrivalRoute(self, navData : SctEseFormat):
  39. self.PlannedStar = None
  40. if None == self.PlannedRunway:
  41. return
  42. for arrivalRunway in navData.ArrivalRoutes:
  43. if arrivalRunway == self.PlannedRunway.Runway.Name:
  44. stars = navData.ArrivalRoutes[arrivalRunway]
  45. for star in stars:
  46. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  47. self.PlannedStar = star
  48. return
  49. def secondsUntilTouchdown(self, weather : WeatherModel):
  50. if None == self.PlannedRunway or None == self.PlannedStar:
  51. return timedelta(seconds = 0)
  52. # calculate remaining trackmiles
  53. trackmiles = self.Report.distanceToIAF
  54. start = self.PlannedStar.Route[0]
  55. turnIndices = [ -1, -1 ]
  56. constraints = []
  57. for i in range(0, len(self.PlannedStar.Route)):
  58. # identified the base turn
  59. if True == self.PlannedStar.Route[i].BaseTurn:
  60. turnIndices[0] = i
  61. # identified the final turn
  62. elif -1 != turnIndices[0] and True == self.PlannedStar.Route[i].FinalTurn:
  63. turnIndices[1] = i
  64. # skip waypoints until the final turn point is found
  65. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  66. continue
  67. trackmiles += start.haversine(self.PlannedStar.Route[i]) * 0.539957
  68. # check if a new constraint is defined
  69. altitude = -1
  70. speed = -1
  71. if None != self.PlannedStar.Route[i].Altitude:
  72. altitude = self.PlannedStar.Route[i].Altitude
  73. if None != self.PlannedStar.Route[i].Speed:
  74. speed = self.PlannedStar.Route[i].Speed
  75. if -1 != altitude or -1 != speed:
  76. constraints.append([ trackmiles, altitude, speed ])
  77. start = self.PlannedStar.Route[i]
  78. # add the remaining distance from the last waypoint to the runway threshold
  79. trackmiles += start.haversine(self.PlannedRunway.Runway.Start)
  80. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  81. sys.stderr.write('Invalid constraint definition found for ' + self.PlannedStar.Name)
  82. sys.exit(-1)
  83. # calculate descend profile
  84. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(self.PlannedStar.Route[0])
  85. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  86. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  87. distanceToWaypoint = self.Report.distanceToIAF
  88. flightTimeSeconds = 0
  89. nextWaypointIndex = 0
  90. flownDistance = 0.0
  91. while True:
  92. # check if a constraint cleanup is needed and if a speed-update is needed
  93. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  94. if -1 != constraints[0][2]:
  95. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  96. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  97. constraints.pop(0)
  98. # search next altitude constraint
  99. altitudeDistance = 0
  100. nextAltitude = 0
  101. for constraint in constraints:
  102. if -1 != constraint[1]:
  103. altitudeDistance = constraint[0]
  104. nextAltitude = constraint[1]
  105. break
  106. # check if update of altitude and speed is needed on 3° glide
  107. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  108. oldGroundspeed = currentPosition[1]
  109. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  110. newAltitude = currentPosition[0] - descendRate
  111. if 0 > newAltitude:
  112. newAltitude = 0
  113. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  114. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  115. else:
  116. distance = currentPosition[1] / 60 / 6
  117. # update the statistics
  118. distanceToWaypoint -= distance
  119. flownDistance += distance
  120. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  121. if newIAS < currentIAS:
  122. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  123. currentIAS = newIAS
  124. flightTimeSeconds += 10
  125. if flownDistance >= trackmiles:
  126. break
  127. # check if we follow a new waypoint pair
  128. if 0 >= distanceToWaypoint:
  129. lastWaypointIndex = nextWaypointIndex
  130. nextWaypointIndex += 1
  131. # check if a skip from base to final turn waypoints is needed
  132. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  133. nextWaypointIndex = turnIndices[1]
  134. # update the statistics
  135. if nextWaypointIndex < len(self.PlannedStar.Route):
  136. distanceToWaypoint = self.PlannedStar.Route[lastWaypointIndex].haversine(self.PlannedStar.Route[nextWaypointIndex]) * 0.539957
  137. currentHeading = self.PlannedStar.Route[lastWaypointIndex].bearing(self.PlannedStar.Route[nextWaypointIndex])
  138. currentPosition[1] = min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1])
  139. return timedelta(seconds = flightTimeSeconds)