extend the inbound
This commit is contained in:
@@ -1,7 +1,111 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import pytz
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from aman.com import AircraftReport_pb2
|
from aman.com import AircraftReport_pb2
|
||||||
|
from aman.config.AirportSequencing import AirportSequencing
|
||||||
|
from aman.formats.SctEseFormat import SctEseFormat
|
||||||
|
from aman.sys.WeatherModel import WeatherModel
|
||||||
|
from aman.types.PerformanceData import PerformanceData
|
||||||
|
from aman.types.Waypoint import Waypoint
|
||||||
|
|
||||||
class Inbound:
|
class Inbound:
|
||||||
def __init__(self, report : AircraftReport_pb2.AircraftReport):
|
def __init__(self, report : AircraftReport_pb2.AircraftReport, sequencingConfig : AirportSequencing, navData : SctEseFormat,
|
||||||
self.report = report
|
performanceData : PerformanceData, weatherModel : WeatherModel):
|
||||||
|
self.Report = report
|
||||||
|
self.CurrentPosition = report.position
|
||||||
|
self.ReportTime = datetime.strptime(report.reportTime + '+0000', '%Y%m%d%H%M%S%z').replace(tzinfo = pytz.UTC)
|
||||||
|
|
||||||
|
# search performance data -> fallback to A320
|
||||||
|
if self.Report.aircraft.type in performanceData.Aircrafts:
|
||||||
|
self.PerformanceData = performanceData.Aircrafts[self.Report.aircraft.type]
|
||||||
|
if None == self.PerformanceData:
|
||||||
|
self.PerformanceData = performanceData.Aircrafts['A320']
|
||||||
|
|
||||||
|
self.findArrivalRunway(sequencingConfig)
|
||||||
|
self.findArrivalRoute(navData)
|
||||||
|
|
||||||
|
duration = self.secondsUntilTouchdown(weatherModel)
|
||||||
|
self.InitialArrivalTime = self.ReportTime + duration
|
||||||
|
self.EstimatedArrivalTime = self.InitialArrivalTime
|
||||||
|
self.EstimatedStarEntryTime = None
|
||||||
|
|
||||||
|
def findArrivalRunway(self, sequencingConfig : AirportSequencing):
|
||||||
|
self.PlannedRunway = None
|
||||||
|
|
||||||
|
# find the nearest runway for an initial guess
|
||||||
|
distance = 100000.0
|
||||||
|
currentPosition = Waypoint('', self.Report.position.latitude, self.Report.position.longitude)
|
||||||
|
for runway in sequencingConfig.ActiveArrivalRunways:
|
||||||
|
candidateDistance = runway.Runway.start.haversine(currentPosition)
|
||||||
|
if distance > candidateDistance:
|
||||||
|
self.PlannedRunway = runway
|
||||||
|
distance = candidateDistance
|
||||||
|
|
||||||
|
def findArrivalRoute(self, navData : SctEseFormat):
|
||||||
|
self.PlannedStar = None
|
||||||
|
if None == self.PlannedRunway:
|
||||||
|
return
|
||||||
|
|
||||||
|
for arrivalRunway in navData.arrivalRoutes:
|
||||||
|
if arrivalRunway == self.PlannedRunway.Runway.name:
|
||||||
|
stars = navData.arrivalRoutes[arrivalRunway]
|
||||||
|
for star in stars:
|
||||||
|
if 0 != len(star.route) and self.Report.initialApproachFix == star.iaf.name:
|
||||||
|
self.PlannedStar = star
|
||||||
|
return
|
||||||
|
|
||||||
|
def secondsUntilTouchdown(self, weather : WeatherModel):
|
||||||
|
if None == self.PlannedRunway or None == self.PlannedStar:
|
||||||
|
return timedelta(seconds = 0)
|
||||||
|
|
||||||
|
# calculate remaining trackmiles
|
||||||
|
remainingDistanceNM = self.Report.distanceToIAF
|
||||||
|
start = self.PlannedStar.route[0]
|
||||||
|
for i in range(1, len(self.PlannedStar.route)):
|
||||||
|
remainingDistanceNM += start.haversine(self.PlannedStar.route[i]) * 0.539957
|
||||||
|
start = self.PlannedStar.route[i]
|
||||||
|
|
||||||
|
# calculate descend profile
|
||||||
|
flightTimeSeconds = 0
|
||||||
|
currentHeading = Waypoint('', self.Report.position.latitude, self.Report.position.longitude).bearing(self.PlannedStar.route[0])
|
||||||
|
distanceToWaypoint = self.Report.distanceToIAF
|
||||||
|
nextWaypointIndex = 0
|
||||||
|
currentPosition = [ self.Report.dynamics.altitude, self.Report.dynamics.groundSpeed ]
|
||||||
|
while 0 < currentPosition[0]:
|
||||||
|
lastDistance = remainingDistanceNM
|
||||||
|
|
||||||
|
# TODO integrate speed and altitude constraints
|
||||||
|
|
||||||
|
# calculate the next position after 10 seconds
|
||||||
|
if (currentPosition[0] / 1000 * 3) > remainingDistanceNM:
|
||||||
|
oldGroundspeed = currentPosition[1]
|
||||||
|
descendRate = (currentPosition[1] / 60) / 3 * 1000 / 6
|
||||||
|
newAltitude = currentPosition[0] - descendRate
|
||||||
|
if 0 > newAltitude:
|
||||||
|
newAltitude = 0
|
||||||
|
|
||||||
|
# get the planned IAS and calculate the new altitude and GS out of the predicted information
|
||||||
|
# we assume that the aircraft only decelerates
|
||||||
|
ias = self.PerformanceData.ias(newAltitude, remainingDistanceNM)
|
||||||
|
currentPosition = [ newAltitude, min(weather.calculateGS(newAltitude, int(ias), currentHeading), currentPosition[1]) ]
|
||||||
|
|
||||||
|
# use the average between last and current speed to estimate the remaining distance
|
||||||
|
remainingDistanceNM -= (currentPosition[1] + oldGroundspeed) / 2 / 60 / 6
|
||||||
|
else:
|
||||||
|
remainingDistanceNM -= currentPosition[1] / 60 / 6
|
||||||
|
|
||||||
|
flightTimeSeconds += 10
|
||||||
|
if 0 > remainingDistanceNM:
|
||||||
|
break
|
||||||
|
|
||||||
|
# check if we follow a new waypoint pair
|
||||||
|
distanceToWaypoint -= abs(lastDistance - remainingDistanceNM)
|
||||||
|
if 0 >= distanceToWaypoint:
|
||||||
|
nextWaypointIndex += 1
|
||||||
|
if nextWaypointIndex < len(self.PlannedStar.route):
|
||||||
|
currentHeading = self.PlannedStar.route[nextWaypointIndex - 1].bearing(self.PlannedStar.route[nextWaypointIndex])
|
||||||
|
|
||||||
|
return timedelta(seconds = flightTimeSeconds)
|
||||||
|
|||||||
Reference in New Issue
Block a user