Преглед изворни кода

introduce classes for the ACO algorithm

Sven Czarnian пре 3 година
родитељ
комит
9d69a60396

+ 7 - 0
aman/sys/aco/Ant.py

@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+
+from aman.sys.aco.Colony import Colony
+
+class Ant:
+    def __init__(self, colony : Colony):
+        self.Colony = colony

+ 12 - 0
aman/sys/aco/Colony.py

@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+
+import numpy as np
+
+from aman.sys.aco.Ant import Ant
+from aman.sys.aco.Configuration import Configuration
+from aman.sys.aco.CostFunction import CostFunction
+
+class Colony:
+    def __init__(self, configuration : Configuration):
+        self.Configuration = configuration
+        self.PheromoneMatrix = np.zeros(( len(configuration.Inbounds), len(configuration.Inbounds) ), dtype=float)

+ 15 - 0
aman/sys/aco/Configuration.py

@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+
+from aman.config.AirportSequencing import AirportSequencing
+
+class Configuration:
+    def __init__(self, runwayInfo : AirportSequencing, antCount : int, explorationCount : int):
+        # the AMAN specific information
+        self.RunwayConstraints = runwayInfo
+        self.PreceedingInbounds = None
+        self.Inbounds = None
+
+        # the ACO specific information
+        self.AntCount = antCount
+        self.ExplorationRuns = explorationCount
+        self.PheromoneEvaporationRate = 0.9

+ 26 - 0
aman/sys/aco/Constraints.py

@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+class SpacingConstraints:
+    def __init__(self):
+        self.WtcSpacing = {}
+        self.WtcSpacing['L'] = {}
+        self.WtcSpacing['M'] = {}
+        self.WtcSpacing['H'] = {}
+        self.WtcSpacing['J'] = {}
+
+        self.WtcSpacing['L']['L'] = 3
+        self.WtcSpacing['L']['M'] = 3
+        self.WtcSpacing['L']['S'] = 3
+        self.WtcSpacing['L']['J'] = 3
+        self.WtcSpacing['M']['L'] = 5
+        self.WtcSpacing['M']['M'] = 3
+        self.WtcSpacing['M']['S'] = 3
+        self.WtcSpacing['M']['J'] = 3
+        self.WtcSpacing['H']['L'] = 6
+        self.WtcSpacing['H']['M'] = 5
+        self.WtcSpacing['H']['S'] = 4
+        self.WtcSpacing['H']['J'] = 4
+        self.WtcSpacing['J']['L'] = 8
+        self.WtcSpacing['J']['M'] = 7
+        self.WtcSpacing['J']['S'] = 6
+        self.WtcSpacing['J']['J'] = 6

+ 96 - 0
aman/sys/aco/CostFunction.py

@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from datetime import timedelta
+
+from aman.sys.aco.Colony import Colony
+from aman.sys.aco.Constraints import SpacingConstraints
+from aman.types.Inbound import Inbound
+
+class CostFunction:
+    def __init__(self, colony : Colony):
+        self.Spacings = SpacingConstraints()
+        self.Colony = colony
+
+        # initialize the tracker which inbound arrives at which runway
+        self.RunwayInbounds = {}
+        for runway in self.Colony.Configuration.PreceedingInbounds:
+            self.RunwayInbounds[runway] = self.Colony.Configuration.PreceedingInbounds[runway]
+        for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
+            if not runway in self.RunwayInbounds:
+                self.RunwayInbounds[runway] = None
+
+        return
+
+    def validateWtc(inbound : Inbound):
+        wtc = inbound.Report.aircraft.wtc.upper()
+        if 'L' == wtc or 'M' == wtc or 'H' == None or 'J' == None:
+            return wtc
+        else:
+            return None
+
+    def calculateEta(self, runway : str, inbound : Inbound):
+        if None == self.RunwayInbounds[runway]:
+            return inbound.EstimatedArrivalTime
+
+        preceedingInbound = self.RunwayInbounds[runway]
+
+        # get the WTC constrained ETA
+        wtcPre = CostFunction.validateWtc(preceedingInbound)
+        wtcThis = CostFunction.validateWtc(inbound)
+        if None == wtcPre or None == wtcThis:
+            spacing = 3
+        else:
+            spacing = self.Spacings[wtcPre][wtcThis]
+        delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
+        wtcETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
+
+        # get the staggered time spacing
+        dependentRunway = self.Colony.Configuration.RunwayConstraints.findDependentRunway(runway)
+        if None != dependentRunway and None != self.RunwayInbounds[dependentRunway.Runway.name]:
+            if preceedingInbound.EstimatedArrivalTime < self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime:
+                delay = timedelta(seconds = round(3 / inbound.PerformanceData.speedApproach + 0.5))
+                staggeredETA = self.RunwayInbounds[dependentRunway.Runway.name].EstimatedArrivalTime + delay
+            else:
+                staggeredETA = wtcETA
+        else:
+            staggeredETA = wtcETA
+
+        # get the runway time spacing
+        spacing = self.Colony.Configuration.RunwayConstraints.findRunway(runway).Spacing
+        delay = timedelta(seconds = round(spacing / inbound.PerformanceData.speedApproach + 0.5))
+        runwayETA = self.RunwayInbounds[runway].EstimatedArrivalTime + delay
+
+        # get the biggest ETA to define the maximum but ensure that we are not earlier than the ITA
+        # TODO model the TTG-concept depending on the distance to the IAF
+        candidate = max(max(wtcETA, staggeredETA), runwayETA)
+        if candidate < inbound.InitialArrivalTime:
+            return inbound.InitialArrivalTime
+        else:
+            return candidate
+
+    def selectArrivalRunway(self, inbound : Inbound):
+        availableRunways = []
+        for runway in self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways:
+            availableRunways.append(runway)
+
+        #if 1 < len(availableRunways):
+            # TODO filter based on type
+            # TODO filter based on airline
+            # ensure that at least one runway is available
+
+        # fallback to check if we have available runways
+        if 0 == len(availableRunways):
+            return self.Colony.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
+
+        # start with the beginning
+        selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(availableRunways[0])
+        eta = self.calculateEta(availableRunways[0], inbound)
+
+        # get the runway with the earliest ETA
+        for runway in availableRunways:
+            candidate = self.calculateEta(runway, inbound)
+            if eta > candidate:
+                selectedRunway = self.Colony.Configuration.RunwayConstraints.findRunway(runway)
+                eta = candidate
+
+        return selectedRunway

+ 0 - 0
aman/sys/aco/__init__.py


+ 1 - 0
setup.py

@@ -77,6 +77,7 @@ setup(
         'aman.config',
         'aman.formats',
         'aman.sys',
+        'aman.sys.aco',
         'aman.tools',
         'aman.types'
     ],