Inbound.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python
  2. import pytz
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.com import AircraftReport_pb2
  6. from aman.config.AirportSequencing import AirportSequencing
  7. from aman.formats.SctEseFormat import SctEseFormat
  8. from aman.sys.WeatherModel import WeatherModel
  9. from aman.types.PerformanceData import PerformanceData
  10. from aman.types.ArrivalRoute import ArrivalRoute
  11. from aman.types.ArrivalData import ArrivalData
  12. from aman.types.Runway import Runway
  13. from aman.types.Waypoint import Waypoint
  14. class Inbound:
  15. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  16. for arrivalRunway in navData.ArrivalRoutes:
  17. if arrivalRunway == runway.Name:
  18. stars = navData.ArrivalRoutes[arrivalRunway]
  19. for star in stars:
  20. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  21. return star
  22. return None
  23. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  24. performanceData : PerformanceData, weatherModel : WeatherModel):
  25. self.Report = report
  26. self.Callsign = report.aircraft.callsign
  27. self.CurrentPosition = report.position
  28. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  29. self.InitialArrivalTime = None
  30. self.EarliestArrivalTime = None
  31. self.PlannedArrivalTime = None
  32. self.EstimatedStarEntryTime = None
  33. self.PlannedRunway = None
  34. self.PlannedStar = None
  35. self.ArrivalCandidates = {}
  36. self.WTC = None
  37. self.FixedSequence = False
  38. # analyze the WTC
  39. wtc = report.aircraft.wtc.upper()
  40. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  41. self.WTC = wtc
  42. # search performance data -> fallback to A320
  43. if self.Report.aircraft.type in performanceData.Aircrafts:
  44. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  45. if None == self.PerformanceData:
  46. self.PerformanceData = performanceData.Aircrafts['A320']
  47. # calculate the timings for the different arrival runways
  48. for identifier in sequencingConfig.ActiveArrivalRunways:
  49. star = self.findArrivalRoute(identifier.Runway, navData)
  50. if None != star:
  51. flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  52. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  53. ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  54. ttl = timedelta(minutes = (trackmiles / (avgSpeed * 0.9)) * 60) - flightTime
  55. ita = self.ReportTime + flightTime
  56. earliest = ita - ttg
  57. latest = ita + ttl
  58. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
  59. entry = flightTimeUntilIaf, touchdown = flightTime,
  60. ttl = ttl, latest = latest)
  61. # calculate the first values for later plannings
  62. for candidate in self.ArrivalCandidates:
  63. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  64. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  65. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  66. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  67. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  68. if None != self.PlannedStar:
  69. for runway in navData.Runways[self.Report.destination.upper()]:
  70. if runway.Name == self.PlannedStar.Runway:
  71. self.PlannedRunway = runway
  72. break
  73. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  74. # calculate remaining trackmiles
  75. trackmiles = self.Report.distanceToIAF
  76. start = star.Route[0]
  77. turnIndices = [ -1, -1 ]
  78. constraints = []
  79. for i in range(0, len(star.Route)):
  80. # identified the base turn
  81. if True == star.Route[i].BaseTurn:
  82. turnIndices[0] = i
  83. # identified the final turn
  84. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  85. turnIndices[1] = i
  86. # skip waypoints until the final turn point is found
  87. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  88. continue
  89. trackmiles += start.haversine(star.Route[i]) * 0.539957
  90. # check if a new constraint is defined
  91. altitude = -1
  92. speed = -1
  93. if None != star.Route[i].Altitude:
  94. altitude = star.Route[i].Altitude
  95. if None != star.Route[i].Speed:
  96. speed = star.Route[i].Speed
  97. if -1 != altitude or -1 != speed:
  98. constraints.append([ trackmiles, altitude, speed ])
  99. start = star.Route[i]
  100. # add the remaining distance from the last waypoint to the runway threshold
  101. trackmiles += start.haversine(runway.Start)
  102. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  103. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  104. sys.exit(-1)
  105. # calculate descend profile
  106. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  107. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  108. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  109. distanceToWaypoint = self.Report.distanceToIAF
  110. flightTimeUntilIafSeconds = 0
  111. flightTimeSeconds = 0
  112. nextWaypointIndex = 0
  113. flownDistance = 0.0
  114. while True:
  115. # check if a constraint cleanup is needed and if a speed-update is needed
  116. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  117. if -1 != constraints[0][2]:
  118. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  119. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  120. constraints.pop(0)
  121. # search next altitude constraint
  122. altitudeDistance = 0
  123. nextAltitude = 0
  124. for constraint in constraints:
  125. if -1 != constraint[1]:
  126. altitudeDistance = constraint[0]
  127. nextAltitude = constraint[1]
  128. break
  129. # check if update of altitude and speed is needed on 3° glide
  130. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  131. oldGroundspeed = currentPosition[1]
  132. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  133. newAltitude = currentPosition[0] - descendRate
  134. if 0 > newAltitude:
  135. newAltitude = 0
  136. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  137. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  138. else:
  139. distance = currentPosition[1] / 60 / 6
  140. # update the statistics
  141. distanceToWaypoint -= distance
  142. flownDistance += distance
  143. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  144. if newIAS < currentIAS:
  145. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  146. currentIAS = newIAS
  147. flightTimeSeconds += 10
  148. if flownDistance <= self.Report.distanceToIAF:
  149. flightTimeUntilIafSeconds += 10
  150. if flownDistance >= trackmiles:
  151. break
  152. # check if we follow a new waypoint pair
  153. if 0 >= distanceToWaypoint:
  154. lastWaypointIndex = nextWaypointIndex
  155. nextWaypointIndex += 1
  156. # check if a skip from base to final turn waypoints is needed
  157. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  158. nextWaypointIndex = turnIndices[1]
  159. # update the statistics
  160. if nextWaypointIndex < len(star.Route):
  161. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  162. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  163. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  164. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles