#!/usr/bin/env python import math import sys from datetime import datetime, timedelta from aman.config.AirportSequencing import AirportSequencing from aman.formats.SctEseFormat import SctEseFormat from aman.sys.WeatherModel import WeatherModel from aman.types.ArrivalData import ArrivalData from aman.types.ArrivalRoute import ArrivalRoute from aman.types.ArrivalWaypoint import ArrivalWaypoint from aman.types.Runway import Runway from aman.types.Inbound import Inbound from aman.types.Waypoint import Waypoint class Node: def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat): for arrivalRunway in navData.ArrivalRoutes: if arrivalRunway == runway.Name: stars = navData.ArrivalRoutes[arrivalRunway] for star in stars: if 0 != len(star.Route) and iaf == star.Iaf.Name: return star return None def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel): # calculate remaining trackmiles trackmiles = self.PredictedDistanceToIAF start = star.Route[0] turnIndices = [ -1, -1 ] constraints = [] for i in range(0, len(star.Route)): # identified the base turn if True == star.Route[i].BaseTurn: turnIndices[0] = i # identified the final turn elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn: turnIndices[1] = i # skip waypoints until the final turn point is found elif -1 != turnIndices[0] and -1 == turnIndices[1]: continue trackmiles += start.haversine(star.Route[i]) * 0.539957 # check if a new constraint is defined altitude = -1 speed = -1 if None != star.Route[i].Altitude: altitude = star.Route[i].Altitude if None != star.Route[i].Speed: speed = star.Route[i].Speed if -1 != altitude or -1 != speed: constraints.append([ trackmiles, altitude, speed ]) start = star.Route[i] # add the remaining distance from the last waypoint to the runway threshold trackmiles += start.haversine(runway.Start) if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]): sys.stderr.write('Invalid constraint definition found for ' + star.Name) sys.exit(-1) # calculate descend profile currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0]) currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles) currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ] distanceToWaypoint = self.PredictedDistanceToIAF flightTimeSeconds = 0 nextWaypointIndex = 0 flownDistance = 0.0 arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ] while True: # check if a constraint cleanup is needed and if a speed-update is needed if 0 != len(constraints) and flownDistance >= constraints[0][0]: if -1 != constraints[0][2]: currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance)) currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1]) constraints.pop(0) # search next altitude constraint altitudeDistance = 0 nextAltitude = 0 for constraint in constraints: if -1 != constraint[1]: altitudeDistance = constraint[0] nextAltitude = constraint[1] break # check if update of altitude and speed is needed on 3° glide if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance): oldGroundspeed = currentPosition[1] descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6 newAltitude = currentPosition[0] - descendRate if 0 > newAltitude: newAltitude = 0 currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ] distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6 else: distance = currentPosition[1] / 60 / 6 # update the statistics distanceToWaypoint -= distance flownDistance += distance newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance)) if newIAS < currentIAS: currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1]) currentIAS = newIAS flightTimeSeconds += 10 if flownDistance >= trackmiles: break # check if we follow a new waypoint pair if 0 >= distanceToWaypoint: lastWaypointIndex = nextWaypointIndex nextWaypointIndex += 1 arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds) arrivalRoute[-1].ETA = self.Inbound.ReportTime + arrivalRoute[-1].FlightTime arrivalRoute[-1].PTA = arrivalRoute[-1].ETA arrivalRoute[-1].Altitude = currentPosition[0] arrivalRoute[-1].IndicatedAirspeed = currentIAS arrivalRoute[-1].GroundSpeed = currentPosition[1] # check if a skip from base to final turn waypoints is needed if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]: nextWaypointIndex = turnIndices[1] # update the statistics if nextWaypointIndex < len(star.Route): distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957 currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex]) currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1]) arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint)) return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel, navData : SctEseFormat, sequencingConfig : AirportSequencing): self.PredictedDistanceToIAF = inbound.Report.distanceToIAF self.ArrivalCandidates = {} self.Inbound = inbound # predict the distance to IAF timePrediction = (referenceTime - inbound.ReportTime).total_seconds() if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways): # calculate current motion information course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading) tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude) gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s distance = gs * timePrediction * 0.000539957 # distance back to nm # calculate the bearing between the current position and the IAF star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, navData) if None != star: bearing = tempWaypoint.bearing(star.Route[0]) else: bearing = inbound.Report.dynamics.heading # calculate the distance based on the flown distance and update the predicted distance self.PredictedDistanceToIAF -= math.cos(math.radians(bearing - course)) * distance if 0.0 > self.PredictedDistanceToIAF: self.PredictedDistanceToIAF = 0.0 # calculate the timings for the different arrival runways for identifier in sequencingConfig.ActiveArrivalRunways: star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, navData) if None != star: flightTime, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel) avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0) # the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds ratio = min(2.0, max(0.0, self.PredictedDistanceToIAF / (trackmiles - self.PredictedDistanceToIAF))) possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60 ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio) ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60)) ita = self.Inbound.ReportTime + flightTime earliest = ita - ttg latest = ita + ttl self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest, ttl = ttl, latest = latest, route = arrivalRoute, trackmiles = trackmiles) if None == self.Inbound.InitialArrivalTime: self.Inbound.InitialArrivalTime = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime