123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- #!/usr/bin/env python
- from datetime import datetime, timedelta
- import math
- import numpy as np
- import random
- import bisect
- import itertools
- from aman.sys.aco.Configuration import Configuration
- from aman.sys.aco.RunwayManager import RunwayManager
- from aman.types.Inbound import Inbound
class Ant:
    """A single ant that builds one landing sequence for the configured inbounds.

    This class implements a single ant of the following paper:
    https://doi.org/10.1109/CEC.2019.8790135

    After ``findSolution`` has run, ``Sequence`` holds the inbound indices in
    landing order and ``SequenceDelay`` the accumulated delay; both are set to
    ``None`` when no complete sequence could be built.
    """

    def __init__(self, pheromoneTable: np.ndarray, configuration: "Configuration"):
        """Prepare an ant with no inbound scheduled yet.

        Args:
            pheromoneTable: Pheromone matrix indexed as ``[preceding, current]``.
                It is stored by reference, so the local pheromone updates done
                in ``findSolution`` modify the caller's table.
            configuration: Configuration providing the inbounds and the
                algorithm parameters read by the other methods.
        """
        self.Configuration = configuration
        self.RunwayManager = RunwayManager(self.Configuration)
        # one flag per inbound, True as soon as the inbound is part of the sequence
        self.InboundSelected = [False] * len(self.Configuration.Inbounds)
        self.PheromoneMatrix = pheromoneTable
        self.SequenceDelay = timedelta(seconds=0)
        self.Sequence = None

    def heuristicInformation(self, preceeding: int, current: int) -> float:
        """Return the pheromone-weighted attractiveness of landing ``current`` next.

        Implements function (5) of the paper with an adapted heuristic:
        - calculates the unused runway time (time between two consecutive landings)
        - calculates a ratio between the inbound delay and the unused runway time
        - adds the current overall sequence delay to the heuristic function

        Args:
            preceeding: Index of the inbound that was scheduled last.
            current: Index of the candidate inbound.

        Returns:
            ``pheromone[preceeding, current] * (1 / fraction) ** Beta`` where
            ``fraction`` is the mixed cost described above, in minutes.
        """
        inbound = self.Configuration.Inbounds[current]
        rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(
            inbound, True, self.Configuration.EarliestArrivalTime
        )

        # an inbound that arrives earlier than initially planned counts as "no delay"
        inboundDelay = max(
            eta - inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime,
            timedelta(seconds=0),
        )

        # mix the unused runway time and the delay of the single aircraft
        occupationRatio = self.Configuration.RunwayOccupasionRatio
        fraction = occupationRatio * unusedRunwayTime.total_seconds()
        fraction += (1.0 - occupationRatio) * inboundDelay.total_seconds()
        fraction += self.SequenceDelay.total_seconds()
        fraction /= 60.0

        # "fraction or 1" avoids a division by zero for a cost of exactly zero
        return self.PheromoneMatrix[preceeding, current] * (
            (1.0 / (fraction or 1)) ** self.Configuration.Beta
        )

    def selectNextLandingIndex(self, preceedingIndex: int):
        """Pick the next inbound to land by a weighted random draw.

        Implements functions (3), (6) of the paper.

        Args:
            preceedingIndex: Index of the inbound that was scheduled last.

        Returns:
            The index of a not yet selected inbound, or ``None`` when every
            inbound is already part of the sequence.
        """
        candidates = [
            index for index, selected in enumerate(self.InboundSelected) if not selected
        ]
        if not candidates:
            return None

        # Evaluate the heuristic once per candidate.
        # NOTE(review): assumes heuristicInformation has no side effects - confirm
        # against RunwayManager.selectArrivalRunway.
        scores = [self.heuristicInformation(preceedingIndex, index) for index in candidates]

        q = float(random.randint(0, 100)) / 100
        if q <= self.Configuration.PseudoRandomSelectionRate:
            weights = scores
        else:
            # roulette selection strategy
            pheromoneScale = sum(scores)
            # fallback
            if 0.0 >= pheromoneScale:
                pheromoneScale = 1.0
            weights = [score / pheromoneScale for score in scores]
        # NOTE(review): the draw below is proportional to the weights in both
        # branches, so both branches currently yield the same selection
        # probabilities - confirm whether the first branch should pick the maximum.

        total = sum(weights)
        if not total > 0.0:
            # no usable weights (all zero, negative sum or NaN): every candidate
            # is equally (un)attractive, so choose uniformly instead of giving up
            return random.choice(candidates)

        cumulative = list(itertools.accumulate(weights))
        position = bisect.bisect(cumulative, random.random() * total)

        # Rounding differences between sum() and accumulate() may push the
        # position one past the end; clamp it to the last candidate.
        return candidates[min(position, len(candidates) - 1)]

    def associateInbound(self, inbound: "Inbound", earliestArrivalTime: datetime) -> timedelta:
        """Assign runway, STAR and arrival times to ``inbound`` and register it.

        The planning attributes of ``inbound`` are overwritten and the inbound
        is stored in the runway manager as the inbound of its runway.

        Args:
            inbound: The inbound to schedule.
            earliestArrivalTime: Lower bound for the planned arrival time.

        Returns:
            The delay between planned and initial arrival time, never negative.
        """
        rwy, eta, _ = self.RunwayManager.selectArrivalRunway(
            inbound, True, self.Configuration.EarliestArrivalTime
        )
        candidate = inbound.ArrivalCandidates[rwy.Name]

        inbound.PlannedRunway = rwy
        inbound.PlannedStar = candidate.Star
        inbound.PlannedArrivalTime = max(earliestArrivalTime, eta)
        inbound.InitialArrivalTime = candidate.InitialArrivalTime
        self.RunwayManager.RunwayInbounds[rwy.Name] = inbound

        delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
        return max(delay, timedelta(seconds=0))

    def findSolution(self, first: int) -> None:
        """Build a complete landing sequence that starts with inbound ``first``.

        The result is stored in ``Sequence`` and ``SequenceDelay``. After every
        added inbound the pheromone of the edge that was just used is updated
        in ``PheromoneMatrix``. If not every inbound ends up in the sequence,
        ``Sequence`` and ``SequenceDelay`` are reset to ``None``.

        Args:
            first: Index of the inbound that lands first.
        """
        inbounds = self.Configuration.Inbounds
        earliest = self.Configuration.EarliestArrivalTime

        # select the first inbound
        self.Sequence = [first]
        self.InboundSelected[first] = True
        self.SequenceDelay += self.associateInbound(inbounds[first], earliest)

        while True:
            index = self.selectNextLandingIndex(self.Sequence[-1])
            if index is None:
                break

            self.InboundSelected[index] = True
            self.SequenceDelay += self.associateInbound(inbounds[index], earliest)
            self.Sequence.append(index)

            # update the local pheromone of the edge that was just used,
            # never dropping below ThetaZero
            previous, latest = self.Sequence[-2], self.Sequence[-1]
            update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[previous, latest]
            update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
            self.PheromoneMatrix[previous, latest] = max(self.Configuration.ThetaZero, update)

        # validate that nothing went wrong
        if len(self.Sequence) != len(inbounds):
            self.SequenceDelay = None
            self.Sequence = None
|