#!/usr/bin/env python import pytz import sys from datetime import datetime, timedelta from aman.com import AircraftReport_pb2 from aman.config.AirportSequencing import AirportSequencing from aman.formats.SctEseFormat import SctEseFormat from aman.sys.WeatherModel import WeatherModel from aman.types.PerformanceData import PerformanceData from aman.types.Waypoint import Waypoint class Inbound: def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat, performanceData : PerformanceData, weatherModel : WeatherModel): self.Report = report self.CurrentPosition = report.position self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC) # search performance data -> fallback to A320 if self.Report.aircraft.type in performanceData.Aircrafts: self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type] if None == self.PerformanceData: self.PerformanceData = performanceData.Aircrafts['A320'] self.findArrivalRunway(sequencingConfig) self.findArrivalRoute(navData) flightTime, flightTimeUntilIaf, trackmiles = self.secondsUntilTouchdown(weatherModel) # calculate the maximum time to gain (assumption: 10% speed increase by acceleration and shortcuts) avgSpeed = self.Report.distanceToIAF / (float(flightTimeUntilIaf.seconds) / 3600.0) self.MaximumTimeToGain = flightTimeUntilIaf - timedelta(minutes = (self.Report.distanceToIAF / (avgSpeed * 1.1)) * 60) avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0) self.MaximumTimeToGain += flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60) # calculate the different arrival times self.InitialArrivalTime = self.ReportTime + flightTime self.EarliestArrivalTime = self.InitialArrivalTime - self.MaximumTimeToGain self.EstimatedArrivalTime = self.InitialArrivalTime self.EstimatedStarEntryTime = None def findArrivalRunway(self, sequencingConfig : AirportSequencing): self.PlannedRunway = None # find the nearest runway for an initial guess distance = 100000.0 currentPosition = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude) for runway in sequencingConfig.ActiveArrivalRunways: candidateDistance = runway.Runway.Start.haversine(currentPosition) if distance > candidateDistance: self.PlannedRunway = runway distance = candidateDistance def findArrivalRoute(self, navData : SctEseFormat): self.PlannedStar = None if None == self.PlannedRunway: return for arrivalRunway in navData.ArrivalRoutes: if arrivalRunway == self.PlannedRunway.Runway.Name: stars = navData.ArrivalRoutes[arrivalRunway] for star in stars: if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name: self.PlannedStar = star return def secondsUntilTouchdown(self, weather : WeatherModel): if None == self.PlannedRunway or None == self.PlannedStar: return timedelta(seconds = 0) # calculate remaining trackmiles trackmiles = self.Report.distanceToIAF start = self.PlannedStar.Route[0] turnIndices = [ -1, -1 ] constraints = [] for i in range(0, len(self.PlannedStar.Route)): # identified the base turn if True == self.PlannedStar.Route[i].BaseTurn: turnIndices[0] = i # identified the final turn elif -1 != turnIndices[0] and True == self.PlannedStar.Route[i].FinalTurn: turnIndices[1] = i # skip waypoints until the final turn point is found elif -1 != turnIndices[0] and -1 == turnIndices[1]: continue trackmiles += start.haversine(self.PlannedStar.Route[i]) * 0.539957 # check if a new constraint is defined altitude = -1 speed = -1 if None != self.PlannedStar.Route[i].Altitude: altitude = self.PlannedStar.Route[i].Altitude if None != self.PlannedStar.Route[i].Speed: speed = self.PlannedStar.Route[i].Speed if -1 != altitude or -1 != speed: constraints.append([ trackmiles, altitude, speed ]) start = self.PlannedStar.Route[i] # add the remaining distance from the last waypoint to the runway threshold trackmiles += start.haversine(self.PlannedRunway.Runway.Start) if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]): sys.stderr.write('Invalid constraint definition found for ' + self.PlannedStar.Name) sys.exit(-1) # calculate descend profile currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(self.PlannedStar.Route[0]) currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles) currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ] distanceToWaypoint = self.Report.distanceToIAF flightTimeUntilIafSeconds = 0 flightTimeSeconds = 0 nextWaypointIndex = 0 flownDistance = 0.0 while True: # check if a constraint cleanup is needed and if a speed-update is needed if 0 != len(constraints) and flownDistance >= constraints[0][0]: if -1 != constraints[0][2]: currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance)) currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1]) constraints.pop(0) # search next altitude constraint altitudeDistance = 0 nextAltitude = 0 for constraint in constraints: if -1 != constraint[1]: altitudeDistance = constraint[0] nextAltitude = constraint[1] break # check if update of altitude and speed is needed on 3° glide if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance): oldGroundspeed = currentPosition[1] descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6 newAltitude = currentPosition[0] - descendRate if 0 > newAltitude: newAltitude = 0 currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ] distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6 else: distance = currentPosition[1] / 60 / 6 # update the statistics distanceToWaypoint -= distance flownDistance += distance newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance)) if newIAS < currentIAS: currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1]) currentIAS = newIAS flightTimeSeconds += 10 if flownDistance <= self.Report.distanceToIAF: flightTimeUntilIafSeconds += 10 if flownDistance >= trackmiles: break # check if we follow a new waypoint pair if 0 >= distanceToWaypoint: lastWaypointIndex = nextWaypointIndex nextWaypointIndex += 1 # check if a skip from base to final turn waypoints is needed if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]: nextWaypointIndex = turnIndices[1] # update the statistics if nextWaypointIndex < len(self.PlannedStar.Route): distanceToWaypoint = self.PlannedStar.Route[lastWaypointIndex].haversine(self.PlannedStar.Route[nextWaypointIndex]) * 0.539957 currentHeading = self.PlannedStar.Route[lastWaypointIndex].bearing(self.PlannedStar.Route[nextWaypointIndex]) currentPosition[1] = min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles