#!/usr/bin/env python

from datetime import datetime, timedelta
import numpy as np
import random
import sys

import pytz

from aman.sys.aco.Ant import Ant
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound


# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony:
    """Ant colony that searches for an arrival sequence with a low overall delay.

    The constructor computes the delay of the sequence in its given order and
    derives the initial pheromone level from it. ``optimize`` then runs the
    ants and stores the best sequence found in ``Result`` / ``ResultDelay``.
    """

    @staticmethod
    def associateInbound(rwyManager: RunwayManager, inbound: Inbound,
                         earliestArrivalTime: datetime, useITA: bool):
        """Plan ``inbound`` on the runway selected by ``rwyManager``.

        The planned arrival time is the ETA returned by the runway manager,
        but never earlier than ``earliestArrivalTime``. The planned runway,
        STAR, route, times and trackmiles of the inbound are overwritten and
        the inbound is stored in ``rwyManager.RunwayInbounds`` under the
        runway name.
        """
        rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
        eta = max(earliestArrivalTime, eta)

        candidate = inbound.ArrivalCandidates[rwy.Name]
        inbound.PlannedRunway = rwy
        inbound.PlannedStar = candidate.Star
        inbound.PlannedArrivalRoute = candidate.ArrivalRoute
        inbound.PlannedArrivalTime = eta
        inbound.InitialArrivalTime = candidate.InitialArrivalTime
        inbound.PlannedTrackmiles = candidate.Trackmiles
        rwyManager.RunwayInbounds[rwy.Name] = inbound

    @staticmethod
    def calculateInitialCosts(rwyManager: RunwayManager, inbounds, earliestArrivalTime: datetime):
        """Plan all ``inbounds`` in the given order and return the summed delay.

        The delay of one inbound is its planned minus its initial arrival time.
        Returns a ``timedelta`` (zero for an empty list).
        """
        overallDelay = timedelta(seconds=0)

        # assume that the inbounds are sorted in FCFS order
        for inbound in inbounds:
            Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
            overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime

        return overallDelay

    def __init__(self, configuration: Configuration):
        """Compute the FCFS delay and initialize the pheromone matrix.

        Writes ``ThetaZero`` into the passed configuration. An empty inbound
        list yields an empty pheromone matrix instead of a division by zero.
        """
        self.Configuration = configuration
        self.ResultDelay = None
        self.Result = None

        inboundCount = len(self.Configuration.Inbounds)

        rwyManager = RunwayManager(self.Configuration)
        delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds,
                                             self.Configuration.EarliestArrivalTime)
        self.FcfsDelay = delay

        # run the optimization in every cycle to ensure optimal spacings based on TTG
        if delay.total_seconds() <= 0.0:
            delay = timedelta(seconds=1.0)

        # initial value for the optimization
        # (max(..., 1) only matters for an empty inbound list, where it avoids 1 / 0)
        self.Configuration.ThetaZero = 1.0 / (max(inboundCount, 1) * (delay.total_seconds() / 60.0))
        self.PheromoneMatrix = np.full((inboundCount, inboundCount),
                                       self.Configuration.ThetaZero, dtype=float)

    def _runGeneration(self, startIndex):
        """Let ``AntCount`` ants build a sequence beginning at ``startIndex``.

        Returns a list of ``(delay, sequence, score, delay seconds / score)``
        tuples, one per ant. Terminates the process if an ant reports no result.
        """
        candidates = []

        for _ in range(self.Configuration.AntCount):
            # let the ant find a solution
            ant = Ant(self.PheromoneMatrix, self.Configuration)
            ant.findSolution(startIndex)

            # fallback to check if findSolution was successful
            if ant.SequenceDelay is None or ant.Sequence is None or ant.SequenceScore is None:
                sys.stderr.write('Invalid ANT run detected!')
                sys.exit(-1)

            candidates.append((
                ant.SequenceDelay,
                ant.Sequence,
                ant.SequenceScore,
                ant.SequenceDelay.total_seconds() / ant.SequenceScore,
            ))

        return candidates

    def _updatePheromones(self, delay, sequence):
        """Evaporate and reinforce the pheromones along the edges of ``sequence``.

        The deposit is the inverse of the delay in minutes (1.0 if the delay is
        zero); no entry drops below ``ThetaZero``.
        """
        dTheta = 1.0 / ((delay.total_seconds() / 60.0) or 1.0)
        retained = 1.0 - self.Configuration.Epsilon

        for previous, current in zip(sequence, sequence[1:]):
            update = retained * self.PheromoneMatrix[previous, current] + dTheta
            self.PheromoneMatrix[previous, current] = max(update, self.Configuration.ThetaZero)

    @staticmethod
    def _assignWaypointTimes(inbound):
        """Spread the difference between initial and planned arrival time over the route.

        Sets ``PTA`` on the waypoints of the planned arrival route until the
        required time difference is reached; later waypoints are not touched.
        """
        # the idea behind the TTL/TTG per waypoint is that the TTL and TTG needs to be achieved in the
        # first 2/3 of the estimated trackmiles and assign it with a linear function to the waypoints

        # calculate the TTL/TTG for all the waypoints (TTG is positive)
        reqTimeDelta = abs((inbound.InitialArrivalTime - inbound.PlannedArrivalTime).total_seconds())
        gainTime = inbound.InitialArrivalTime >= inbound.PlannedArrivalTime

        # without planned trackmiles the slope is undefined (division by zero),
        # so no per-waypoint times are assigned for this inbound
        if inbound.PlannedTrackmiles == 0:
            return

        m = -3 * reqTimeDelta / (2 * inbound.PlannedTrackmiles)
        timeDelta = 0.0

        for waypoint in inbound.PlannedArrivalRoute:
            waypointDT = m * waypoint.Trackmiles + reqTimeDelta
            timeDelta += waypointDT

            # do not assign more than the required overall time difference
            if timeDelta > reqTimeDelta:
                waypointDT -= timeDelta - reqTimeDelta

            # the PTA is moved before the ETA if time needs to be gained, after it otherwise
            waypointDT = timedelta(seconds=(-1.0 * waypointDT if gainTime else waypointDT))
            waypoint.PTA = waypoint.ETA + waypointDT

            # reached the PTA at the waypoint
            if timeDelta >= reqTimeDelta:
                break

    def optimize(self):
        """Run the exploration loops and store the best sequence found.

        On success ``Result`` holds the inbounds in optimized order and
        ``ResultDelay`` the delay of that sequence. Both stay ``None`` when no
        candidate was produced (no inbounds, no exploration runs or no ants).
        Does nothing if ``Result`` is already set.
        """
        # a result is already available, nothing to do
        if self.Result is not None:
            return

        # nothing to sequence; also avoids random.randint(1, 0) raising ValueError
        inboundCount = len(self.Configuration.Inbounds)
        if inboundCount == 0:
            return

        # define the tracking variables
        bestSequence = None

        # run the optimization loops
        for _ in range(self.Configuration.ExplorationRuns):
            # select the first inbound
            index = random.randint(1, inboundCount) - 1
            candidates = self._runGeneration(index)

            # every candidate of this generation contributes to the pheromone update
            for delay, sequence, _, _ in candidates:
                self._updatePheromones(delay, sequence)

            # find the best solution in all candidates of this generation
            # (lowest delay per score; the first one wins on ties)
            bestCandidate = min(candidates, key=lambda candidate: candidate[3], default=None)

            # check if we find a new best candidate
            if bestCandidate is not None:
                if bestSequence is None or bestCandidate[0] < bestSequence[0]:
                    bestSequence = bestCandidate

        # create the final sequence
        if bestSequence is None:
            return

        # create the resulting sequence
        self.ResultDelay = bestSequence[0]
        self.Result = []

        # finalize the sequence
        rwyManager = RunwayManager(self.Configuration)
        for inboundIndex in bestSequence[1]:
            inbound = self.Configuration.Inbounds[inboundIndex]
            self.Result.append(inbound)
            Colony.associateInbound(rwyManager, inbound, self.Configuration.EarliestArrivalTime, True)
            Colony._assignWaypointTimes(inbound)