#!/usr/bin/env python

import bisect
import itertools
import math
import random
from datetime import datetime, timedelta

import numpy as np

from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.sys.aco.Node import Node


# This class implements a single ant of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Ant:
    """A single ant that builds one landing sequence over all nodes.

    The ant keeps its own RunwayManager and a per-node "already selected"
    flag list. The pheromone table is stored by reference and is modified
    in place by findSolution().
    """

    def __init__(self, pheromoneTable : np.ndarray, configuration : Configuration, nodes):
        """Store the shared inputs and reset the per-ant state.

        pheromoneTable is indexed as [previous node index, next node index]
        by findSolution(); nodes is the indexable collection of candidates.
        """
        self.Configuration = configuration
        self.Nodes = nodes
        self.RunwayManager = RunwayManager(self.Configuration)
        self.InboundSelected = [ False ] * len(self.Nodes)
        self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
        self.PheromoneMatrix = pheromoneTable
        self.SequenceDelay = timedelta(seconds = 0)
        self.Sequence = None

    # Implements function (5)
    def heuristicInformation(self, current : int):
        """Return the heuristic value of the node with index `current`.

        The value is (1 / delay in minutes) ** Beta, where the delay is the
        ETA reported by the runway manager minus the inbound's initial
        arrival time. A delay of exactly zero is replaced by one to avoid a
        division by zero. Returns -1.0 if the runway manager reports no ETA.
        """
        _, _, _, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime)
        if eta is None:
            return -1.0

        inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime

        # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
        # NOTE(review): a negative delay combined with a non-integer Beta would
        # produce a complex number here - confirm that the ETA can never be
        # earlier than the initial arrival time.
        delayMinutes = inboundDelay.total_seconds() / 60.0
        return (1.0 / (delayMinutes or 1)) ** self.Configuration.Beta

    # Implements functions (3), (6)
    def selectNextLandingIndex(self):
        """Randomly pick the index of the next inbound that is not yet selected.

        Every open inbound is weighted by its heuristic value and one of them
        is drawn proportionally to that weight. Returns None if no inbound is
        left or if the runway selection failed for any open inbound.
        """
        q = float(random.randint(0, 100)) / 100

        # evaluate every open inbound exactly once
        candidates = [ i for i, selected in enumerate(self.InboundSelected) if not selected ]
        heuristics = [ self.heuristicInformation(i) for i in candidates ]

        # something was wrong in the runway selection
        # The check has to run on the raw values: once they are divided by the
        # scale below, the -1.0 marker is not recognizable anymore.
        if -1.0 in heuristics:
            return None

        if q <= self.Configuration.PseudoRandomSelectionRate:
            weights = heuristics
        else:
            # roulette selection strategy
            pheromoneScale = sum(heuristics, 0.0)
            weights = [ value / (pheromoneScale or 1) for value in heuristics ]

        total = sum(weights)
        cumdist = list(itertools.accumulate(weights)) + [total]
        candidateIndex = bisect.bisect(cumdist, random.random() * total)

        # the drawn position counts only the inbounds that are still open
        if candidateIndex < len(candidates):
            return candidates[candidateIndex]

        return None

    def associateInbound(self, node : Node, earliestArrivalTime : datetime):
        """Plan `node` on the runway chosen by the runway manager.

        Writes the planned runway, STAR, arrival time, arrival route and
        initial arrival time into node.Inbound and registers the node at the
        runway manager. The planned arrival time is never earlier than
        `earliestArrivalTime`.

        Returns a tuple of (delay, runway); the delay is clamped to zero if
        the planned arrival time is not later than the initial arrival time.
        """
        # prepare the statistics
        _, _, rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
        eta = max(earliestArrivalTime, eta)

        candidate = node.ArrivalCandidates[rwy.Name]
        node.Inbound.PlannedRunway = rwy
        node.Inbound.PlannedStar = candidate.Star
        node.Inbound.PlannedArrivalTime = eta
        node.Inbound.PlannedArrivalRoute = candidate.ArrivalRoute
        node.Inbound.InitialArrivalTime = candidate.InitialArrivalTime
        self.RunwayManager.registerNode(node, rwy.Name)

        delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
        return max(delay, timedelta(seconds = 0)), rwy

    def findSolution(self, first : int):
        """Build a complete landing sequence that starts with node `first`.

        The result is stored in self.Sequence (list of node indices) and
        self.SequenceDelay (accumulated delay). After every step the pheromone
        entry of the latest transition is updated in place. If not every node
        could be sequenced, both attributes are set to None.
        """
        self.Sequence = []

        # select the first inbound
        self.InboundSelected[first] = True
        delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
        self.Sequence.append(first)
        self.SequenceDelay += delay

        while True:
            index = self.selectNextLandingIndex()
            if index is None:
                break

            self.InboundSelected[index] = True
            delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
            self.SequenceDelay += delay
            self.Sequence.append(index)

            # update the local pheromone
            previous, latest = self.Sequence[-2], self.Sequence[-1]
            update = (1.0 - self.Configuration.PropagationRatio) * self.PheromoneMatrix[previous, latest]
            update += self.Configuration.PropagationRatio * self.Configuration.ThetaZero
            # ThetaZero is the lower bound of every pheromone entry
            self.PheromoneMatrix[previous, latest] = max(self.Configuration.ThetaZero, update)

        # validate that nothing went wrong
        if len(self.Sequence) != len(self.Nodes):
            self.SequenceDelay = None
            self.Sequence = None