220 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			220 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import math
 | |
| import sys
 | |
| 
 | |
| from datetime import datetime, timedelta
 | |
| 
 | |
| from aman.config.Airport import Airport
 | |
| from aman.config.AirportSequencing import AirportSequencing
 | |
| from aman.formats.SctEseFormat import SctEseFormat
 | |
| from aman.sys.WeatherModel import WeatherModel
 | |
| from aman.types.ArrivalData import ArrivalData
 | |
| from aman.types.ArrivalRoute import ArrivalRoute
 | |
| from aman.types.ArrivalWaypoint import ArrivalWaypoint
 | |
| from aman.types.Runway import Runway
 | |
| from aman.types.Inbound import Inbound
 | |
| from aman.types.Waypoint import Waypoint
 | |
| 
 | |
| class Node:
 | |
|     def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat):
 | |
|         for arrivalRunway in navData.ArrivalRoutes:
 | |
|             if arrivalRunway == runway.Name:
 | |
|                 stars = navData.ArrivalRoutes[arrivalRunway]
 | |
|                 for star in stars:
 | |
|                     if 0 != len(star.Route) and iaf == star.Iaf.Name:
 | |
|                         return star
 | |
|         return None
 | |
| 
 | |
|     def updateArrivalWaypoint(self, arrivalRoute, flightTime, altitude, indicatedAirspeed, groundSpeed):
 | |
|         arrivalRoute[-1].FlightTime = timedelta(seconds = flightTime)
 | |
|         arrivalRoute[-1].ETA = self.PredictionTime + arrivalRoute[-1].FlightTime
 | |
|         arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
 | |
|         arrivalRoute[-1].Altitude = altitude
 | |
|         arrivalRoute[-1].IndicatedAirspeed = indicatedAirspeed
 | |
|         arrivalRoute[-1].GroundSpeed = groundSpeed
 | |
| 
 | |
|     def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
 | |
|         # calculate remaining trackmiles
 | |
|         trackmiles = self.PredictedDistanceToIAF
 | |
|         start = star.Route[0]
 | |
|         turnIndices = [ -1, -1 ]
 | |
|         constraints = []
 | |
|         for i in range(0, len(star.Route)):
 | |
|             # identified the base turn
 | |
|             if True == star.Route[i].BaseTurn:
 | |
|                 turnIndices[0] = i
 | |
|             # identified the final turn
 | |
|             elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
 | |
|                 turnIndices[1] = i
 | |
|             # skip waypoints until the final turn point is found
 | |
|             elif -1 != turnIndices[0] and -1 == turnIndices[1]:
 | |
|                 continue
 | |
| 
 | |
|             trackmiles += start.haversine(star.Route[i])
 | |
| 
 | |
|             # check if a new constraint is defined
 | |
|             altitude = -1
 | |
|             speed = -1
 | |
|             if None != star.Route[i].Altitude:
 | |
|                 altitude = star.Route[i].Altitude
 | |
|             if None != star.Route[i].Speed:
 | |
|                 speed = star.Route[i].Speed
 | |
|             if -1 != altitude or -1 != speed:
 | |
|                 constraints.append([ trackmiles, altitude, speed ])
 | |
| 
 | |
|             start = star.Route[i]
 | |
| 
 | |
|         # add the remaining distance from the last waypoint to the runway threshold
 | |
|         trackmiles += start.haversine(runway.Start)
 | |
| 
 | |
|         if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
 | |
|             sys.stderr.write('Invalid constraint definition found for ' + star.Name)
 | |
|             sys.exit(-1)
 | |
| 
 | |
|         # calculate descend profile
 | |
|         currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0])
 | |
|         currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles)
 | |
|         currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ]
 | |
|         distanceToWaypoint = self.PredictedDistanceToIAF
 | |
|         flightTimeSeconds = 0
 | |
|         flightTimeOnStarSeconds = 0
 | |
|         nextWaypointIndex = 0
 | |
|         flownDistance = 0.0
 | |
|         arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
 | |
| 
 | |
|         while True:
 | |
|             # check if a constraint cleanup is needed and if a speed-update is needed
 | |
|             if 0 != len(constraints) and flownDistance >= constraints[0][0]:
 | |
|                 if -1 != constraints[0][2]:
 | |
|                     currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance))
 | |
|                     currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
 | |
|                 constraints.pop(0)
 | |
| 
 | |
|             # search next altitude constraint
 | |
|             altitudeDistance = 0
 | |
|             nextAltitude = 0
 | |
|             for constraint in constraints:
 | |
|                 if -1 != constraint[1]:
 | |
|                     altitudeDistance = constraint[0]
 | |
|                     nextAltitude = constraint[1]
 | |
|                     break
 | |
| 
 | |
|             # check if update of altitude and speed is needed on 3° glide
 | |
|             if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
 | |
|                 oldGroundspeed = currentPosition[1]
 | |
|                 descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
 | |
|                 newAltitude = currentPosition[0] - descendRate
 | |
|                 if 0 > newAltitude:
 | |
|                     newAltitude = 0
 | |
| 
 | |
|                 currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
 | |
|                 distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
 | |
|             else:
 | |
|                 distance = currentPosition[1] / 60 / 6
 | |
| 
 | |
|             # update the statistics
 | |
|             distanceToWaypoint -= distance
 | |
|             flownDistance += distance
 | |
|             newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
 | |
|             if newIAS < currentIAS:
 | |
|                 currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
 | |
|                 currentIAS = newIAS
 | |
| 
 | |
|             flightTimeSeconds += 10
 | |
|             if flownDistance >= self.PredictedDistanceToIAF:
 | |
|                 flightTimeOnStarSeconds += 10
 | |
|             if flownDistance >= trackmiles:
 | |
|                 if None == arrivalRoute[-1].FlightTime:
 | |
|                     self.updateArrivalWaypoint(arrivalRoute, flightTimeSeconds, currentPosition[0], currentIAS, currentPosition[1])
 | |
|                 break
 | |
| 
 | |
|             # check if we follow a new waypoint pair
 | |
|             if 0 >= distanceToWaypoint:
 | |
|                 lastWaypointIndex = nextWaypointIndex
 | |
|                 nextWaypointIndex += 1
 | |
| 
 | |
|                 self.updateArrivalWaypoint(arrivalRoute, flightTimeSeconds, currentPosition[0], currentIAS, currentPosition[1])
 | |
| 
 | |
|                 # check if a skip from base to final turn waypoints is needed
 | |
|                 if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
 | |
|                     nextWaypointIndex = turnIndices[1]
 | |
| 
 | |
|                 # update the statistics
 | |
|                 if nextWaypointIndex < len(star.Route):
 | |
|                     distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex])
 | |
|                     currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
 | |
|                     currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
 | |
| 
 | |
|                     arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
 | |
| 
 | |
|         return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute, timedelta(seconds = flightTimeOnStarSeconds)
 | |
| 
 | |
|     def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel,
 | |
|                  airportConfig : Airport, sequencingConfig : AirportSequencing):
 | |
|         self.PredictedDistanceToIAF = inbound.Report.distanceToIAF
 | |
|         self.PredictedCoordinate = [ inbound.CurrentPosition.latitude, inbound.CurrentPosition.longitude ]
 | |
|         self.PredictionTime = referenceTime
 | |
|         self.ArrivalCandidates = {}
 | |
|         self.Inbound = inbound
 | |
| 
 | |
|         if None == referenceTime or None == sequencingConfig:
 | |
|             return
 | |
| 
 | |
|         # predict the distance to IAF
 | |
|         timePrediction = (referenceTime - inbound.ReportTime).total_seconds()
 | |
|         if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways):
 | |
|             # calculate current motion information
 | |
|             course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading)
 | |
|             tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude)
 | |
|             gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s
 | |
|             distance = gs * timePrediction
 | |
|             prediction = tempWaypoint.project(course, distance)
 | |
| 
 | |
|             # calculate the bearing between the current position and the IAF
 | |
|             star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, airportConfig.GngData)
 | |
| 
 | |
|             # calculate the distance based on the flown distance and update the predicted distance
 | |
|             if None != star:
 | |
|                 bearing = Waypoint(longitude = prediction[1], latitude = prediction[0]).bearing(star.Route[0])
 | |
|                 correctedDistance = math.cos(abs(bearing - course)) * distance * 0.000539957
 | |
|                 self.PredictedDistanceToIAF -= correctedDistance
 | |
|                 if 0.0 > self.PredictedDistanceToIAF:
 | |
|                     self.PredictedDistanceToIAF = 0.0
 | |
| 
 | |
|                 self.PredictedCoordinate = prediction
 | |
| 
 | |
|         setInitialArrivalTime = None == self.Inbound.InitialArrivalTime
 | |
| 
 | |
|         # calculate the timings for the different arrival runways
 | |
|         for identifier in sequencingConfig.ActiveArrivalRunways:
 | |
|             star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, airportConfig.GngData)
 | |
| 
 | |
|             if None != star:
 | |
|                 flightTime, trackmiles, arrivalRoute, flightTimeOnStar = self.arrivalEstimation(identifier.Runway, star, weatherModel)
 | |
| 
 | |
|                 # use the the distance to the IAF for optimizations
 | |
|                 timeUntilIAF = flightTime - flightTimeOnStar
 | |
|                 if 0.0 > timeUntilIAF.total_seconds():
 | |
|                     timeUntilIAF = timedelta(seconds = 0)
 | |
| 
 | |
|                 # the best TTL is the longest path with the slowest speed
 | |
|                 ttgMax = 60
 | |
|                 ttgRatio = 0.05
 | |
|                 if star.Name in airportConfig.OptimizationParameters:
 | |
|                     ttgMax = airportConfig.OptimizationParameters[star.Name][0]
 | |
|                     ttgRatio = airportConfig.OptimizationParameters[star.Name][1]
 | |
| 
 | |
|                 ttg = timedelta(seconds = timeUntilIAF.total_seconds() * ttgRatio)
 | |
|                 if (ttg.total_seconds() > ttgMax):
 | |
|                     ttg = timedelta(seconds = ttgMax)
 | |
|                 ita = self.Inbound.ReportTime + flightTime
 | |
|                 earliest = ita - ttg
 | |
| 
 | |
|                 self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(star = star, ita = earliest, route = arrivalRoute,
 | |
|                                                                              trackmiles = trackmiles)
 | |
| 
 | |
|                 ita = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime
 | |
|                 if True == setInitialArrivalTime and (None == self.Inbound.InitialArrivalTime or ita < self.Inbound.InitialArrivalTime):
 | |
|                     self.Inbound.InitialArrivalTime = ita
 |