Inbound.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.PerformanceData import PerformanceData
  10. from aman.types.ArrivalRoute import ArrivalRoute
  11. from aman.types.ArrivalTime import ArrivalTime
  12. from aman.types.Runway import Runway
  13. from aman.types.Waypoint import Waypoint
  14. class Inbound:
  15. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  16. for arrivalRunway in navData.ArrivalRoutes:
  17. if arrivalRunway == runway.Name:
  18. stars = navData.ArrivalRoutes[arrivalRunway]
  19. for star in stars:
  20. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  21. return star
  22. return None
  23. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  24. performanceData : PerformanceData, weatherModel : WeatherModel):
  25. self.Report = report
  26. self.Callsign = report.aircraft.callsign
  27. self.CurrentPosition = report.position
  28. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  29. self.InitialArrivalTime = None
  30. self.EarliestArrivalTime = None
  31. self.PlannedArrivalTime = None
  32. self.EstimatedStarEntryTime = None
  33. self.PlannedRunway = None
  34. self.PlannedStar = None
  35. self.ArrivalCandidates = {}
  36. self.WTC = None
  37. # analyze the WTC
  38. wtc = report.aircraft.wtc.upper()
  39. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  40. self.WTC = wtc
  41. # search performance data -> fallback to A320
  42. if self.Report.aircraft.type in performanceData.Aircrafts:
  43. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  44. if None == self.PerformanceData:
  45. self.PerformanceData = performanceData.Aircrafts['A320']
  46. # calculate the timings for the different arrival runways
  47. for identifier in sequencingConfig.ActiveArrivalRunways:
  48. star = self.findArrivalRoute(identifier.Runway, navData)
  49. if None != star:
  50. flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  51. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  52. ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  53. ttl = timedelta(minutes = (trackmiles / (avgSpeed * 0.9)) * 60) - flightTime
  54. ita = self.ReportTime + flightTime
  55. earliest = ita - ttg
  56. latest = ita + ttl
  57. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg = ttg, star = star, ita = ita, earliest = earliest,
  58. entry = flightTimeUntilIaf, touchdown = flightTime,
  59. ttl = ttl, latest = latest)
  60. # calculate the first values for later plannings
  61. for candidate in self.ArrivalCandidates:
  62. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  63. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  64. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  65. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  66. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  67. if None != self.PlannedStar:
  68. for runway in navData.Runways[self.Report.destination.upper()]:
  69. if runway.Name == self.PlannedStar.Runway:
  70. self.PlannedRunway = runway
  71. break
  72. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  73. # calculate remaining trackmiles
  74. trackmiles = self.Report.distanceToIAF
  75. start = star.Route[0]
  76. turnIndices = [ -1, -1 ]
  77. constraints = []
  78. for i in range(0, len(star.Route)):
  79. # identified the base turn
  80. if True == star.Route[i].BaseTurn:
  81. turnIndices[0] = i
  82. # identified the final turn
  83. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  84. turnIndices[1] = i
  85. # skip waypoints until the final turn point is found
  86. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  87. continue
  88. trackmiles += start.haversine(star.Route[i]) * 0.539957
  89. # check if a new constraint is defined
  90. altitude = -1
  91. speed = -1
  92. if None != star.Route[i].Altitude:
  93. altitude = star.Route[i].Altitude
  94. if None != star.Route[i].Speed:
  95. speed = star.Route[i].Speed
  96. if -1 != altitude or -1 != speed:
  97. constraints.append([ trackmiles, altitude, speed ])
  98. start = star.Route[i]
  99. # add the remaining distance from the last waypoint to the runway threshold
  100. trackmiles += start.haversine(runway.Start)
  101. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  102. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  103. sys.exit(-1)
  104. # calculate descend profile
  105. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  106. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  107. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  108. distanceToWaypoint = self.Report.distanceToIAF
  109. flightTimeUntilIafSeconds = 0
  110. flightTimeSeconds = 0
  111. nextWaypointIndex = 0
  112. flownDistance = 0.0
  113. while True:
  114. # check if a constraint cleanup is needed and if a speed-update is needed
  115. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  116. if -1 != constraints[0][2]:
  117. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  118. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  119. constraints.pop(0)
  120. # search next altitude constraint
  121. altitudeDistance = 0
  122. nextAltitude = 0
  123. for constraint in constraints:
  124. if -1 != constraint[1]:
  125. altitudeDistance = constraint[0]
  126. nextAltitude = constraint[1]
  127. break
  128. # check if update of altitude and speed is needed on 3° glide
  129. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  130. oldGroundspeed = currentPosition[1]
  131. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  132. newAltitude = currentPosition[0] - descendRate
  133. if 0 > newAltitude:
  134. newAltitude = 0
  135. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  136. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  137. else:
  138. distance = currentPosition[1] / 60 / 6
  139. # update the statistics
  140. distanceToWaypoint -= distance
  141. flownDistance += distance
  142. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  143. if newIAS < currentIAS:
  144. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  145. currentIAS = newIAS
  146. flightTimeSeconds += 10
  147. if flownDistance <= self.Report.distanceToIAF:
  148. flightTimeUntilIafSeconds += 10
  149. if flownDistance >= trackmiles:
  150. break
  151. # check if we follow a new waypoint pair
  152. if 0 >= distanceToWaypoint:
  153. lastWaypointIndex = nextWaypointIndex
  154. nextWaypointIndex += 1
  155. # check if a skip from base to final turn waypoints is needed
  156. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  157. nextWaypointIndex = turnIndices[1]
  158. # update the statistics
  159. if nextWaypointIndex < len(star.Route):
  160. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  161. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  162. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  163. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles