#!/usr/bin/env python from datetime import datetime, timedelta import math import numpy as np import random import bisect import itertools from aman.sys.aco.Configuration import Configuration from aman.sys.aco.RunwayManager import RunwayManager from aman.sys.aco.Node import Node # This class implements a single ant of the following paper: # https://sci-hub.mksa.top/10.1109/cec.2019.8790135 class Ant: def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes): self.Configuration = configuration self.Nodes = nodes self.RunwayManager = RunwayManager(self.Configuration) self.InboundSelected = [ False ] * len(self.Nodes) self.InboundScore = np.zeros([ len(self.Nodes), 1 ]) self.PheromoneMatrix = pheromoneTable self.SequenceDelay = timedelta(seconds = 0) self.Sequence = None def qualifyDelay(delay, node, runway): if 0.0 > delay.total_seconds(): delay = timedelta(seconds = 0) # calculate the heuristic scaling to punish increased delays for single inbounds scaledDelay = delay.total_seconds() / node.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds() return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01) # Implements function (5), but adapted to the following logic: # An adaption of the heuristic function is used: # - Calculates the unused runway time (time between two consecutive landings) # - Calculates a ratio between the inbound delay and the unused runway time # - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time def heuristicInformation(self, preceeding : int, current : int): rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Nodes[current], True, self.Configuration.EarliestArrivalTime) inboundDelay = eta - self.Nodes[current].ArrivalCandidates[rwy.Name].InitialArrivalTime if 0.0 > inboundDelay.total_seconds(): inboundDelay = timedelta(seconds = 0) # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds() fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds() fraction += self.SequenceDelay.total_seconds() fraction /= 60.0 # calculate the heuristic scaling to punish increased delays for single inbounds weight = Ant.qualifyDelay(inboundDelay, self.Nodes[current], rwy) return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta) # Implements functions (3), (6) def selectNextLandingIndex(self, preceedingIndex : int): q = float(random.randint(0, 100)) / 100 weights = [] if q <= self.Configuration.PseudoRandomSelectionRate: for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: weights.append(self.heuristicInformation(preceedingIndex, i)) else: # roulette selection strategy pheromoneScale = 0.0 for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: pheromoneScale += self.heuristicInformation(preceedingIndex, i) for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1)) total = sum(weights) cumdist = list(itertools.accumulate(weights)) + [total] candidateIndex = bisect.bisect(cumdist, random.random() * total) for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: if 0 == candidateIndex: return i else: candidateIndex -= 1 return None def associateInbound(self, node : Node, earliestArrivalTime : datetime): # prepare the statistics rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, True, self.Configuration.EarliestArrivalTime) eta = max(earliestArrivalTime, eta) node.Inbound.PlannedRunway = rwy node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star node.Inbound.PlannedArrivalTime = eta node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime self.RunwayManager.RunwayInbounds[rwy.Name] = node delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime if 0.0 < delay.total_seconds(): return delay, rwy else: return timedelta(seconds = 0), rwy def findSolution(self, first : int): self.Sequence = [] # select the first inbound self.InboundSelected[first] = True delay, rwy = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime) self.InboundScore[0] = Ant.qualifyDelay(delay, self.Nodes[first], rwy) self.Sequence.append(first) self.SequenceDelay += delay while 1: index = self.selectNextLandingIndex(self.Sequence[-1]) if None == index: break self.InboundSelected[index] = True delay, rwy = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime) self.SequenceDelay += delay self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Nodes[index], rwy) self.Sequence.append(index) # update the local pheromone update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] update += self.Configuration.propagationRatio * self.Configuration.ThetaZero self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update) # validate that nothing went wrong if len(self.Sequence) != len(self.Nodes): self.SequenceDelay = None self.SequenceScore = None self.Sequence = None else: self.SequenceScore = np.median(self.InboundScore)