#!/usr/bin/env python import pytz import sys from datetime import datetime, timedelta from aman.com import AircraftReport_pb2 from aman.config.AirportSequencing import AirportSequencing from aman.formats.SctEseFormat import SctEseFormat from aman.sys.WeatherModel import WeatherModel from aman.types.PerformanceData import PerformanceData from aman.types.ArrivalRoute import ArrivalRoute from aman.types.ArrivalTime import ArrivalTime from aman.types.Runway import Runway from aman.types.Waypoint import Waypoint class Inbound: def findArrivalRoute(self, runway : Runway, navData : SctEseFormat): for arrivalRunway in navData.ArrivalRoutes: if arrivalRunway == runway.Name: stars = navData.ArrivalRoutes[arrivalRunway] for star in stars: if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name: return star return None def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat, performanceData : PerformanceData, weatherModel : WeatherModel): self.Report = report self.Callsign = report.aircraft.callsign self.CurrentPosition = report.position self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC) self.InitialArrivalTime = None self.EarliestArrivalTime = None self.PlannedArrivalTime = None self.EstimatedStarEntryTime = None self.PlannedRunway = None self.PlannedStar = None self.ArrivalCandidates = {} self.WTC = None # analyze the WTC wtc = report.aircraft.wtc.upper() if 'L' == wtc or 'M' == wtc or 'H' == wtc or 'J' == wtc: self.WTC = wtc # search performance data -> fallback to A320 if self.Report.aircraft.type in performanceData.Aircrafts: self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type] if None == self.PerformanceData: self.PerformanceData = performanceData.Aircrafts['A320'] # calculate the timings for the different arrival runways for identifier in sequencingConfig.ActiveArrivalRunways: star = self.findArrivalRoute(identifier.Runway, navData) if None != star: flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel) avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0) ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60) ttl = timedelta(minutes = (trackmiles / (avgSpeed * 0.9)) * 60) - flightTime ita = self.ReportTime + flightTime earliest = ita - ttg latest = ita + ttl self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg = ttg, star = star, ita = ita, earliest = earliest, entry = flightTimeUntilIaf, touchdown = flightTime, ttl = ttl, latest = latest) # calculate the first values for later plannings for candidate in self.ArrivalCandidates: if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime: self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf self.PlannedStar = self.ArrivalCandidates[candidate].Star if None != self.PlannedStar: for runway in navData.Runways[self.Report.destination.upper()]: if runway.Name == self.PlannedStar.Runway: self.PlannedRunway = runway break def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel): # calculate remaining trackmiles trackmiles = self.Report.distanceToIAF start = star.Route[0] turnIndices = [ -1, -1 ] constraints = [] for i in range(0, len(star.Route)): # identified the base turn if True == star.Route[i].BaseTurn: turnIndices[0] = i # identified the final turn elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn: turnIndices[1] = i # skip waypoints until the final turn point is found elif -1 != turnIndices[0] and -1 == turnIndices[1]: continue trackmiles += start.haversine(star.Route[i]) * 0.539957 # check if a new constraint is defined altitude = -1 speed = -1 if None != star.Route[i].Altitude: altitude = star.Route[i].Altitude if None != star.Route[i].Speed: speed = star.Route[i].Speed if -1 != altitude or -1 != speed: constraints.append([ trackmiles, altitude, speed ]) start = star.Route[i] # add the remaining distance from the last waypoint to the runway threshold trackmiles += start.haversine(runway.Start) if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]): sys.stderr.write('Invalid constraint definition found for ' + star.Name) sys.exit(-1) # calculate descend profile currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0]) currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles) currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ] distanceToWaypoint = self.Report.distanceToIAF flightTimeUntilIafSeconds = 0 flightTimeSeconds = 0 nextWaypointIndex = 0 flownDistance = 0.0 while True: # check if a constraint cleanup is needed and if a speed-update is needed if 0 != len(constraints) and flownDistance >= constraints[0][0]: if -1 != constraints[0][2]: currentIAS = min(constraints[0][2], self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance)) currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1]) constraints.pop(0) # search next altitude constraint altitudeDistance = 0 nextAltitude = 0 for constraint in constraints: if -1 != constraint[1]: altitudeDistance = constraint[0] nextAltitude = constraint[1] break # check if update of altitude and speed is needed on 3° glide if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance): oldGroundspeed = currentPosition[1] descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6 newAltitude = currentPosition[0] - descendRate if 0 > newAltitude: newAltitude = 0 currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ] distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6 else: distance = currentPosition[1] / 60 / 6 # update the statistics distanceToWaypoint -= distance flownDistance += distance newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance)) if newIAS < currentIAS: currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1]) currentIAS = newIAS flightTimeSeconds += 10 if flownDistance <= self.Report.distanceToIAF: flightTimeUntilIafSeconds += 10 if flownDistance >= trackmiles: break # check if we follow a new waypoint pair if 0 >= distanceToWaypoint: lastWaypointIndex = nextWaypointIndex nextWaypointIndex += 1 # check if a skip from base to final turn waypoints is needed if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]: nextWaypointIndex = turnIndices[1] # update the statistics if nextWaypointIndex < len(star.Route): distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957 currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex]) currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1]) return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles