Kaynağa Gözat

rename the cost function. it manages more the runways

Sven Czarnian 3 yıl önce
ebeveyn
işleme
a7541925c7
2 değiştirilmiş dosya ile 101 ekleme ve 96 silme
  1. 0 96
      aman/sys/aco/CostFunction.py
  2. 101 0
      aman/sys/aco/RunwayManager.py

+ 0 - 96
aman/sys/aco/CostFunction.py

@@ -1,96 +0,0 @@
-#!/usr/bin/env python
-
-from datetime import timedelta
-
-from aman.sys.aco.Colony import Colony
-from aman.sys.aco.Constraints import SpacingConstraints
-from aman.types.Inbound import Inbound
-
-class CostFunction:
-    def __init__(self, colony : Colony):
-        self.Spacings = SpacingConstraints()
-        self.Colony = colony
-
-        # initialize the tracker which inbound arrives at which runway
-        self.RunwayInbounds = {}
-        for runway in self.Colony.Configuration.PreceedingInbounds:
-            self.RunwayInbounds[runway] = self.Colony.Configuration.PreceedingInbounds[runway]
-        for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
-            if not runway in self.RunwayInbounds:
-                self.RunwayInbounds[runway] = None
-
-        return
-
-    def validateWtc(inbound : Inbound):
-        wtc = inbound.Report.aircraft.wtc.upper()
-        if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None:
-            return wtc
-        else:
-            return None
-
-    def calculateEta(self, runway : str, inbound : Inbound):
-        if None == self.RunwayInbounds[runway]:
-            return inbound.EstimatedArrivalTime
-
-        preceedingInbound = self.RunwayInbounds[runway]
-
-        # get the WTC constrained ETA
-        wtcPre = CostFunction.validateWtc(preceedingInbound)
-        wtcThis = CostFunction.validateWtc(inbound)
-        if None == wtcPre or None == wtcThis:
-            spacing = 3
-        else:
-            spacing = self.Spacings[wtcPre][wtcThis]
-        delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
-        wtcETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
-
-        # get the staggered time spacing
-        dependentRunway = self.Colony.Configuration.RunwayConstraints.findDependentRunway(runway)
-        if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.name]:
-            if preceedingInbound.EstimatedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime:
-                delay = timedelta(seconds = round(3 / inbound.PerformanceData.speedApproach + 0.5))
-                staggeredETA = self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime + delay
-            else:
-                staggeredETA = wtcETA
-        else:
-            staggeredETA = wtcETA
-
-        # get the runway time spacing
-        spacing = self.Colony.Configuration.RunwayConstraints.findRunway(runway).Spacing
-        delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
-        runwayETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
-
-        # get the biggest ETA to define the maximum but ensure that we are not earlier than the ITA
-        # TODO model the TTG-concept depending on the distance to the IAF
-        candidate = max(max(wtcETA, staggeredETA), runwayETA)
-        if candidate < inbound.InitialArrivalTime:
-            return inbound.InitialArrivalTime
-        else:
-            return candidate
-
-    def selectArrivalRunway(self, inbound : Inbound):
-        availableRunways = []
-        for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
-            availableRunways.append(runway)
-
-        #if 1 < len(availableRunways):
-            # TODO filter based on type
-            # TODO filter based on airline
-            # ensure that at least one runway is available
-
-        # fallback to check if we have available runways
-        if 0 == len(availableRunways):
-            return self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
-
-        # start with the beginning
-        selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(availableRunways[0])
-        eta = self.calculateEta(availableRunways[0], inbound)
-
-        # get the runway with the earliest ETA
-        for runway in availableRunways:
-            candidate = self.calculateEta(runway, inbound)
-            if eta > candidate:
-                selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(runway)
-                eta = candidate
-
-        return selectedRunway

+ 101 - 0
aman/sys/aco/RunwayManager.py

@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+
+from datetime import timedelta
+
+from aman.sys.aco.Configuration import Configuration
+from aman.sys.aco.Constraints import SpacingConstraints
+from aman.types.Inbound import Inbound
+
+class RunwayManager:
+    def __init__(self, configuration : Configuration):
+        self.Spacings = SpacingConstraints()
+        self.Configuration = configuration
+
+        # initialize the tracker which inbound arrives at which runway
+        self.RunwayInbounds = {}
+        if None != configuration.PreceedingInbounds:
+            for runway in configuration.PreceedingInbounds:
+                self.RunwayInbounds[runway] = configuration.PreceedingInbounds[runway]
+        for runway in configuration.RunwayConstraints.ActiveArrivalRunways:
+            if not runway.Runway.Name in self.RunwayInbounds:
+                self.RunwayInbounds[runway.Runway.Name] = None
+
+    def validateWtc(inbound : Inbound):
+        wtc = inbound.Report.aircraft.wtc.upper()
+        if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None:
+            return wtc
+        else:
+            return None
+
+    def calculateEarliestArrivalTime(self, runway : str, inbound : Inbound):
+        if None == self.RunwayInbounds[runway]:
+            return inbound.ArrivalCandidates[runway].EarliestArrivalTime
+
+        preceedingInbound = self.RunwayInbounds[runway]
+
+        # get the WTC constrained ETA
+        wtcPre = RunwayManager.validateWtc(preceedingInbound)
+        wtcThis = RunwayManager.validateWtc(inbound)
+        if None == wtcPre or None == wtcThis:
+            spacing = 3
+        else:
+            spacing = self.Spacings[wtcPre][wtcThis]
+        delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60))
+        wtcEarliestArrivalTime = preceedingInbound.PlannedArrivalTime + delay
+
+        # get the staggered time spacing
+        dependentRunway = self.Configuration.RunwayConstraints.findDependentRunway(runway)
+        if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.Name]:
+            # check if the one on the same runway lands before the one on the parallel runway
+            if preceedingInbound.PlannedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime:
+                delay = timedelta(minutes = 3 / (inbound.PerformanceData.SpeedApproach / 60))
+                staggeredEarliestArrivalTime = self.RunwayInbounds[dependentRunway.Runway.Name].PlannedArrivalTime + delay
+            # landing on the same runway
+            else:
+                staggeredEarliestArrivalTime = wtcEarliestArrivalTime
+        # no neighbor or no dependent runway (IPA)
+        else:
+            staggeredEarliestArrivalTime = wtcEarliestArrivalTime
+
+        # get the runway time spacing
+        spacing = self.Configuration.RunwayConstraints.findRunway(runway).Spacing
+        delay = timedelta(minutes = spacing / (inbound.PerformanceData.SpeedApproach / 60))
+        runwayEarliestArrivalTime = preceedingInbound.PlannedArrivalTime + delay
+
+        # the candidate with the latest ETA is used -> ensure all safety and procedure constraints
+        candidate = max(max(wtcEarliestArrivalTime, staggeredEarliestArrivalTime), runwayEarliestArrivalTime)
+
+        # check if inbound comes later than the latest possible ETA
+        if candidate < inbound.ArrivalCandidates[runway].EarliestArrivalTime:
+            return inbound.ArrivalCandidates[runway].EarliestArrivalTime
+        # candidate fits into earliest arrival time or is later than this
+        else:
+            return candidate
+
+    def selectArrivalRunway(self, inbound : Inbound):
+        availableRunways = []
+        for runway in self.Configuration.RunwayConstraints.ActiveArrivalRunways:
+            availableRunways.append(runway)
+
+        #if 1 < len(availableRunways):
+            # TODO filter based on type
+            # TODO filter based on airline
+            # ensure that at least one runway is available
+
+        # fallback to check if we have available runways
+        if 0 == len(availableRunways):
+            runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
+            return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, inbound)
+
+        # start with the beginning
+        selectedRunway = None
+        eta = None
+
+        # get the runway with the earliest ETA
+        for runway in availableRunways:
+            candidate = self.calculateEarliestArrivalTime(runway.Runway.Name, inbound)
+            if None == eta or eta > candidate:
+                selectedRunway = runway.Runway
+                eta = candidate
+
+        return selectedRunway, eta