154 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			154 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| from datetime import datetime as dt
 | |
| from datetime import datetime, timedelta
 | |
| import numpy as np
 | |
| import pytz
 | |
| import random
 | |
| import sys
 | |
| import time
 | |
| 
 | |
| from aman.sys.aco.Ant import Ant
 | |
| from aman.sys.aco.Configuration import Configuration
 | |
| from aman.sys.aco.Node import Node
 | |
| from aman.sys.aco.RunwayManager import RunwayManager
 | |
| from aman.types.Inbound import Inbound
 | |
| 
 | |
| # This class implements the ant colony of the following paper:
 | |
| # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
 | |
class Colony:
    """Ant colony that sequences inbounds onto arrival runways.

    The constructor wraps every inbound into a ``Node``, plans all nodes in
    the given (first-come-first-served) order to obtain a baseline delay and
    initializes the pheromone matrix from that baseline.  ``optimize`` then
    lets ants search for a sequence with a smaller overall delay and writes
    the final planning into ``Result`` / ``ResultDelay``.

    Attributes set by the constructor:
        Configuration: the configuration passed in (``ThetaZero`` is
            overwritten on it by the constructor).
        Nodes: one ``Node`` per inbound, in the order of ``inbounds``.
        FcfsDelay: overall delay of the FCFS planning, or ``None`` if there
            is nothing to plan or the FCFS planning failed.
        PheromoneMatrix: square matrix (node count x node count) filled with
            ``ThetaZero``; only created when ``FcfsDelay`` is available.
        Result / ResultDelay: ``None`` until ``optimize`` succeeded.
    """

    @staticmethod
    def associateInbound(rwyManager: RunwayManager, node: Node, earliestArrivalTime: datetime):
        """Assign a runway and the resulting planning data to ``node.Inbound``.

        The runway and its ETA come from ``rwyManager.selectArrivalRunway``.
        The planned arrival time is never earlier than
        ``earliestArrivalTime``.  On success the node is registered at the
        runway manager under the selected runway's name.

        Returns:
            ``True`` if the inbound was planned, ``False`` if the runway
            manager returned no ETA (the inbound is left untouched then).
        """
        rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
        if eta is None:
            return False
        plannedArrivalTime = max(earliestArrivalTime, eta)

        # all planning data is copied from the candidate of the selected runway
        candidate = node.ArrivalCandidates[rwy.Name]
        inbound = node.Inbound
        inbound.PlannedRunway = rwy
        inbound.PlannedStar = candidate.Star
        inbound.PlannedArrivalRoute = candidate.ArrivalRoute
        inbound.PlannedArrivalTime = plannedArrivalTime
        inbound.InitialArrivalTime = candidate.InitialArrivalTime
        inbound.PlannedTrackmiles = candidate.Trackmiles
        rwyManager.registerNode(node, rwy.Name)

        return True

    @staticmethod
    def calculateInitialCosts(rwyManager: RunwayManager, nodes, earliestArrivalTime: datetime):
        """Plan ``nodes`` in the given order and sum up their delays.

        The delay of a node is ``PlannedArrivalTime - InitialArrivalTime`` of
        its inbound after the association.

        Returns:
            The accumulated delay as ``timedelta`` or ``None`` as soon as one
            node cannot be associated with a runway.
        """
        overallDelay = timedelta(seconds=0)

        # assume that the nodes are sorted in FCFS order
        for node in nodes:
            if not Colony.associateInbound(rwyManager, node, earliestArrivalTime):
                return None
            overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime

        return overallDelay

    def __init__(self, inbounds, configuration: Configuration):
        """Create the planning nodes and the FCFS baseline.

        Args:
            inbounds: iterable of inbounds in FCFS order.
            configuration: optimization settings; ``ThetaZero`` is written to
                this object as a side effect.
        """
        self.Configuration = configuration
        self.ResultDelay = None
        self.FcfsDelay = None
        self.Result = None
        self.Nodes = []

        # Without inbounds there is nothing to plan. Returning here keeps
        # FcfsDelay at None (optimize() then reports False) instead of
        # dividing by a node count of zero further below.
        inbounds = list(inbounds)
        if not inbounds:
            return

        # create the new planning instances (current UTC time, whole seconds)
        currentTime = datetime.fromtimestamp(int(time.time()), tz=pytz.UTC)
        for inbound in inbounds:
            self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel,
                                   self.Configuration.AirportConfiguration,
                                   self.Configuration.RunwayConstraints))

        rwyManager = RunwayManager(self.Configuration)
        delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
        if delay is None:
            return
        self.FcfsDelay = delay

        # run the optimization in every cycle to ensure optimal spacings based on TTG
        # (a non-positive delay is replaced by one second to keep ThetaZero finite)
        if 0.0 >= delay.total_seconds():
            delay = timedelta(seconds=1.0)

        # initial value for the optimization: 1 / (node count * FCFS delay in minutes)
        nodeCount = len(self.Nodes)
        self.Configuration.ThetaZero = 1.0 / (nodeCount * (delay.total_seconds() / 60.0))
        self.PheromoneMatrix = np.full((nodeCount, nodeCount), self.Configuration.ThetaZero, dtype=float)

    def sequenceAndPredictInbound(self, rwyManager: RunwayManager, node: Node):
        """Append ``node`` to ``Result`` and update the PTAs of its route.

        The first waypoint's PTA is its ETA shifted by the difference between
        the inbound's enroute arrival time and its planned arrival time.  All
        following waypoints keep their ETA spacing to the predecessor.
        """
        self.Result.append(node)
        # NOTE(review): the result of the association is ignored; if it fails
        # the planning values that are already stored on the inbound are used.
        Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)

        inbound = node.Inbound
        route = inbound.PlannedArrivalRoute
        reqTimeDelta = inbound.EnrouteArrivalTime - inbound.PlannedArrivalTime
        route[0].PTA = route[0].ETA - reqTimeDelta
        for prev, current in zip(route, route[1:]):
            current.PTA = prev.PTA + (current.ETA - prev.ETA)

    def _runAnts(self, index):
        """Let ``AntCount`` ants search a solution that starts at node ``index``.

        Returns a list of ``(SequenceDelay, Sequence)`` tuples.  The run stops
        at the first ant without a valid result; candidates found so far are kept.
        """
        candidates = []
        for _ in range(self.Configuration.AntCount):
            # let the ant find a solution
            ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
            ant.findSolution(index)

            # fallback to check if findSolution was successful
            if ant.SequenceDelay is None or ant.Sequence is None:
                sys.stderr.write('Invalid ANT run detected!')
                break

            candidates.append((ant.SequenceDelay, ant.Sequence))
        return candidates

    def _reinforceSequence(self, delay, sequence):
        """Update the pheromones on all consecutive node pairs of ``sequence``.

        Every entry becomes ``(1 - Epsilon) * old + Epsilon * dTheta`` with
        ``dTheta = 1 / delay in minutes`` (``1.0`` for a zero delay) and is
        never allowed to drop below ``ThetaZero``.
        """
        epsilon = self.Configuration.Epsilon
        thetaZero = self.Configuration.ThetaZero
        dTheta = 1.0 / ((delay.total_seconds() / 60.0) or 1.0)
        for prev, current in zip(sequence, sequence[1:]):
            update = (1.0 - epsilon) * self.PheromoneMatrix[prev, current] + epsilon * dTheta
            self.PheromoneMatrix[prev, current] = max(update, thetaZero)

    def optimize(self):
        """Search for a sequence with less delay than FCFS and plan the result.

        Runs up to ``ExplorationRuns`` generations of ``AntCount`` ants.  The
        best sequence found is used if its delay does not exceed the FCFS
        delay, otherwise the FCFS order is kept.  The chosen order is planned
        with a fresh ``RunwayManager`` and stored in ``Result`` together with
        ``ResultDelay``.

        Returns:
            ``False`` if no FCFS baseline exists (nothing is changed then),
            otherwise ``True``.
        """
        if self.FcfsDelay is None:
            return False

        # define the tracking variables; best solution is (delay, sequence)
        bestSequence = None

        # run the optimization loops
        for _ in range(self.Configuration.ExplorationRuns):
            # select the first inbound
            index = random.randrange(len(self.Nodes))
            candidates = self._runAnts(index)

            # find the best solution in all candidates of this generation
            bestCandidate = None
            for candidate in candidates:
                if bestCandidate is None or candidate[0] < bestCandidate[0]:
                    bestCandidate = candidate

                # NOTE(review): the best sequence of the previous generations
                # is reinforced once per candidate, not once per generation.
                # Kept as found because it influences the optimization result.
                if bestSequence is not None:
                    self._reinforceSequence(bestSequence[0], bestSequence[1])

            # check if we find a new best candidate
            if bestCandidate is not None:
                if bestSequence is None or bestCandidate[0] < bestSequence[0]:
                    bestSequence = bestCandidate

                # found the optimal solution
                if 1 >= bestSequence[0].total_seconds():
                    break

        # create the final sequence
        self.Result = []
        rwyManager = RunwayManager(self.Configuration)

        if bestSequence is not None and self.FcfsDelay >= bestSequence[0]:
            # use the optimized sequence
            self.ResultDelay = bestSequence[0]
            finalNodes = [self.Nodes[idx] for idx in bestSequence[1]]
        else:
            # use the FCFS sequence
            self.ResultDelay = self.FcfsDelay
            finalNodes = self.Nodes

        # finalize the sequence
        for node in finalNodes:
            self.sequenceAndPredictInbound(rwyManager, node)

        return True
 |