112 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			112 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import pytz
 | |
| 
 | |
| from datetime import datetime, timedelta
 | |
| 
 | |
| from aman.com import AircraftReport_pb2
 | |
| from aman.config.AirportSequencing import AirportSequencing
 | |
| from aman.formats.SctEseFormat import SctEseFormat
 | |
| from aman.sys.WeatherModel import WeatherModel
 | |
| from aman.types.PerformanceData import PerformanceData
 | |
| from aman.types.Waypoint import Waypoint
 | |
| 
 | |
class Inbound:
    """Planning state of one inbound aircraft, derived from a single aircraft report.

    The constructor selects the performance data, the planned runway and the planned
    STAR and estimates the time of touchdown.

    Attributes:
        Report: the aircraft report passed to the constructor.
        CurrentPosition: ``report.position``.
        ReportTime: timezone-aware (UTC) timestamp parsed from ``report.reportTime``.
        PerformanceData: performance entry of the reported aircraft type (A320 as fallback).
        PlannedRunway: nearest active arrival runway or None.
        PlannedStar: arrival route matching runway and initial approach fix or None.
        InitialArrivalTime: ``ReportTime`` plus the estimated time until touchdown.
        EstimatedArrivalTime: initialized with ``InitialArrivalTime``.
        EstimatedStarEntryTime: initialized with None.
    """

    def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
                 performanceData : PerformanceData, weatherModel : WeatherModel):
        """Creates the inbound and calculates its initial arrival time.

        Args:
            report: aircraft report; ``reportTime`` is parsed with the format ``%Y%m%d%H%M%S`` and treated as UTC.
            sequencingConfig: provides the active arrival runways.
            navData: provides the arrival routes per runway name.
            performanceData: provides the aircraft performance table (``Aircrafts``).
            weatherModel: used to convert the planned IAS into a ground speed.
        """
        self.Report = report
        self.CurrentPosition = report.position
        self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)

        # search performance data -> fallback to A320
        self.PerformanceData = self._findPerformanceData(performanceData)

        # the route lookup depends on the runway and the prediction depends on both
        self.findArrivalRunway(sequencingConfig)
        self.findArrivalRoute(navData)

        duration = self.secondsUntilTouchdown(weatherModel)
        self.InitialArrivalTime = self.ReportTime + duration
        self.EstimatedArrivalTime = self.InitialArrivalTime
        self.EstimatedStarEntryTime = None

    def _findPerformanceData(self, performanceData : PerformanceData):
        """Returns the performance entry of the reported aircraft type or the A320 entry as fallback.

        The fallback is used if the type is unknown or if its entry is None.
        A missing 'A320' entry is not handled and raises the lookup error of the container.
        """
        aircrafts = performanceData.Aircrafts
        aircraftType = self.Report.aircraft.type

        # start with None to be able to fall back if the type is unknown
        data = None
        if aircraftType in aircrafts:
            data = aircrafts[aircraftType]
        if data is None:
            data = aircrafts['A320']

        return data

    def findArrivalRunway(self, sequencingConfig : AirportSequencing):
        """Sets PlannedRunway to the active arrival runway whose start is nearest to the reported position.

        PlannedRunway stays None if no active runway is closer than the initial search
        distance of 100000.0 (same unit as ``Waypoint.haversine``).
        """
        self.PlannedRunway = None

        # find the nearest runway for an initial guess
        distance = 100000.0
        currentPosition = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude)
        for runway in sequencingConfig.ActiveArrivalRunways:
            candidateDistance = runway.Runway.Start.haversine(currentPosition)
            if candidateDistance < distance:
                self.PlannedRunway = runway
                distance = candidateDistance

    def findArrivalRoute(self, navData : SctEseFormat):
        """Sets PlannedStar to the first non-empty route of the planned runway that starts at the reported IAF.

        PlannedStar stays None if no runway is planned or if no route matches.
        """
        self.PlannedStar = None
        if self.PlannedRunway is None:
            return

        runwayName = self.PlannedRunway.Runway.Name
        for arrivalRunway in navData.ArrivalRoutes:
            if arrivalRunway != runwayName:
                continue

            for star in navData.ArrivalRoutes[arrivalRunway]:
                if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
                    self.PlannedStar = star
                    return

    def secondsUntilTouchdown(self, weather : WeatherModel):
        """Estimates the remaining flight time by simulating the arrival in steps of 10 seconds.

        Args:
            weather: weather model that converts altitude, IAS and heading into a ground speed.

        Returns:
            The estimated flight time as timedelta. A duration of zero seconds is returned
            if no runway or no STAR is planned. The simulation stops as soon as the altitude
            reaches zero, the remaining distance becomes negative or the ground speed is not
            positive anymore.
        """
        if self.PlannedRunway is None or self.PlannedStar is None:
            return timedelta(seconds = 0)

        route = self.PlannedStar.Route

        # calculate remaining trackmiles
        # NOTE(review): 0.539957 is the km -> NM factor, i.e. haversine() presumably returns kilometers
        legLengthsNM = [route[i - 1].haversine(route[i]) * 0.539957 for i in range(1, len(route))]
        remainingDistanceNM = self.Report.distanceToIAF
        for legLengthNM in legLengthsNM:
            remainingDistanceNM += legLengthNM

        # calculate descend profile
        flightTimeSeconds = 0
        currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(route[0])
        distanceToWaypoint = self.Report.distanceToIAF
        nextWaypointIndex = 0
        altitude = self.Report.dynamics.altitude
        groundSpeed = self.Report.dynamics.groundSpeed
        while 0 < altitude:
            # without a positive ground speed neither the altitude nor the distance can
            # decrease and the simulation would never terminate
            if 0 >= groundSpeed:
                break

            lastDistance = remainingDistanceNM

            # TODO integrate speed and altitude constraints

            # calculate the next position after 10 seconds
            # the descent starts as soon as 3 NM per 1000 units of altitude exceed the remaining distance
            if (altitude / 1000 * 3) > remainingDistanceNM:
                oldGroundspeed = groundSpeed
                # altitude change per step: (GS / 60) distance per minute, 1000 per 3 NM, 6 steps per minute
                descendRate = (groundSpeed / 60) / 3 * 1000 / 6
                newAltitude = max(altitude - descendRate, 0)

                # get the planned IAS and calculate the new altitude and GS out of the predicted information
                # we assume that the aircraft only decelerates
                ias = self.PerformanceData.ias(newAltitude, remainingDistanceNM)
                altitude = newAltitude
                groundSpeed = min(weather.calculateGS(newAltitude, int(ias), currentHeading), groundSpeed)

                # use the average between last and current speed to estimate the remaining distance
                remainingDistanceNM -= (groundSpeed + oldGroundspeed) / 2 / 60 / 6
            else:
                remainingDistanceNM -= groundSpeed / 60 / 6

            flightTimeSeconds += 10
            if 0 > remainingDistanceNM:
                break

            # check if we follow a new waypoint pair
            distanceToWaypoint -= abs(lastDistance - remainingDistanceNM)
            while 0 >= distanceToWaypoint and nextWaypointIndex + 1 < len(route):
                nextWaypointIndex += 1
                currentHeading = route[nextWaypointIndex - 1].bearing(route[nextWaypointIndex])
                # continue with the length of the new leg, otherwise every following
                # step would switch to the next waypoint pair
                distanceToWaypoint += legLengthsNM[nextWaypointIndex - 1]

        return timedelta(seconds = flightTimeSeconds)
 |