Files
aman-sys/aman/types/Inbound.py
2021-10-14 14:35:27 +02:00

184 lines
9.1 KiB
Python

#!/usr/bin/env python
import pytz
import sys
from datetime import datetime, timedelta
from aman.com import AircraftReport_pb2
from aman.config.AirportSequencing import AirportSequencing
from aman.formats.SctEseFormat import SctEseFormat
from aman.sys.WeatherModel import WeatherModel
from aman.types.PerformanceData import PerformanceData
from aman.types.ArrivalRoute import ArrivalRoute
from aman.types.ArrivalTime import ArrivalTime
from aman.types.Runway import Runway
from aman.types.Waypoint import Waypoint
class Inbound:
    """Planning state of one inbound aircraft, derived from an aircraft report.

    On construction the arrival route (STAR) and the flight times are
    estimated for every active arrival runway. The candidate with the
    earliest possible arrival time is stored as the initial plan.
    """

    # Factor applied to the results of haversine() for the route legs.
    # It matches the kilometre -> nautical mile conversion, so haversine()
    # presumably returns kilometres -- TODO confirm against Waypoint.
    _KM_TO_NM = 0.539957

    def findArrivalRoute(self, runway: Runway, navData: SctEseFormat):
        """Return the arrival route that serves `runway` and starts at the reported IAF.

        Routes without any waypoint are ignored. Returns None if no route of
        `navData.ArrivalRoutes` matches.
        """
        for arrivalRunway in navData.ArrivalRoutes:
            if arrivalRunway != runway.Name:
                continue

            for star in navData.ArrivalRoutes[arrivalRunway]:
                if 0 != len(star.Route) and self.Report.initialApproachFix == star.Iaf.Name:
                    return star

        return None

    def __init__(self, report: AircraftReport_pb2.AircraftReport, sequencingConfig: AirportSequencing,
                 navData: SctEseFormat, performanceData: PerformanceData, weatherModel: WeatherModel):
        """Create the inbound and estimate its arrival times for all active runways.

        `report.reportTime` is parsed as `%Y%m%d%H%M%S` and handled as UTC.
        If the aircraft type is not part of `performanceData.Aircrafts`, the
        entry of the 'A320' is used instead.
        """
        self.Report = report
        self.CurrentPosition = report.position
        self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo=pytz.UTC)
        self.InitialArrivalTime = None
        self.EarliestArrivalTime = None
        self.PlannedArrivalTime = None
        self.EstimatedStarEntryTime = None
        self.PlannedRunway = None
        self.PlannedStar = None
        self.ArrivalCandidates = {}

        # search performance data -> fallback to A320
        # (initialise first: otherwise an unknown type leaves the attribute
        # undefined and the fallback check itself raises an AttributeError)
        self.PerformanceData = None
        if self.Report.aircraft.type in performanceData.Aircrafts:
            self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
        if self.PerformanceData is None:
            self.PerformanceData = performanceData.Aircrafts['A320']

        # calculate the timings for the different arrival runways
        for identifier in sequencingConfig.ActiveArrivalRunways:
            star = self.findArrivalRoute(identifier.Runway, navData)
            if star is None:
                continue

            flightTime, flightTimeUntilIaf, _ = self.arrivalEstimation(identifier.Runway, star, weatherModel)

            # time to gain: the time that is saved if the same trackmiles are
            # flown with 110 % of the average speed. This equals
            # flightTime - trackmiles / (1.1 * avgSpeed), but it does not
            # divide by the flight time or the trackmiles and it is not
            # truncated by timedelta.seconds for flights longer than a day.
            ttg = flightTime - flightTime / 1.1
            ita = self.ReportTime + flightTime
            earliest = ita - ttg

            self.ArrivalCandidates[identifier.Runway.Name] = ArrivalTime(ttg=ttg, star=star, ita=ita, earliest=earliest,
                                                                         entry=flightTimeUntilIaf, touchdown=flightTime)

        # calculate the first values for later plannings
        # (the candidate with the earliest arrival time wins, first one on a tie)
        for candidate in self.ArrivalCandidates.values():
            if self.EarliestArrivalTime is None or candidate.EarliestArrivalTime < self.EarliestArrivalTime:
                self.InitialArrivalTime = candidate.InitialArrivalTime
                self.EarliestArrivalTime = candidate.EarliestArrivalTime
                self.EstimatedStarEntryTime = self.ReportTime + candidate.FlightTimeUntilIaf
                self.PlannedStar = candidate.Star

        # resolve the runway object that belongs to the planned arrival route
        if self.PlannedStar is not None:
            for runway in navData.Runways[self.Report.destination.upper()]:
                if runway.Name == self.PlannedStar.Runway:
                    self.PlannedRunway = runway
                    break

    def arrivalEstimation(self, runway: Runway, star: ArrivalRoute, weather: WeatherModel):
        """Estimate the remaining flight along `star` to the threshold of `runway`.

        The flight is simulated in steps of ten seconds. Altitude and speed
        constraints of the route waypoints, the performance data of the
        aircraft and the ground speed of the weather model are considered.
        Waypoints between the base turn and the final turn are skipped.

        Returns a tuple (flight time until touchdown, flight time until the
        IAF, trackmiles) where both times are timedelta instances.

        Terminates the process via sys.exit(-1) if the route defines a base
        turn without a following final turn.
        """
        route = star.Route

        # calculate remaining trackmiles
        trackmiles = self.Report.distanceToIAF
        start = route[0]
        turnIndices = [-1, -1]
        constraints = []
        for index, waypoint in enumerate(route):
            # identified the base turn
            if waypoint.BaseTurn:
                turnIndices[0] = index
            # identified the final turn
            elif -1 != turnIndices[0] and waypoint.FinalTurn:
                turnIndices[1] = index
            # skip waypoints until the final turn point is found
            elif -1 != turnIndices[0] and -1 == turnIndices[1]:
                continue

            trackmiles += start.haversine(waypoint) * self._KM_TO_NM

            # check if a new constraint is defined (-1 marks "not constrained")
            altitude = -1 if waypoint.Altitude is None else waypoint.Altitude
            speed = -1 if waypoint.Speed is None else waypoint.Speed
            if -1 != altitude or -1 != speed:
                constraints.append([trackmiles, altitude, speed])

            start = waypoint

        # add the remaining distance from the last waypoint to the runway threshold
        # (scaled like every other leg; the unscaled value mixed the units)
        trackmiles += start.haversine(runway.Start) * self._KM_TO_NM

        # a base turn requires a final turn at a later waypoint
        if turnIndices[0] > turnIndices[1]:
            sys.stderr.write('Invalid constraint definition found for ' + star.Name)
            sys.exit(-1)

        # calculate descend profile
        reportedPosition = Waypoint(latitude=self.Report.position.latitude, longitude=self.Report.position.longitude)
        currentHeading = reportedPosition.bearing(route[0])
        currentIAS = self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles)
        # [ altitude, ground speed ] of the simulated aircraft
        currentPosition = [self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed]
        distanceToWaypoint = self.Report.distanceToIAF
        flightTimeUntilIafSeconds = 0
        flightTimeSeconds = 0
        nextWaypointIndex = 0
        flownDistance = 0.0

        # NOTE(review): the loop only ends after the trackmiles are flown. A
        # ground speed of zero never increases flownDistance -- confirm that
        # the reports cannot contain such a value.
        while True:
            # check if a constraint cleanup is needed and if a speed-update is needed
            if 0 != len(constraints) and flownDistance >= constraints[0][0]:
                if -1 != constraints[0][2]:
                    # NOTE(review): uses the reported altitude, the update at the
                    # end of the step uses the simulated one -- confirm the intent
                    currentIAS = min(constraints[0][2],
                                     self.PerformanceData.ias(self.Report.dynamics.altitude, trackmiles - flownDistance))
                    currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading),
                                             currentPosition[1])
                constraints.pop(0)

            # search next altitude constraint (ground level if none is left)
            altitudeDistance = 0
            nextAltitude = 0
            for constraint in constraints:
                if -1 != constraint[1]:
                    altitudeDistance = constraint[0]
                    nextAltitude = constraint[1]
                    break

            # check if update of altitude and speed is needed on 3° glide
            # (three distance units per 1000 altitude units)
            altitudeDelta = currentPosition[0] - nextAltitude
            if altitudeDelta > 0 and (altitudeDelta / 1000 * 3) > (altitudeDistance - flownDistance):
                oldGroundspeed = currentPosition[1]
                # altitude loss of one ten second step on the glide
                descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
                newAltitude = max(currentPosition[0] - descendRate, 0)
                currentPosition = [newAltitude,
                                   min(weather.calculateGS(newAltitude, currentIAS, currentHeading), currentPosition[1])]
                # distance of one step with the mean of the old and new ground speed
                distance = (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
            else:
                # distance of one ten second step with the current ground speed
                distance = currentPosition[1] / 60 / 6

            # update the statistics
            distanceToWaypoint -= distance
            flownDistance += distance
            newIAS = min(currentIAS, self.PerformanceData.ias(currentPosition[0], trackmiles - flownDistance))
            if newIAS < currentIAS:
                currentPosition[1] = min(weather.calculateGS(currentPosition[0], newIAS, currentHeading),
                                         currentPosition[1])
                currentIAS = newIAS

            flightTimeSeconds += 10
            if flownDistance <= self.Report.distanceToIAF:
                flightTimeUntilIafSeconds += 10
            if flownDistance >= trackmiles:
                break

            # check if we follow a new waypoint pair
            if 0 >= distanceToWaypoint:
                lastWaypointIndex = nextWaypointIndex
                nextWaypointIndex += 1

                # check if a skip from base to final turn waypoints is needed
                if -1 != turnIndices[0] and turnIndices[0] < nextWaypointIndex < turnIndices[1]:
                    nextWaypointIndex = turnIndices[1]

                # update the statistics
                if nextWaypointIndex < len(route):
                    lastWaypoint = route[lastWaypointIndex]
                    nextWaypoint = route[nextWaypointIndex]
                    distanceToWaypoint = lastWaypoint.haversine(nextWaypoint) * self._KM_TO_NM
                    currentHeading = lastWaypoint.bearing(nextWaypoint)
                    currentPosition[1] = min(weather.calculateGS(currentPosition[0], currentIAS, currentHeading),
                                             currentPosition[1])

        return timedelta(seconds=flightTimeSeconds), timedelta(seconds=flightTimeUntilIafSeconds), trackmiles