195 lines
10 KiB
Python
195 lines
10 KiB
Python
#!/usr/bin/env python
|
|
|
|
import math
|
|
import sys
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from aman.config.AirportSequencing import AirportSequencing
|
|
from aman.formats.SctEseFormat import SctEseFormat
|
|
from aman.sys.WeatherModel import WeatherModel
|
|
from aman.types.ArrivalData import ArrivalData
|
|
from aman.types.ArrivalRoute import ArrivalRoute
|
|
from aman.types.ArrivalWaypoint import ArrivalWaypoint
|
|
from aman.types.Runway import Runway
|
|
from aman.types.Inbound import Inbound
|
|
from aman.types.Waypoint import Waypoint
|
|
|
|
class Node:
|
|
def findArrivalRoute(iaf : str, runway : Runway, navData : SctEseFormat):
|
|
for arrivalRunway in navData.ArrivalRoutes:
|
|
if arrivalRunway == runway.Name:
|
|
stars = navData.ArrivalRoutes[arrivalRunway]
|
|
for star in stars:
|
|
if 0 != len(star.Route) and iaf == star.Iaf.Name:
|
|
return star
|
|
return None
|
|
|
|
def arrivalEstimation(self, runway : Runway, star : ArrivalRoute, weather : WeatherModel):
|
|
# calculate remaining trackmiles
|
|
trackmiles = self.PredictedDistanceToIAF
|
|
start = star.Route[0]
|
|
turnIndices = [ -1, -1 ]
|
|
constraints = []
|
|
for i in range(0, len(star.Route)):
|
|
# identified the base turn
|
|
if True == star.Route[i].BaseTurn:
|
|
turnIndices[0] = i
|
|
# identified the final turn
|
|
elif -1 != turnIndices[0] and True == star.Route[i].FinalTurn:
|
|
turnIndices[1] = i
|
|
# skip waypoints until the final turn point is found
|
|
elif -1 != turnIndices[0] and -1 == turnIndices[1]:
|
|
continue
|
|
|
|
trackmiles += start.haversine(star.Route[i])
|
|
|
|
# check if a new constraint is defined
|
|
altitude = -1
|
|
speed = -1
|
|
if None != star.Route[i].Altitude:
|
|
altitude = star.Route[i].Altitude
|
|
if None != star.Route[i].Speed:
|
|
speed = star.Route[i].Speed
|
|
if -1 != altitude or -1 != speed:
|
|
constraints.append([ trackmiles, altitude, speed ])
|
|
|
|
start = star.Route[i]
|
|
|
|
# add the remaining distance from the last waypoint to the runway threshold
|
|
trackmiles += start.haversine(runway.Start)
|
|
|
|
if turnIndices[0] > turnIndices[1] or (-1 == turnIndices[1] and -1 != turnIndices[0]):
|
|
sys.stderr.write('Invalid constraint definition found for ' + star.Name)
|
|
sys.exit(-1)
|
|
|
|
# calculate descend profile
|
|
currentHeading = Waypoint(latitude = self.Inbound.Report.position.latitude, longitude = self.Inbound.Report.position.longitude).bearing(star.Route[0])
|
|
currentIAS = self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles)
|
|
currentPosition = [ self.Inbound.Report.dynamics.altitude, self.Inbound.Report.dynamics.groundSpeed ]
|
|
distanceToWaypoint = self.PredictedDistanceToIAF
|
|
flightTimeSeconds = 0
|
|
nextWaypointIndex = 0
|
|
flownDistance = 0.0
|
|
arrivalRoute = [ ArrivalWaypoint(waypoint = star.Route[0], trackmiles = distanceToWaypoint) ]
|
|
|
|
while True:
|
|
# check if a constraint cleanup is needed and if a speed-update is needed
|
|
if 0 != len(constraints) and flownDistance >= constraints[0][0]:
|
|
if -1 != constraints[0][2]:
|
|
currentIAS = min(constraints[0][2], self.Inbound.PerformanceData.ias(self.Inbound.Report.dynamics.altitude, trackmiles - flownDistance))
|
|
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
|
|
constraints.pop(0)
|
|
|
|
# search next altitude constraint
|
|
altitudeDistance = 0
|
|
nextAltitude = 0
|
|
for constraint in constraints:
|
|
if -1 != constraint[1]:
|
|
altitudeDistance = constraint[0]
|
|
nextAltitude = constraint[1]
|
|
break
|
|
|
|
# check if update of altitude and speed is needed on 3° glide
|
|
if currentPosition[0] > nextAltitude and ((currentPosition[0] - nextAltitude) / 1000 * 3) > (altitudeDistance - flownDistance):
|
|
oldGroundspeed = currentPosition[1]
|
|
descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
|
|
newAltitude = currentPosition[0] - descendRate
|
|
if 0 > newAltitude:
|
|
newAltitude = 0
|
|
|
|
currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1]) ]
|
|
distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
|
|
else:
|
|
distance = currentPosition[1] / 60 / 6
|
|
|
|
# update the statistics
|
|
distanceToWaypoint -= distance
|
|
flownDistance += distance
|
|
newIAS = min(currentIAS, self.Inbound.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
|
|
if newIAS < currentIAS:
|
|
currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading), currentPosition[1])
|
|
currentIAS = newIAS
|
|
|
|
flightTimeSeconds += 10
|
|
if flownDistance >= trackmiles:
|
|
break
|
|
|
|
# check if we follow a new waypoint pair
|
|
if 0 >= distanceToWaypoint:
|
|
lastWaypointIndex = nextWaypointIndex
|
|
nextWaypointIndex += 1
|
|
|
|
arrivalRoute[-1].FlightTime = timedelta(seconds = flightTimeSeconds)
|
|
arrivalRoute[-1].ETA = self.Inbound.ReportTime + arrivalRoute[-1].FlightTime
|
|
arrivalRoute[-1].PTA = arrivalRoute[-1].ETA
|
|
arrivalRoute[-1].Altitude = currentPosition[0]
|
|
arrivalRoute[-1].IndicatedAirspeed = currentIAS
|
|
arrivalRoute[-1].GroundSpeed = currentPosition[1]
|
|
|
|
# check if a skip from base to final turn waypoints is needed
|
|
if -1 != turnIndices[0] and nextWaypointIndex > turnIndices[0] and nextWaypointIndex < turnIndices[1]:
|
|
nextWaypointIndex = turnIndices[1]
|
|
|
|
# update the statistics
|
|
if nextWaypointIndex < len(star.Route):
|
|
distanceToWaypoint = star.Route[lastWaypointIndex].haversine(star.Route[nextWaypointIndex])
|
|
currentHeading = star.Route[lastWaypointIndex].bearing(star.Route[nextWaypointIndex])
|
|
currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading), currentPosition[1])
|
|
|
|
arrivalRoute.append(ArrivalWaypoint(waypoint = star.Route[nextWaypointIndex], trackmiles = arrivalRoute[-1].Trackmiles + distanceToWaypoint))
|
|
|
|
return timedelta(seconds = flightTimeSeconds), trackmiles, arrivalRoute
|
|
|
|
def __init__(self, inbound : Inbound, referenceTime : datetime, weatherModel : WeatherModel,
|
|
navData : SctEseFormat, sequencingConfig : AirportSequencing):
|
|
self.PredictedDistanceToIAF = inbound.Report.distanceToIAF
|
|
self.PredictedCoordinate = [ inbound.CurrentPosition.latitude, inbound.CurrentPosition.longitude ]
|
|
self.ArrivalCandidates = {}
|
|
self.Inbound = inbound
|
|
|
|
if None == referenceTime or None == sequencingConfig:
|
|
return
|
|
|
|
# predict the distance to IAF
|
|
timePrediction = (referenceTime - inbound.ReportTime).total_seconds()
|
|
if 0 != timePrediction and 0 != len(sequencingConfig.ActiveArrivalRunways):
|
|
# calculate current motion information
|
|
course = weatherModel.estimateCourse(inbound.Report.dynamics.altitude, inbound.Report.dynamics.groundSpeed, inbound.Report.dynamics.heading)
|
|
tempWaypoint = Waypoint(longitude = inbound.CurrentPosition.longitude, latitude = inbound.CurrentPosition.latitude)
|
|
gs = inbound.Report.dynamics.groundSpeed * 0.514444 # ground speed in m/s
|
|
distance = gs * timePrediction
|
|
self.PredictedCoordinate = tempWaypoint.project(course, distance)
|
|
|
|
# calculate the bearing between the current position and the IAF
|
|
star = Node.findArrivalRoute(inbound.Report.initialApproachFix, sequencingConfig.ActiveArrivalRunways[0].Runway, navData)
|
|
|
|
# calculate the distance based on the flown distance and update the predicted distance
|
|
self.PredictedDistanceToIAF = Waypoint(longitude = self.PredictedCoordinate[1], latitude = self.PredictedCoordinate[0]).haversine(star.Route[0])
|
|
if 0.0 > self.PredictedDistanceToIAF:
|
|
self.PredictedDistanceToIAF = 0.0
|
|
|
|
# calculate the timings for the different arrival runways
|
|
for identifier in sequencingConfig.ActiveArrivalRunways:
|
|
star = Node.findArrivalRoute(self.Inbound.Report.initialApproachFix, identifier.Runway, navData)
|
|
|
|
if None != star:
|
|
flightTime, trackmiles, arrivalRoute = self.arrivalEstimation(identifier.Runway, star, weatherModel)
|
|
|
|
avgSpeed = trackmiles / (float(flightTime.seconds) / 3600.0)
|
|
# the closer we get to the IAF the less time delta can be achieved by short cuts, delay vectors or speeds
|
|
ratio = min(2.0, max(0.0, self.PredictedDistanceToIAF / (trackmiles - self.PredictedDistanceToIAF)))
|
|
possibleTimeDelta = (trackmiles / (avgSpeed * 0.9)) * 60
|
|
ttg = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60) * ratio)
|
|
ttl = timedelta(minutes = (possibleTimeDelta - flightTime.total_seconds() / 60))
|
|
ita = self.Inbound.ReportTime + flightTime
|
|
earliest = ita - ttg
|
|
latest = ita + ttl
|
|
|
|
self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
|
|
ttl = ttl, latest = latest, route = arrivalRoute,
|
|
trackmiles = trackmiles)
|
|
|
|
if None == self.Inbound.InitialArrivalTime:
|
|
self.Inbound.InitialArrivalTime = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime
|