Inbound.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.ArrivalWaypoint import ArrivalWaypoint
  10. from aman.types.PerformanceData import PerformanceData
  11. from aman.types.ArrivalRoute import ArrivalRoute
  12. from aman.types.ArrivalData import ArrivalData
  13. from aman.types.Runway import Runway
  14. from aman.types.Waypoint import Waypoint
  15. class Inbound:
  16. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  17. for arrivalRunway in navData.ArrivalRoutes:
  18. if arrivalRunway == runway.Name:
  19. stars = navData.ArrivalRoutes[arrivalRunway]
  20. for star in stars:
  21. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  22. return star
  23. return None
  24. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  25. performanceData : PerformanceData, weatherModel : WeatherModel):
  26. self.Report = report
  27. self.Callsign = report.aircraft.callsign
  28. self.CurrentPosition = report.position
  29. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  30. self.InitialArrivalTime = None
  31. self.EarliestArrivalTime = None
  32. self.PlannedArrivalTime = None
  33. self.EstimatedStarEntryTime = None
  34. self.PlannedRunway = None
  35. self.PlannedStar = None
  36. self.ArrivalCandidates = {}
  37. self.WTC = None
  38. self.FixedSequence = False
  39. # analyze the WTC
  40. wtc = report.aircraft.wtc.upper()
  41. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  42. self.WTC = wtc
  43. # search performance data -> fallback to A320
  44. if self.Report.aircraft.type in performanceData.Aircrafts:
  45. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  46. if None == self.PerformanceData:
  47. self.PerformanceData = performanceData.Aircrafts['A320']
  48. # calculate the timings for the different arrival runways
  49. for identifier in sequencingConfig.ActiveArrivalRunways:
  50. star = self.findArrivalRoute(identifier.Runway, navData)
  51. if None != star:
  52. flightTime, flightTimeUntilIaf, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  53. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  54. ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  55. ttl = timedelta(minutes = (trackmiles / (avgSpeed * 0.9)) * 60) - flightTime
  56. ita = self.ReportTime + flightTime
  57. earliest = ita - ttg
  58. latest = ita + ttl
  59. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
  60. entry = flightTimeUntilIaf, touchdown = flightTime,
  61. ttl = ttl, latest = latest)
  62. # calculate the first values for later plannings
  63. for candidate in self.ArrivalCandidates:
  64. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  65. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  66. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  67. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  68. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  69. if None != self.PlannedStar:
  70. for runway in navData.Runways[self.Report.destination.upper()]:
  71. if runway.Name == self.PlannedStar.Runway:
  72. self.PlannedRunway = runway
  73. break
  74. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  75. # calculate remaining trackmiles
  76. trackmiles = self.Report.distanceToIAF
  77. start = star.Route[0]
  78. turnIndices = [ -1, -1 ]
  79. constraints = []
  80. for i in range(0, len(star.Route)):
  81. # identified the base turn
  82. if True == star.Route[i].BaseTurn:
  83. turnIndices[0] = i
  84. # identified the final turn
  85. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  86. turnIndices[1] = i
  87. # skip waypoints until the final turn point is found
  88. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  89. continue
  90. trackmiles += start.haversine(star.Route[i]) * 0.539957
  91. # check if a new constraint is defined
  92. altitude = -1
  93. speed = -1
  94. if None != star.Route[i].Altitude:
  95. altitude = star.Route[i].Altitude
  96. if None != star.Route[i].Speed:
  97. speed = star.Route[i].Speed
  98. if -1 != altitude or -1 != speed:
  99. constraints.append([ trackmiles, altitude, speed ])
  100. start = star.Route[i]
  101. # add the remaining distance from the last waypoint to the runway threshold
  102. trackmiles += start.haversine(runway.Start)
  103. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  104. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  105. sys.exit(-1)
  106. # calculate descend profile
  107. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  108. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  109. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  110. distanceToWaypoint = self.Report.distanceToIAF
  111. flightTimeUntilIafSeconds = 0
  112. flightTimeSeconds = 0
  113. nextWaypointIndex = 0
  114. flownDistance = 0.0
  115. arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
  116. while True:
  117. # check if a constraint cleanup is needed and if a speed-update is needed
  118. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  119. if -1 != constraints[0][2]:
  120. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  121. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  122. constraints.pop(0)
  123. # search next altitude constraint
  124. altitudeDistance = 0
  125. nextAltitude = 0
  126. for constraint in constraints:
  127. if -1 != constraint[1]:
  128. altitudeDistance = constraint[0]
  129. nextAltitude = constraint[1]
  130. break
  131. # check if update of altitude and speed is needed on 3° glide
  132. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  133. oldGroundspeed = currentPosition[1]
  134. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  135. newAltitude = currentPosition[0] - descendRate
  136. if 0 > newAltitude:
  137. newAltitude = 0
  138. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  139. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  140. else:
  141. distance = currentPosition[1] / 60 / 6
  142. # update the statistics
  143. distanceToWaypoint -= distance
  144. flownDistance += distance
  145. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  146. if newIAS < currentIAS:
  147. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  148. currentIAS = newIAS
  149. flightTimeSeconds += 10
  150. if flownDistance <= self.Report.distanceToIAF:
  151. flightTimeUntilIafSeconds += 10
  152. if flownDistance >= trackmiles:
  153. break
  154. # check if we follow a new waypoint pair
  155. if 0 >= distanceToWaypoint:
  156. lastWaypointIndex = nextWaypointIndex
  157. nextWaypointIndex += 1
  158. arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
  159. arrivalRoute[-1].ETA = self.ReportTime + arrivalRoute[-1].FlightTime
  160. arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
  161. # check if a skip from base to final turn waypoints is needed
  162. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  163. nextWaypointIndex = turnIndices[1]
  164. # update the statistics
  165. if nextWaypointIndex < len(star.Route):
  166. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  167. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  168. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  169. arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
  170. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles, arrivalRoute