diff --git a/aman/sys/aco/Ant.py b/aman/sys/aco/Ant.py index 807ebbf..7b2ba0e 100644 --- a/aman/sys/aco/Ant.py +++ b/aman/sys/aco/Ant.py @@ -1,7 +1,123 @@ #!/usr/bin/env python -from aman.sys.aco.Colony import Colony +from datetime import datetime, timedelta +import math +import numpy as np +import random +import bisect +import itertools +from aman.sys.aco.Configuration import Configuration +from aman.sys.aco.RunwayManager import RunwayManager +from aman.types.Inbound import Inbound + +# This class implements a single ant of the following paper: +# https://sci-hub.mksa.top/10.1109/cec.2019.8790135 class Ant: - def __init__(self, colony : Colony): - self.Colony = colony \ No newline at end of file + def __init__(self, pheromoneTable : np.array, configuration : Configuration): + self.Configuration = configuration + self.RunwayManager = RunwayManager(self.Configuration) + self.InboundSelected = [ False ] * len(self.Configuration.Inbounds) + self.PheromoneMatrix = pheromoneTable + self.SequenceDelay = timedelta(seconds = 0) + self.Sequence = None + + # Implements function (5), but adapted to the following logic: + # An adaption of the heuristic function is used: + # - Calculates the unused runway time (time between two consecutive landings) + # - Calculates a ratio between the inbound delay and the unused runway time + # - Adds the current overal sequence delay to the heuristic function + def heuristicInformation(self, preceeding : int, current : int): + rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime) + inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime + if 0.0 > inboundDelay.total_seconds(): + inboundDelay = timedelta(seconds = 0) + + # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts + fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds() + fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds() + fraction += self.SequenceDelay.total_seconds() + fraction /= 60.0 + + return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta) + + # Implements functions (3), (6) + def selectNextLandingIndex(self, preceedingIndex : int): + q = float(random.randint(0, 100)) / 100 + weights = [] + + if q <= self.Configuration.PseudoRandomSelectionRate: + for i in range(0, len(self.InboundSelected)): + if False == self.InboundSelected[i]: + weights.append(self.heuristicInformation(preceedingIndex, i)) + else: + # roulette selection strategy + pheromoneScale = 0.0 + for i in range(0, len(self.InboundSelected)): + if False == self.InboundSelected[i]: + pheromoneScale += self.heuristicInformation(preceedingIndex, i) + + # fallback + if 0.0 >= pheromoneScale: + pheromoneScale = 1.0 + + for i in range(0, len(self.InboundSelected)): + if False == self.InboundSelected[i]: + weights.append(self.heuristicInformation(preceedingIndex, i) / pheromoneScale) + + total = sum(weights) + cumdist = list(itertools.accumulate(weights)) + [total] + candidateIndex = bisect.bisect(cumdist, random.random() * total) + + for i in range(0, len(self.InboundSelected)): + if False == self.InboundSelected[i]: + if 0 == candidateIndex: + return i + else: + candidateIndex -= 1 + + return None + + def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime): + # prepare the statistics + rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime) + eta = max(earliestArrivalTime, eta) + + inbound.PlannedRunway = rwy + inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star + inbound.PlannedArrivalTime = eta + inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime + self.RunwayManager.RunwayInbounds[rwy.Name] = inbound + + delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime + if 0.0 < delay.total_seconds(): + return delay + else: + return timedelta(seconds = 0) + + def findSolution(self, first : int): + self.Sequence = [] + + # select the first inbound + self.InboundSelected[first] = True + self.Sequence.append(first) + self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime) + + while True: + index = self.selectNextLandingIndex(self.Sequence[-1]) + if None == index: + break + + self.InboundSelected[index] = True + self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime) + self.Sequence.append(index) + + # update the local pheromone + update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] + update += self.Configuration.propagationRatio * self.Configuration.ThetaZero + self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update) + + # validate that nothing went wrong + if len(self.Sequence) != len(self.Configuration.Inbounds): + self.SequenceDelay = None + self.Sequence = None diff --git a/aman/sys/aco/Colony.py b/aman/sys/aco/Colony.py index 8676439..590ceaa 100644 --- a/aman/sys/aco/Colony.py +++ b/aman/sys/aco/Colony.py @@ -1,12 +1,117 @@ #!/usr/bin/env python +from datetime import datetime, timedelta import numpy as np +import random +import sys +import pytz from aman.sys.aco.Ant import Ant from aman.sys.aco.Configuration import Configuration -from aman.sys.aco.CostFunction import CostFunction +from aman.sys.aco.RunwayManager import RunwayManager +from aman.types.Inbound import Inbound +# This class implements the ant colony of the following paper: +# https://sci-hub.mksa.top/10.1109/cec.2019.8790135 class Colony: + def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool): + rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime) + eta = max(earliestArrivalTime, eta) + + inbound.PlannedRunway = rwy + inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star + inbound.PlannedArrivalTime = eta + inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime + rwyManager.RunwayInbounds[rwy.Name] = inbound + + def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime): + overallDelay = timedelta(seconds = 0) + + # assume that the inbounds are sorted in FCFS order + print('FCFS-Sequence:') + tmp = datetime.now().replace(tzinfo = pytz.UTC) + timedelta(seconds = 50 * len(inbounds)) + for inbound in inbounds: + for runway in inbound.ArrivalCandidates: + inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp + inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp + tmp += timedelta(seconds = 20) + Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False) + overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime + print(' ' + inbound.Report.aircraft.callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) + + ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds())) + + return overallDelay + def __init__(self, configuration : Configuration): self.Configuration = configuration - self.PheromoneMatrix = np.zeros(( len(configuration.Inbounds), len(configuration.Inbounds) ), dtype=float) \ No newline at end of file + self.ResultDelay = None + self.Result = None + + rwyManager = RunwayManager(self.Configuration) + delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime) + self.FcfsDelay = delay + + # check if FCFS is the ideal solution + if 0.0 >= delay.total_seconds(): + self.Result = self.Configuration.Inbounds + self.ResultDelay = delay + return + + # initial value for the optimization + self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0)) + self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero + + def optimize(self): + # FCFS is the best solution + if None != self.Result: + return + + # define the tracking variables + bestSequence = None + + # run the optimization loops + for _ in range(0, self.Configuration.ExplorationRuns): + # select the first inbound + index = random.randint(1, len(self.Configuration.Inbounds)) - 1 + candidates = [] + + for _ in range(0, self.Configuration.AntCount): + # let the ant find a solution + ant = Ant(self.PheromoneMatrix, self.Configuration) + ant.findSolution(index) + + # fallback to check if findSolution was successful + if None == ant.SequenceDelay or None == ant.Sequence: + sys.stderr.write('Invalid ANT run detected!') + sys.exit(-1) + + candidates.append([ ant.SequenceDelay, ant.Sequence ]) + + # find the best solution in all candidates of this generation + bestCandidate = None + for candidate in candidates: + if None == bestCandidate or candidate[0] < bestCandidate[0]: + bestCandidate = candidate + + dTheta = 1.0 / (candidate[0].total_seconds() / 60.0) + for i in range(1, len(candidate[1])): + update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta + self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero) + + # check if we find a new best candidate + if None != bestCandidate: + if None == bestSequence or bestCandidate[0] < bestSequence[0]: + bestSequence = bestCandidate + + print(self.PheromoneMatrix) + # create the final sequence + if None != bestSequence: + # create the resulting sequence + self.ResultDelay = bestSequence[0] + self.Result = [] + + # finalize the sequence + rwyManager = RunwayManager(self.Configuration) + for i in range(0, len(bestSequence[1])): + self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]]) + Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)