Node.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. #!/usr/bin/env python
  2. import math
  3. import sys
  4. from datetime import datetime, timedelta
  5. from aman.config.AirportSequencing import AirportSequencing
  6. from aman.formats.SctEseFormat import SctEseFormat
  7. from aman.sys.WeatherModel import WeatherModel
  8. from aman.types.ArrivalData import ArrivalData
  9. from aman.types.ArrivalRoute import ArrivalRoute
  10. from aman.types.ArrivalWaypoint import ArrivalWaypoint
  11. from aman.types.Runway import Runway
  12. from aman.types.Inbound import Inbound
  13. from aman.types.Waypoint import Waypoint
  14. class Node:
  15. def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat):
  16. for arrivalRunway in navData.ArrivalRoutes:
  17. if arrivalRunway == runway.Name:
  18. stars = navData.ArrivalRoutes[arrivalRunway]
  19. for star in stars:
  20. if 0 != len(star.Route) and iaf == star.Iaf.Name:
  21. return star
  22. return None
  23. def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
  24. # calculate remaining trackmiles
  25. trackmiles = self.PredictedDistanceToIAF
  26. start = star.Route[0]
  27. turnIndices = [ -1, -1 ]
  28. constraints = []
  29. for i in range(0, len(star.Route)):
  30. # identified the base turn
  31. if True == star.Route[i].BaseTurn:
  32. turnIndices[0] = i
  33. # identified the final turn
  34. elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
  35. turnIndices[1] = i
  36. # skip waypoints until the final turn point is found
  37. elif -1 != turnIndices[0] and -1 == turnIndices[1]:
  38. continue
  39. trackmiles += start.haversine(star.Route[i])
  40. # check if a new constraint is defined
  41. altitude = -1
  42. speed = -1
  43. if None != star.Route[i].Altitude:
  44. altitude = star.Route[i].Altitude
  45. if None != star.Route[i].Speed:
  46. speed = star.Route[i].Speed
  47. if -1 != altitude or -1 != speed:
  48. constraints.append([ trackmiles, altitude, speed ])
  49. start = star.Route[i]
  50. # add the remaining distance from the last waypoint to the runway threshold
  51. trackmiles += start.haversine(runway.Start)
  52. if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
  53. sys.stderr.write('Invalid constraint definition found for ' + star.Name)
  54. sys.exit(-1)
  55. # calculate descend profile
  56. currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0])
  57. currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles)
  58. currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ]
  59. distanceToWaypoint = self.PredictedDistanceToIAF
  60. flightTimeSeconds = 0
  61. nextWaypointIndex = 0
  62. flownDistance = 0.0
  63. arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
  64. while True:
  65. # check if a constraint cleanup is needed and if a speed-update is needed
  66. if 0 != len(constraints) and flownDistance >= constraints[0][0]:
  67. if -1 != constraints[0][2]:
  68. currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance))
  69. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  70. constraints.pop(0)
  71. # search next altitude constraint
  72. altitudeDistance = 0
  73. nextAltitude = 0
  74. for constraint in constraints:
  75. if -1 != constraint[1]:
  76. altitudeDistance = constraint[0]
  77. nextAltitude = constraint[1]
  78. break
  79. # check if update of altitude and speed is needed on 3° glide
  80. if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
  81. oldGroundspeed = currentPosition[1]
  82. descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
  83. newAltitude = currentPosition[0] - descendRate
  84. if 0 > newAltitude:
  85. newAltitude = 0
  86. currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
  87. distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
  88. else:
  89. distance = currentPosition[1] / 60 / 6
  90. # update the statistics
  91. distanceToWaypoint -= distance
  92. flownDistance += distance
  93. newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
  94. if newIAS < currentIAS:
  95. currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
  96. currentIAS = newIAS
  97. flightTimeSeconds += 10
  98. if flownDistance >= trackmiles:
  99. break
  100. # check if we follow a new waypoint pair
  101. if 0 >= distanceToWaypoint:
  102. lastWaypointIndex = nextWaypointIndex
  103. nextWaypointIndex += 1
  104. arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
  105. arrivalRoute[-1].ETA = self.Inbound.ReportTime + arrivalRoute[-1].FlightTime
  106. arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
  107. arrivalRoute[-1].Altitude = currentPosition[0]
  108. arrivalRoute[-1].IndicatedAirspeed = currentIAS
  109. arrivalRoute[-1].GroundSpeed = currentPosition[1]
  110. # check if a skip from base to final turn waypoints is needed
  111. if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
  112. nextWaypointIndex = turnIndices[1]
  113. # update the statistics
  114. if nextWaypointIndex < len(star.Route):
  115. distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex])
  116. currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
  117. currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
  118. arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
  119. return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute
  120. def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel,
  121. navData : SctEseFormat, sequencingConfig : AirportSequencing):
  122. self.PredictedDistanceToIAF = inbound.Report.distanceToIAF
  123. self.ArrivalCandidates = {}
  124. self.Inbound = inbound
  125. # predict the distance to IAF
  126. timePrediction = (referenceTime - inbound.ReportTime).total_seconds()
  127. if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways):
  128. # calculate current motion information
  129. course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading)
  130. tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude)
  131. gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s
  132. distance = gs * timePrediction * 0.000539957 # distance back to nm
  133. # calculate the bearing between the current position and the IAF
  134. star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, navData)
  135. if None != star:
  136. bearing = tempWaypoint.bearing(star.Route[0])
  137. else:
  138. bearing = inbound.Report.dynamics.heading
  139. # calculate the distance based on the flown distance and update the predicted distance
  140. self.PredictedDistanceToIAF -= math.cos(math.radians(bearing - course)) * distance
  141. if 0.0 > self.PredictedDistanceToIAF:
  142. self.PredictedDistanceToIAF = 0.0
  143. # calculate the timings for the different arrival runways
  144. for identifier in sequencingConfig.ActiveArrivalRunways:
  145. star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, navData)
  146. if None != star:
  147. flightTime, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
  148. avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
  149. # the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
  150. ratio = min(2.0, max(0.0, self.PredictedDistanceToIAF / (trackmiles - self.PredictedDistanceToIAF)))
  151. possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
  152. ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
  153. ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60))
  154. ita = self.Inbound.ReportTime + flightTime
  155. earliest = ita - ttg
  156. latest = ita + ttl
  157. self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
  158. ttl = ttl, latest = latest, route = arrivalRoute,
  159. trackmiles = trackmiles)
  160. if None == self.Inbound.InitialArrivalTime:
  161. self.Inbound.InitialArrivalTime = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime