#!/usr/bin/env python

from datetime import datetime, timedelta
import math
import numpy as np
import random
import bisect
import itertools

from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound

# This class implements a single ant of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Ant:
    def __init__(self, pheromoneTable : np.ndarray, configuration : Configuration):
        """Prepare an ant that has not yet built a landing sequence.

        pheromoneTable is stored by reference (not copied); findSolution()
        writes its local pheromone updates directly into it.
        configuration provides the inbounds and the tuning parameters that
        the other methods read.
        """
        self.Configuration = configuration
        self.RunwayManager = RunwayManager(self.Configuration)
        # one flag per inbound, True as soon as the inbound is part of the sequence
        self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
        self.PheromoneMatrix = pheromoneTable
        # accumulated delay of all inbounds that are sequenced so far
        self.SequenceDelay = timedelta(seconds = 0)
        # list of inbound indices in landing order, None until findSolution() ran
        self.Sequence = None

    # Implements function (5), but adapted to the following logic:
    # An adaptation of the heuristic function is used:
    #  - Calculates the unused runway time (time between two consecutive landings)
    #  - Calculates a ratio between the inbound delay and the unused runway time
    #  - Adds the current overall sequence delay to the heuristic function
    def heuristicInformation(self, preceeding : int, current : int):
        """Return the attractiveness of landing inbound `current` after `preceeding`.

        The result is PheromoneMatrix[preceeding, current] multiplied by the
        inverse of a cost (in minutes) raised to Configuration.Beta.
        Both arguments are indices into Configuration.Inbounds.
        """
        inbound = self.Configuration.Inbounds[current]
        # NOTE(review): the meaning of the literal True is defined by
        # RunwayManager.selectArrivalRunway, which is not visible here
        rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)

        # an arrival earlier than the initial arrival time counts as no delay
        inboundDelay = eta - inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
        if 0.0 > inboundDelay.total_seconds():
            inboundDelay = timedelta(seconds = 0)

        # calculate the fraction with a mix of the unused runway time and the delay of a single aircraft
        occupationRatio = self.Configuration.RunwayOccupasionRatio
        fraction = occupationRatio * unusedRunwayTime.total_seconds()
        fraction += (1.0 - occupationRatio) * inboundDelay.total_seconds()
        fraction += self.SequenceDelay.total_seconds()
        # seconds to minutes
        fraction /= 60.0

        # "or 1" avoids the division by zero if no cost was accumulated at all
        return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)

    # Implements functions (3), (6)
    def selectNextLandingIndex(self, preceedingIndex : int):
        """Pick the next inbound by a weighted random draw.

        preceedingIndex is the index of the inbound that was sequenced last.
        Returns the index of a not yet selected inbound or None if every
        inbound is already selected.

        NOTE(review): both branches below end in the same weighted draw; the
        normalization of the roulette branch only rescales the weights and
        does not change the selection probabilities. A greedy arg-max
        selection is not implemented - confirm against the paper whether
        this is intended.
        """
        q = float(random.randint(0, 100)) / 100

        # evaluate the heuristic exactly once per remaining inbound
        candidates = [i for i, selected in enumerate(self.InboundSelected) if not selected]
        weights = [self.heuristicInformation(preceedingIndex, i) for i in candidates]

        if q > self.Configuration.PseudoRandomSelectionRate:
            # roulette selection strategy
            pheromoneScale = sum(weights, 0.0)

            # fallback
            if 0.0 >= pheromoneScale:
                pheromoneScale = 1.0

            weights = [weight / pheromoneScale for weight in weights]

        total = sum(weights)
        draw = random.random()
        if total > 0.0:
            position = bisect.bisect(list(itertools.accumulate(weights)), draw * total)
        else:
            # the weights carry no information -> uniform choice with the same draw
            position = int(draw * len(candidates))

        if not candidates:
            return None

        # round-off between the accumulated weights and the total could move the
        # position behind the last candidate, which must not end the sequence
        return candidates[min(position, len(candidates) - 1)]

    def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime) -> timedelta:
        """Assign runway, STAR and arrival time to `inbound` and return its delay.

        The inbound is modified in place and registered in
        RunwayManager.RunwayInbounds under the name of the selected runway.
        The planned arrival time is never earlier than earliestArrivalTime.
        Returns the difference between planned and initial arrival time,
        but never a negative duration.
        """
        # prepare the statistics
        rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
        eta = max(earliestArrivalTime, eta)
        candidate = inbound.ArrivalCandidates[rwy.Name]

        inbound.PlannedRunway = rwy
        inbound.PlannedStar = candidate.Star
        inbound.PlannedArrivalTime = eta
        inbound.InitialArrivalTime = candidate.InitialArrivalTime
        self.RunwayManager.RunwayInbounds[rwy.Name] = inbound

        delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
        if 0.0 < delay.total_seconds():
            return delay
        return timedelta(seconds = 0)

    def findSolution(self, first : int) -> None:
        """Build a complete landing sequence that starts with inbound `first`.

        The result is stored in Sequence (inbound indices in landing order)
        and SequenceDelay (sum of the delays of all sequenced inbounds).
        Every transition applies the local pheromone update to
        PheromoneMatrix in place. If not all inbounds could be sequenced,
        Sequence and SequenceDelay are set to None.
        """
        configuration = self.Configuration

        # select the first inbound
        self.Sequence = [ first ]
        self.InboundSelected[first] = True
        self.SequenceDelay += self.associateInbound(configuration.Inbounds[first], configuration.EarliestArrivalTime)

        while True:
            previous = self.Sequence[-1]
            index = self.selectNextLandingIndex(previous)
            if index is None:
                break

            self.InboundSelected[index] = True
            self.SequenceDelay += self.associateInbound(configuration.Inbounds[index], configuration.EarliestArrivalTime)
            self.Sequence.append(index)

            # update the local pheromone, ThetaZero is the lower bound
            update = (1.0 - configuration.propagationRatio) * self.PheromoneMatrix[previous, index]
            update += configuration.propagationRatio * configuration.ThetaZero
            self.PheromoneMatrix[previous, index] = max(configuration.ThetaZero, update)

        # validate that nothing went wrong
        if len(self.Sequence) != len(configuration.Inbounds):
            self.SequenceDelay = None
            self.Sequence = None