Inbound.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.PerformanceData import PerformanceData
  10. from aman.types.ArrivalRoute import ArrivalRoute
  11. from aman.types.ArrivalTime import ArrivalTime
  12. from aman.types.Runway import Runway
  13. from aman.types.Waypoint import Waypoint
  14. class Inbound:
  15. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  16. for arrivalRunway in navData.ArrivalRoutes:
  17. if arrivalRunway == runway.Name:
  18. stars = navData.ArrivalRoutes[arrivalRunway]
  19. for star in stars:
  20. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  21. return star
  22. return None
  23. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  24. performanceData : PerformanceData, weatherModel : WeatherModel):
  25. self.Report = report
  26. self.Callsign = report.aircraft.callsign
  27. self.CurrentPosition = report.position
  28. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  29. self.InitialArrivalTime = None
  30. self.EarliestArrivalTime = None
  31. self.PlannedArrivalTime = None
  32. self.EstimatedStarEntryTime = None
  33. self.PlannedRunway = None
  34. self.PlannedStar = None
  35. self.ArrivalCandidates = {}
  36. self.WTC = None
  37. # analyze the WTC
  38. wtc = report.aircraft.wtc.upper()
  39. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  40. self.WTC = wtc
  41. # search performance data -> fallback to A320
  42. if self.Report.aircraft.type in performanceData.Aircrafts:
  43. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  44. if None == self.PerformanceData:
  45. self.PerformanceData = performanceData.Aircrafts['A320']
  46. # calculate the timings for the different arrival runways
  47. for identifier in sequencingConfig.ActiveArrivalRunways:
  48. star = self.findArrivalRoute(identifier.Runway, navData)
  49. if None != star:
  50. flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  51. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  52. ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  53. ita = self.ReportTime + flightTime
  54. earliest = ita - ttg
  55. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg = ttg, star = star, ita = ita, earliest = earliest,
  56. entry = flightTimeUntilIaf, touchdown = flightTime)
  57. # calculate the first values for later plannings
  58. for candidate in self.ArrivalCandidates:
  59. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  60. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  61. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  62. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  63. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  64. if None != self.PlannedStar:
  65. for runway in navData.Runways[self.Report.destination.upper()]:
  66. if runway.Name == self.PlannedStar.Runway:
  67. self.PlannedRunway = runway
  68. break
  69. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  70. # calculate remaining trackmiles
  71. trackmiles = self.Report.distanceToIAF
  72. start = star.Route[0]
  73. turnIndices = [ -1, -1 ]
  74. constraints = []
  75. for i in range(0, len(star.Route)):
  76. # identified the base turn
  77. if True == star.Route[i].BaseTurn:
  78. turnIndices[0] = i
  79. # identified the final turn
  80. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  81. turnIndices[1] = i
  82. # skip waypoints until the final turn point is found
  83. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  84. continue
  85. trackmiles += start.haversine(star.Route[i]) * 0.539957
  86. # check if a new constraint is defined
  87. altitude = -1
  88. speed = -1
  89. if None != star.Route[i].Altitude:
  90. altitude = star.Route[i].Altitude
  91. if None != star.Route[i].Speed:
  92. speed = star.Route[i].Speed
  93. if -1 != altitude or -1 != speed:
  94. constraints.append([ trackmiles, altitude, speed ])
  95. start = star.Route[i]
  96. # add the remaining distance from the last waypoint to the runway threshold
  97. trackmiles += start.haversine(runway.Start)
  98. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  99. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  100. sys.exit(-1)
  101. # calculate descend profile
  102. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  103. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  104. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  105. distanceToWaypoint = self.Report.distanceToIAF
  106. flightTimeUntilIafSeconds = 0
  107. flightTimeSeconds = 0
  108. nextWaypointIndex = 0
  109. flownDistance = 0.0
  110. while True:
  111. # check if a constraint cleanup is needed and if a speed-update is needed
  112. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  113. if -1 != constraints[0][2]:
  114. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  115. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  116. constraints.pop(0)
  117. # search next altitude constraint
  118. altitudeDistance = 0
  119. nextAltitude = 0
  120. for constraint in constraints:
  121. if -1 != constraint[1]:
  122. altitudeDistance = constraint[0]
  123. nextAltitude = constraint[1]
  124. break
  125. # check if update of altitude and speed is needed on 3° glide
  126. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  127. oldGroundspeed = currentPosition[1]
  128. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  129. newAltitude = currentPosition[0] - descendRate
  130. if 0 > newAltitude:
  131. newAltitude = 0
  132. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  133. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  134. else:
  135. distance = currentPosition[1] / 60 / 6
  136. # update the statistics
  137. distanceToWaypoint -= distance
  138. flownDistance += distance
  139. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  140. if newIAS < currentIAS:
  141. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  142. currentIAS = newIAS
  143. flightTimeSeconds += 10
  144. if flownDistance <= self.Report.distanceToIAF:
  145. flightTimeUntilIafSeconds += 10
  146. if flownDistance >= trackmiles:
  147. break
  148. # check if we follow a new waypoint pair
  149. if 0 >= distanceToWaypoint:
  150. lastWaypointIndex = nextWaypointIndex
  151. nextWaypointIndex += 1
  152. # check if a skip from base to final turn waypoints is needed
  153. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  154. nextWaypointIndex = turnIndices[1]
  155. # update the statistics
  156. if nextWaypointIndex < len(star.Route):
  157. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  158. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  159. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  160. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles