123 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			123 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| from datetime import datetime, timedelta
 | |
| import math
 | |
| import numpy as np
 | |
| import random
 | |
| import bisect
 | |
| import itertools
 | |
| 
 | |
| from aman.sys.aco.Configuration import Configuration
 | |
| from aman.sys.aco.RunwayManager import RunwayManager
 | |
| from aman.sys.aco.Node import Node
 | |
| 
 | |
| # This class implements a single ant of the following paper:
 | |
| # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
 | |
| class Ant:
 | |
|     def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes):
 | |
|         self.Configuration = configuration
 | |
|         self.Nodes = nodes
 | |
|         self.RunwayManager = RunwayManager(self.Configuration)
 | |
|         self.InboundSelected = [ False ] * len(self.Nodes)
 | |
|         self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
 | |
|         self.PheromoneMatrix = pheromoneTable
 | |
|         self.SequenceDelay = timedelta(seconds = 0)
 | |
|         self.Sequence = None
 | |
| 
 | |
|     # Implements function (5)
 | |
|     def heuristicInformation(self, current : int):
 | |
|         _, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime)
 | |
|         if None == eta:
 | |
|             return -1.0
 | |
| 
 | |
|         inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime
 | |
| 
 | |
|         # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
 | |
|         heuristic = inboundDelay.total_seconds() / 60.0
 | |
|         heuristic = (1.0 / (heuristic or 1)) ** self.Configuration.Beta
 | |
|         return heuristic
 | |
| 
 | |
|     # Implements functions (3), (6)
 | |
|     def selectNextLandingIndex(self):
 | |
|         q = float(random.randint(0, 100)) / 100
 | |
|         weights = []
 | |
| 
 | |
|         if q <= self.Configuration.PseudoRandomSelectionRate:
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     weights.append(self.heuristicInformation(i))
 | |
|         else:
 | |
|             # roulette selection strategy
 | |
|             pheromoneScale = 0.0
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     pheromoneScale += self.heuristicInformation(i)
 | |
| 
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     weights.append(self.heuristicInformation(i) / (pheromoneScale or 1))
 | |
| 
 | |
|         # something was wrong in the runway selection
 | |
|         if -1.0 in weights:
 | |
|             return None
 | |
| 
 | |
|         total = sum(weights)
 | |
|         cumdist = list(itertools.accumulate(weights)) + [total]
 | |
|         candidateIndex = bisect.bisect(cumdist, random.random() * total)
 | |
| 
 | |
|         for i in range(0, len(self.InboundSelected)):
 | |
|             if False == self.InboundSelected[i]:
 | |
|                 if 0 == candidateIndex:
 | |
|                     return i
 | |
|                 else:
 | |
|                     candidateIndex -= 1
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def associateInbound(self, node : Node, earliestArrivalTime : datetime):
 | |
|         # prepare the statistics
 | |
|         rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
 | |
|         eta = max(earliestArrivalTime, eta)
 | |
| 
 | |
|         node.Inbound.PlannedRunway = rwy
 | |
|         node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
 | |
|         node.Inbound.PlannedArrivalTime = eta
 | |
|         node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
 | |
|         node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
 | |
|         self.RunwayManager.registerNode(node, rwy.Name)
 | |
| 
 | |
|         delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime 
 | |
|         if 0.0 < delay.total_seconds():
 | |
|             return delay, rwy
 | |
|         else:
 | |
|             return timedelta(seconds = 0), rwy
 | |
| 
 | |
|     def findSolution(self, first : int):
 | |
|         self.Sequence = []
 | |
| 
 | |
|         # select the first inbound
 | |
|         self.InboundSelected[first] = True
 | |
|         delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
 | |
|         self.Sequence.append(first)
 | |
|         self.SequenceDelay += delay
 | |
| 
 | |
|         while 1:
 | |
|             index = self.selectNextLandingIndex()
 | |
|             if None == index:
 | |
|                 break
 | |
| 
 | |
|             self.InboundSelected[index] = True
 | |
|             delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
 | |
|             self.SequenceDelay += delay
 | |
|             self.Sequence.append(index)
 | |
| 
 | |
|             # update the local pheromone
 | |
|             update = (1.0 - self.Configuration.PropagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
 | |
|             update += self.Configuration.PropagationRatio * self.Configuration.ThetaZero
 | |
|             self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
 | |
| 
 | |
|         # validate that nothing went wrong
 | |
|         if len(self.Sequence) != len(self.Nodes):
 | |
|             self.SequenceDelay = None
 | |
|             self.Sequence = None
 |