#!/usr/bin/env python from datetime import datetime, timedelta import numpy as np import random import sys import pytz from aman.sys.aco.Ant import Ant from aman.sys.aco.Configuration import Configuration from aman.sys.aco.RunwayManager import RunwayManager from aman.types.Inbound import Inbound # This class implements the ant colony of the following paper: # https://sci-hub.mksa.top/10.1109/cec.2019.8790135 class Colony: def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool): rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime) eta = max(earliestArrivalTime, eta) inbound.PlannedRunway = rwy inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star inbound.PlannedArrivalTime = eta inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime rwyManager.RunwayInbounds[rwy.Name] = inbound def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime): overallDelay = timedelta(seconds = 0) # assume that the inbounds are sorted in FCFS order print('FCFS-Sequence:') # TODO remove this after testing and optimization tmp = datetime.now().replace(tzinfo = pytz.UTC) + timedelta(seconds = 50 * len(inbounds)) for inbound in inbounds: # TODO remove this after testing and optimization for runway in inbound.ArrivalCandidates: inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp - inbound.ArrivalCandidates[runway].MaximumTimeToGain inbound.ArrivalCandidates[runway].LatestArrivalTime = tmp + inbound.ArrivalCandidates[runway].MaximumTimeToLose tmp += timedelta(seconds = 20) Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False) overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime print(' ' + inbound.Callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) + ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds())) return overallDelay def __init__(self, configuration : Configuration): self.Configuration = configuration self.ResultDelay = None self.Result = None rwyManager = RunwayManager(self.Configuration) delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime) self.FcfsDelay = delay # check if FCFS is the ideal solution if 0.0 >= delay.total_seconds(): self.Result = self.Configuration.Inbounds self.ResultDelay = delay return # initial value for the optimization self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0)) self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero def optimize(self): # FCFS is the best solution if None != self.Result: return # define the tracking variables bestSequence = None # run the optimization loops for _ in range(0, self.Configuration.ExplorationRuns): # select the first inbound index = random.randint(1, len(self.Configuration.Inbounds)) - 1 candidates = [] for _ in range(0, self.Configuration.AntCount): # let the ant find a solution ant = Ant(self.PheromoneMatrix, self.Configuration) ant.findSolution(index) # fallback to check if findSolution was successful if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore: sys.stderr.write('Invalid ANT run detected!') sys.exit(-1) candidates.append( [ ant.SequenceDelay, ant.Sequence, ant.SequenceScore, ant.SequenceDelay.total_seconds() / ant.SequenceScore ] ) # find the best solution in all candidates of this generation bestCandidate = None for candidate in candidates: if None == bestCandidate or candidate[3] < bestCandidate[3]: bestCandidate = candidate dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0) for i in range(1, len(candidate[1])): update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero) # check if we find a new best candidate if None != bestCandidate: if None == bestSequence or bestCandidate[0] < bestSequence[0]: bestSequence = bestCandidate # create the final sequence if None != bestSequence: # create the resulting sequence self.ResultDelay = bestSequence[0] self.Result = [] # finalize the sequence rwyManager = RunwayManager(self.Configuration) for i in range(0, len(bestSequence[1])): self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]]) Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)