diff --git a/aman/types/ArrivalTime.py b/aman/types/ArrivalTime.py new file mode 100644 index 0000000..97ec70f --- /dev/null +++ b/aman/types/ArrivalTime.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +from datetime import datetime, timedelta + +from aman.types.ArrivalRoute import ArrivalRoute + +class ArrivalTime: + def __init__(self, **kargs): + self.Star = None + self.MaximumTimeToGain = None + self.FlightTimeUntilIaf = None + self.FlightTimeUntilTouchdown = None + self.InitialArrivalTime = None + self.EarliestArrivalTime = None + + for key, value in kargs.items(): + if 'ttg' == key: + if True == isinstance(value, timedelta): + self.MaximumTimeToGain = value + elif True == isinstance(value, (int, float)): + self.MaximumTimeToGain = timedelta(seconds = float(value)) + else: + raise Exception('Invalid type for ttg') + elif 'star' == key: + if True == isinstance(value, ArrivalRoute): + self.Star = value + else: + raise Exception('Invalid type for star') + elif 'ita' == key: + if True == isinstance(value, datetime): + self.InitialArrivalTime = value + else: + raise Exception('Invalid type for ita') + elif 'earliest' == key: + if True == isinstance(value, datetime): + self.EarliestArrivalTime = value + else: + raise Exception('Invalid type for earliest') + elif 'touchdown' == key: + if True == isinstance(value, timedelta): + self.FlightTimeUntilTouchdown = value + else: + raise Exception('Invalid type for touchdown') + elif 'entry' == key: + if True == isinstance(value, timedelta): + self.FlightTimeUntilIaf = value + else: + raise Exception('Invalid type for entry') + else: + raise Exception('Unknown key: ' + key) diff --git a/aman/types/Inbound.py b/aman/types/Inbound.py index 63ce2b6..a43200e 100644 --- a/aman/types/Inbound.py +++ b/aman/types/Inbound.py @@ -10,14 +10,33 @@ from aman.config.AirportSequencing import AirportSequencing from aman.formats.SctEseFormat import SctEseFormat from aman.sys.WeatherModel import WeatherModel from aman.types.PerformanceData import PerformanceData +from aman.types.ArrivalRoute import ArrivalRoute +from aman.types.ArrivalTime import ArrivalTime +from aman.types.Runway import Runway from aman.types.Waypoint import Waypoint class Inbound: + def findArrivalRoute(self, runway : Runway, navData : SctEseFormat): + for arrivalRunway in navData.ArrivalRoutes: + if arrivalRunway == runway.Name: + stars = navData.ArrivalRoutes[arrivalRunway] + for star in stars: + if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name: + return star + return None + def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat, performanceData : PerformanceData, weatherModel : WeatherModel): self.Report = report self.CurrentPosition = report.position self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC) + self.InitialArrivalTime = None + self.EarliestArrivalTime = None + self.EstimatedArrivalTime = None + self.EstimatedStarEntryTime = None + self.PlannedRunway = None + self.PlannedStar = None + self.ArrivalCandidates = {} # search performance data -> fallback to A320 if self.Report.aircraft.type in performanceData.Aircrafts: @@ -25,89 +44,76 @@ class Inbound: if None == self.PerformanceData: self.PerformanceData = performanceData.Aircrafts['A320'] - self.findArrivalRunway(sequencingConfig) - self.findArrivalRoute(navData) + # calculate the timings for the different arrival runways + for identifier in sequencingConfig.ActiveArrivalRunways: + star = self.findArrivalRoute(identifier.Runway, navData) - flightTime, flightTimeUntilIaf, trackmiles = self.secondsUntilTouchdown(weatherModel) + if None != star: + flightTime, flightTimeUntilIaf, trackmiles = self.arrivalEstimation(identifier.Runway, star, weatherModel) - # calculate the maximum time to gain (assumption: 10% speed increase by acceleration and shortcuts) - avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0) - self.MaximumTimeToGain = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60) + avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0) + ttg = flightTime - timedelta(minutes = (trackmiles / (avgSpeed * 1.1)) * 60) + ita = self.ReportTime + flightTime + earliest = ita - ttg + + self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg = ttg, star = star, ita = ita, earliest = earliest, + entry = flightTimeUntilIaf, touchdown = flightTime) + + # calculate the first values for later plannings + for candidate in self.ArrivalCandidates: + if None == self.EarliestArrivalTime or self.ArrivalCandidates[candidate].EarliestArrivalTime < self.EarliestArrivalTime: + self.InitialArrivalTime = self.ArrivalCandidates[candidate].InitialArrivalTime + self.EarliestArrivalTime = self.ArrivalCandidates[candidate].EarliestArrivalTime + self.EstimatedStarEntryTime = self.ReportTime + self.ArrivalCandidates[candidate].FlightTimeUntilIaf + self.PlannedStar = self.ArrivalCandidates[candidate].Star - # calculate the different arrival times - self.InitialArrivalTime = self.ReportTime + flightTime - self.EarliestArrivalTime = self.InitialArrivalTime - self.MaximumTimeToGain self.EstimatedArrivalTime = self.InitialArrivalTime - self.EstimatedStarEntryTime = None - - def findArrivalRunway(self, sequencingConfig : AirportSequencing): - self.PlannedRunway = None - - # find the nearest runway for an initial guess - distance = 100000.0 - currentPosition = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude) - for runway in sequencingConfig.ActiveArrivalRunways: - candidateDistance = runway.Runway.Start.haversine(currentPosition) - if distance > candidateDistance: - self.PlannedRunway = runway - distance = candidateDistance - - def findArrivalRoute(self, navData : SctEseFormat): - self.PlannedStar = None - if None == self.PlannedRunway: - return - - for arrivalRunway in navData.ArrivalRoutes: - if arrivalRunway == self.PlannedRunway.Runway.Name: - stars = navData.ArrivalRoutes[arrivalRunway] - for star in stars: - if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name: - self.PlannedStar = star - return - - def secondsUntilTouchdown(self, weather : WeatherModel): - if None == self.PlannedRunway or None == self.PlannedStar: - return timedelta(seconds = 0) + if None != self.PlannedStar: + for runway in navData.Runways[self.Report.destination]: + if runway.Name == self.PlannedStar.Runway: + self.PlannedRunway = runway + break + def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel): # calculate remaining trackmiles trackmiles = self.Report.distanceToIAF - start = self.PlannedStar.Route[0] + start = star.Route[0] turnIndices = [ -1, -1 ] constraints = [] - for i in range(0, len(self.PlannedStar.Route)): + for i in range(0, len(star.Route)): # identified the base turn - if True == self.PlannedStar.Route[i].BaseTurn: + if True == star.Route[i].BaseTurn: turnIndices[0] = i # identified the final turn - elif -1 != turnIndices[0] and True == self.PlannedStar.Route[i].FinalTurn: + elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn: turnIndices[1] = i # skip waypoints until the final turn point is found elif -1 != turnIndices[0] and -1 == turnIndices[1]: continue - trackmiles += start.haversine(self.PlannedStar.Route[i]) * 0.539957 + trackmiles += start.haversine(star.Route[i]) * 0.539957 # check if a new constraint is defined altitude = -1 speed = -1 - if None != self.PlannedStar.Route[i].Altitude: - altitude = self.PlannedStar.Route[i].Altitude - if None != self.PlannedStar.Route[i].Speed: - speed = self.PlannedStar.Route[i].Speed + if None != star.Route[i].Altitude: + altitude = star.Route[i].Altitude + if None != star.Route[i].Speed: + speed = star.Route[i].Speed if -1 != altitude or -1 != speed: constraints.append([ trackmiles, altitude, speed ]) - start = self.PlannedStar.Route[i] + start = star.Route[i] # add the remaining distance from the last waypoint to the runway threshold - trackmiles += start.haversine(self.PlannedRunway.Runway.Start) + trackmiles += start.haversine(runway.Start) if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]): - sys.stderr.write('Invalid constraint definition found for ' + self.PlannedStar.Name) + sys.stderr.write('Invalid constraint definition found for ' + star.Name) sys.exit(-1) # calculate descend profile - currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(self.PlannedStar.Route[0]) + currentHeading = Waypoint(latitude = self.Report.position.latitude, longitude = self.Report.position.longitude).bearing(star.Route[0]) currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles) currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ] distanceToWaypoint = self.Report.distanceToIAF @@ -170,9 +176,9 @@ class Inbound: nextWaypointIndex = turnIndices[1] # update the statistics - if nextWaypointIndex < len(self.PlannedStar.Route): - distanceToWaypoint = self.PlannedStar.Route[lastWaypointIndex].haversine(self.PlannedStar.Route[nextWaypointIndex]) * 0.539957 - currentHeading = self.PlannedStar.Route[lastWaypointIndex].bearing(self.PlannedStar.Route[nextWaypointIndex]) + if nextWaypointIndex < len(star.Route): + distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex]) * 0.539957 + currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex]) currentPosition[1] = min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) return timedelta(seconds = flightTimeSeconds), timedelta(seconds = flightTimeUntilIafSeconds), trackmiles