123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- #!/usr/bin/env python
- import math
- import sys
- from datetime import datetime, timedelta
- from aman.config.Airport import Airport
- from aman.config.AirportSequencing import AirportSequencing
- from aman.formats.SctEseFormat import SctEseFormat
- from aman.sys.WeatherModel import WeatherModel
- from aman.types.ArrivalData import ArrivalData
- from aman.types.ArrivalRoute import ArrivalRoute
- from aman.types.ArrivalWaypoint import ArrivalWaypoint
- from aman.types.Runway import Runway
- from aman.types.Inbound import Inbound
- from aman.types.Waypoint import Waypoint
- class Node:
- def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat):
- for arrivalRunway in navData.ArrivalRoutes:
- if arrivalRunway == runway.Name:
- stars = navData.ArrivalRoutes[arrivalRunway]
- for star in stars:
- if 0 != len(star.Route) and iaf == star.Iaf.Name:
- return star
- return None
- def updateArrivalWaypoint(self, arrivalRoute, flightTime, altitude, indicatedAirspeed, groundSpeed):
- arrivalRoute[-1].FlightTime = timedelta(seconds = flightTime)
- arrivalRoute[-1].ETA = self.PredictionTime + arrivalRoute[-1].FlightTime
- arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
- arrivalRoute[-1].Altitude = altitude
- arrivalRoute[-1].IndicatedAirspeed = indicatedAirspeed
- arrivalRoute[-1].GroundSpeed = groundSpeed
- def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
- # calculate remaining trackmiles
- trackmiles = self.PredictedDistanceToIAF
- start = star.Route[0]
- turnIndices = [ -1, -1 ]
- constraints = []
- for i in range(0, len(star.Route)):
- # identified the base turn
- if True == star.Route[i].BaseTurn:
- turnIndices[0] = i
- # identified the final turn
- elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
- turnIndices[1] = i
- # skip waypoints until the final turn point is found
- elif -1 != turnIndices[0] and -1 == turnIndices[1]:
- continue
- trackmiles += start.haversine(star.Route[i])
- # check if a new constraint is defined
- altitude = -1
- speed = -1
- if None != star.Route[i].Altitude:
- altitude = star.Route[i].Altitude
- if None != star.Route[i].Speed:
- speed = star.Route[i].Speed
- if -1 != altitude or -1 != speed:
- constraints.append([ trackmiles, altitude, speed ])
- start = star.Route[i]
- # add the remaining distance from the last waypoint to the runway threshold
- trackmiles += start.haversine(runway.Start)
- if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
- sys.stderr.write('Invalid constraint definition found for ' + star.Name)
- sys.exit(-1)
- # calculate descend profile
- currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0])
- currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles)
- currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ]
- distanceToWaypoint = self.PredictedDistanceToIAF
- flightTimeSeconds = 0
- flightTimeOnStarSeconds = 0
- nextWaypointIndex = 0
- flownDistance = 0.0
- arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
- while True:
- # check if a constraint cleanup is needed and if a speed-update is needed
- if 0 != len(constraints) and flownDistance >= constraints[0][0]:
- if -1 != constraints[0][2]:
- currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance))
- currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
- constraints.pop(0)
- # search next altitude constraint
- altitudeDistance = 0
- nextAltitude = 0
- for constraint in constraints:
- if -1 != constraint[1]:
- altitudeDistance = constraint[0]
- nextAltitude = constraint[1]
- break
- # check if update of altitude and speed is needed on 3° glide
- if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
- oldGroundspeed = currentPosition[1]
- descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
- newAltitude = currentPosition[0] - descendRate
- if 0 > newAltitude:
- newAltitude = 0
- currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
- distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
- else:
- distance = currentPosition[1] / 60 / 6
- # update the statistics
- distanceToWaypoint -= distance
- flownDistance += distance
- newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
- if newIAS < currentIAS:
- currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
- currentIAS = newIAS
- flightTimeSeconds += 10
- if flownDistance >= self.PredictedDistanceToIAF:
- flightTimeOnStarSeconds += 10
- if flownDistance >= trackmiles:
- if None == arrivalRoute[-1].FlightTime:
- self.updateArrivalWaypoint(arrivalRoute, flightTimeSeconds, currentPosition[0], currentIAS, currentPosition[1])
- break
- # check if we follow a new waypoint pair
- if 0 >= distanceToWaypoint:
- lastWaypointIndex = nextWaypointIndex
- nextWaypointIndex += 1
- self.updateArrivalWaypoint(arrivalRoute, flightTimeSeconds, currentPosition[0], currentIAS, currentPosition[1])
- # check if a skip from base to final turn waypoints is needed
- if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
- nextWaypointIndex = turnIndices[1]
- # update the statistics
- if nextWaypointIndex < len(star.Route):
- distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex])
- currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
- currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
- arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
- return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute, timedelta(seconds = flightTimeOnStarSeconds)
- def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel,
- airportConfig : Airport, sequencingConfig : AirportSequencing):
- self.PredictedDistanceToIAF = inbound.Report.distanceToIAF
- self.PredictedCoordinate = [ inbound.CurrentPosition.latitude, inbound.CurrentPosition.longitude ]
- self.PredictionTime = referenceTime
- self.ArrivalCandidates = None
- self.Inbound = inbound
- if None == referenceTime or None == sequencingConfig:
- return
- # predict the distance to IAF
- timePrediction = (referenceTime - inbound.ReportTime).total_seconds()
- if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways):
- # calculate current motion information
- course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading)
- tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude)
- gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s
- distance = gs * timePrediction
- prediction = tempWaypoint.project(course, distance)
- # calculate the bearing between the current position and the IAF
- star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, airportConfig.GngData)
- # calculate the distance based on the flown distance and update the predicted distance
- if None != star:
- bearing = Waypoint(longitude = prediction[1], latitude = prediction[0]).bearing(star.Route[0])
- correctedDistance = math.cos(abs(bearing - course)) * distance * 0.000539957
- self.PredictedDistanceToIAF -= correctedDistance
- if 0.0 > self.PredictedDistanceToIAF:
- self.PredictedDistanceToIAF = 0.0
- self.PredictedCoordinate = prediction
- setEnrouteTime = None == self.Inbound.EnrouteArrivalTime
- self.ArrivalCandidates = {}
- # calculate the timings for the different arrival runways
- for identifier in sequencingConfig.ActiveArrivalRunways:
- star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, airportConfig.GngData)
- if None != star:
- flightTime, trackmiles, arrivalRoute, flightTimeOnStar = self.arrivalEstimation(identifier.Runway, star, weatherModel)
- # use the the distance to the IAF for optimizations
- timeUntilIAF = flightTime - flightTimeOnStar
- if 0.0 > timeUntilIAF.total_seconds():
- timeUntilIAF = timedelta(seconds = 0)
- # the best TTL is the longest path with the slowest speed
- ttgMax = 60
- ttgRatio = 0.05
- if star.Name in airportConfig.OptimizationParameters:
- ttgMax = airportConfig.OptimizationParameters[star.Name][0]
- ttgRatio = airportConfig.OptimizationParameters[star.Name][1]
- ttg = timedelta(seconds = timeUntilIAF.total_seconds() * ttgRatio)
- if (ttg.total_seconds() > ttgMax):
- ttg = timedelta(seconds = ttgMax)
- if None == self.Inbound.MaximumTimeToGain or ttg > self.Inbound.MaximumTimeToGain:
- self.Inbound.MaximumTimeToGain = ttg
- ita = self.Inbound.ReportTime + flightTime
- earliest = ita - self.Inbound.MaximumTimeToGain
- self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(star = star, ita = earliest, route = arrivalRoute,
- trackmiles = trackmiles)
- if True == setEnrouteTime and (None == self.Inbound.EnrouteArrivalTime or ita < self.Inbound.EnrouteArrivalTime):
- self.Inbound.EnrouteArrivalTime = ita
|