124 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			124 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| from datetime import datetime, timedelta
 | |
| import math
 | |
| import numpy as np
 | |
| import random
 | |
| import bisect
 | |
| import itertools
 | |
| 
 | |
| from aman.sys.aco.Configuration import Configuration
 | |
| from aman.sys.aco.RunwayManager import RunwayManager
 | |
| from aman.types.Inbound import Inbound
 | |
| 
 | |
| # This class implements a single ant of the following paper:
 | |
| # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
 | |
| class Ant:
 | |
|     def __init__(self, pheromoneTable : np.array, configuration : Configuration):
 | |
|         self.Configuration = configuration
 | |
|         self.RunwayManager = RunwayManager(self.Configuration)
 | |
|         self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
 | |
|         self.PheromoneMatrix = pheromoneTable
 | |
|         self.SequenceDelay = timedelta(seconds = 0)
 | |
|         self.Sequence = None
 | |
| 
 | |
|     # Implements function (5), but adapted to the following logic:
 | |
|     # An adaption of the heuristic function is used:
 | |
|     #   - Calculates the unused runway time (time between two consecutive landings)
 | |
|     #   - Calculates a ratio between the inbound delay and the unused runway time
 | |
|     #   - Adds the current overal sequence delay to the heuristic function
 | |
|     def heuristicInformation(self, preceeding : int, current : int):
 | |
|         rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
 | |
|         inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
 | |
|         if 0.0 > inboundDelay.total_seconds():
 | |
|             inboundDelay = timedelta(seconds = 0)
 | |
| 
 | |
|         # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
 | |
|         fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
 | |
|         fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
 | |
|         fraction += self.SequenceDelay.total_seconds()
 | |
|         fraction /= 60.0
 | |
| 
 | |
|         return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
 | |
| 
 | |
|     # Implements functions (3), (6)
 | |
|     def selectNextLandingIndex(self, preceedingIndex : int):
 | |
|         q = float(random.randint(0, 100)) / 100
 | |
|         weights = []
 | |
| 
 | |
|         if q <= self.Configuration.PseudoRandomSelectionRate:
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     weights.append(self.heuristicInformation(preceedingIndex, i))
 | |
|         else:
 | |
|             # roulette selection strategy
 | |
|             pheromoneScale = 0.0
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     pheromoneScale += self.heuristicInformation(preceedingIndex, i)
 | |
| 
 | |
|             # fallback
 | |
|             if 0.0 >= pheromoneScale:
 | |
|                 pheromoneScale = 1.0
 | |
| 
 | |
|             for i in range(0, len(self.InboundSelected)):
 | |
|                 if False == self.InboundSelected[i]:
 | |
|                     weights.append(self.heuristicInformation(preceedingIndex, i) / pheromoneScale)
 | |
| 
 | |
|         total = sum(weights)
 | |
|         cumdist = list(itertools.accumulate(weights)) + [total]
 | |
|         candidateIndex = bisect.bisect(cumdist, random.random() * total)
 | |
| 
 | |
|         for i in range(0, len(self.InboundSelected)):
 | |
|             if False == self.InboundSelected[i]:
 | |
|                 if 0 == candidateIndex:
 | |
|                     return i
 | |
|                 else:
 | |
|                     candidateIndex -= 1
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime):
 | |
|         # prepare the statistics
 | |
|         rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
 | |
|         eta = max(earliestArrivalTime, eta)
 | |
| 
 | |
|         inbound.PlannedRunway = rwy
 | |
|         inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
 | |
|         inbound.PlannedArrivalTime = eta
 | |
|         inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
 | |
|         self.RunwayManager.RunwayInbounds[rwy.Name] = inbound
 | |
| 
 | |
|         delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime 
 | |
|         if 0.0 < delay.total_seconds():
 | |
|             return delay
 | |
|         else:
 | |
|             return timedelta(seconds = 0)
 | |
| 
 | |
|     def findSolution(self, first : int):
 | |
|         self.Sequence = []
 | |
| 
 | |
|         # select the first inbound
 | |
|         self.InboundSelected[first] = True
 | |
|         self.Sequence.append(first)
 | |
|         self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
 | |
| 
 | |
|         while 1:
 | |
|             index = self.selectNextLandingIndex(self.Sequence[-1])
 | |
|             if None == index:
 | |
|                 break
 | |
| 
 | |
|             self.InboundSelected[index] = True
 | |
|             self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
 | |
|             self.Sequence.append(index)
 | |
| 
 | |
|             # update the local pheromone
 | |
|             update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
 | |
|             update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
 | |
|             self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
 | |
| 
 | |
|         # validate that nothing went wrong
 | |
|         if len(self.Sequence) != len(self.Configuration.Inbounds):
 | |
|             self.SequenceDelay = None
 | |
|             self.Sequence = None
 |