Inbound.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #!/usr/bin/env python
  2. import json
  3. import pytz
  4. import sys
  5. from datetime import datetime, timedelta
  6. from aman.com import AircraftReport_pb2
  7. from aman.config.AirportSequencing import AirportSequencing
  8. from aman.formats.SctEseFormat import SctEseFormat
  9. from aman.sys.WeatherModel import WeatherModel
  10. from aman.types.PerformanceData import PerformanceData
  11. from aman.types.ArrivalRoute import ArrivalRoute
  12. from aman.types.ArrivalTime import ArrivalTime
  13. from aman.types.Runway import Runway
  14. from aman.types.Waypoint import Waypoint
  15. class Inbound:
  16. def findArrivalRoute(self, runway : Runway, navData : SctEseFormat):
  17. for arrivalRunway in navData.ArrivalRoutes:
  18. if arrivalRunway == runway.Name:
  19. stars = navData.ArrivalRoutes[arrivalRunway]
  20. for star in stars:
  21. if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
  22. return star
  23. return None
  24. def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
  25. performanceData : PerformanceData, weatherModel : WeatherModel):
  26. self.Report = report
  27. self.Callsign = report.aircraft.callsign
  28. self.CurrentPosition = report.position
  29. self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
  30. self.InitialArrivalTime = None
  31. self.EarliestArrivalTime = None
  32. self.PlannedArrivalTime = None
  33. self.EstimatedStarEntryTime = None
  34. self.PlannedRunway = None
  35. self.PlannedStar = None
  36. self.ArrivalCandidates = {}
  37. self.WTC = None
  38. # analyze the WTC
  39. wtc = report.aircraft.wtc.upper()
  40. if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc:
  41. self.WTC = wtc
  42. # search performance data -> fallback to A320
  43. if self.Report.aircraft.type in performanceData.Aircrafts:
  44. self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
  45. if None == self.PerformanceData:
  46. self.PerformanceData = performanceData.Aircrafts['A320']
  47. # calculate the timings for the different arrival runways
  48. for identifier in sequencingConfig.ActiveArrivalRunways:
  49. star = self.findArrivalRoute(identifier.Runway, navData)
  50. if None != star:
  51. flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  52. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  53. ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60)
  54. ttl = timedelta(minutes = (trackmiles / (avgSpeed * 0.9)) * 60) - flightTime
  55. ita = self.ReportTime + flightTime
  56. earliest = ita - ttg
  57. latest = ita + ttl
  58. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg = ttg, star = star, ita = ita, earliest = earliest,
  59. entry = flightTimeUntilIaf, touchdown = flightTime,
  60. ttl = ttl, latest = latest)
  61. # calculate the first values for later plannings
  62. for candidate in self.ArrivalCandidates:
  63. if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime:
  64. self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime
  65. self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime
  66. self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf
  67. self.PlannedStar = self.ArrivalCandidates[candidate].Star
  68. if None != self.PlannedStar:
  69. for runway in navData.Runways[self.Report.destination.upper()]:
  70. if runway.Name == self.PlannedStar.Runway:
  71. self.PlannedRunway = runway
  72. break
  73. def toJSON(self):
  74. pta = str(self.PlannedArrivalTime)
  75. delimiter = pta.find('.')
  76. if -1 == delimiter:
  77. delimiter = pta.find('+')
  78. return json.dumps({ 'callsign' : self.Callsign, 'runway' : self.PlannedRunway.Name, 'pta' : pta[0:delimiter] }, ensure_ascii=True)
  79. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  80. # calculate remaining trackmiles
  81. trackmiles = self.Report.distanceToIAF
  82. start = star.Route[0]
  83. turnIndices = [ -1, -1 ]
  84. constraints = []
  85. for i in range(0, len(star.Route)):
  86. # identified the base turn
  87. if True == star.Route[i].BaseTurn:
  88. turnIndices[0] = i
  89. # identified the final turn
  90. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  91. turnIndices[1] = i
  92. # skip waypoints until the final turn point is found
  93. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  94. continue
  95. trackmiles += start.haversine(star.Route[i]) * 0.539957
  96. # check if a new constraint is defined
  97. altitude = -1
  98. speed = -1
  99. if None != star.Route[i].Altitude:
  100. altitude = star.Route[i].Altitude
  101. if None != star.Route[i].Speed:
  102. speed = star.Route[i].Speed
  103. if -1 != altitude or -1 != speed:
  104. constraints.append([ trackmiles, altitude, speed ])
  105. start = star.Route[i]
  106. # add the remaining distance from the last waypoint to the runway threshold
  107. trackmiles += start.haversine(runway.Start)
  108. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  109. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  110. sys.exit(-1)
  111. # calculate descend profile
  112. currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0])
  113. currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
  114. currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
  115. distanceToWaypoint = self.Report.distanceToIAF
  116. flightTimeUntilIafSeconds = 0
  117. flightTimeSeconds = 0
  118. nextWaypointIndex = 0
  119. flownDistance = 0.0
  120. while True:
  121. # check if a constraint cleanup is needed and if a speed-update is needed
  122. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  123. if -1 != constraints[0][2]:
  124. currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
  125. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  126. constraints.pop(0)
  127. # search next altitude constraint
  128. altitudeDistance = 0
  129. nextAltitude = 0
  130. for constraint in constraints:
  131. if -1 != constraint[1]:
  132. altitudeDistance = constraint[0]
  133. nextAltitude = constraint[1]
  134. break
  135. # check if update of altitude and speed is needed on 3° glide
  136. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  137. oldGroundspeed = currentPosition[1]
  138. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  139. newAltitude = currentPosition[0] - descendRate
  140. if 0 > newAltitude:
  141. newAltitude = 0
  142. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  143. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  144. else:
  145. distance = currentPosition[1] / 60 / 6
  146. # update the statistics
  147. distanceToWaypoint -= distance
  148. flownDistance += distance
  149. newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  150. if newIAS < currentIAS:
  151. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  152. currentIAS = newIAS
  153. flightTimeSeconds += 10
  154. if flownDistance <= self.Report.distanceToIAF:
  155. flightTimeUntilIafSeconds += 10
  156. if flownDistance >= trackmiles:
  157. break
  158. # check if we follow a new waypoint pair
  159. if 0 >= distanceToWaypoint:
  160. lastWaypointIndex = nextWaypointIndex
  161. nextWaypointIndex += 1
  162. # check if a skip from base to final turn waypoints is needed
  163. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  164. nextWaypointIndex = turnIndices[1]
  165. # update the statistics
  166. if nextWaypointIndex < len(star.Route):
  167. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957
  168. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  169. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  170. return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles