implement the first version of the ACO algorithm

This commit is contained in:
Sven Czarnian
2021-10-17 17:12:26 +02:00
parent a468f1cc53
commit 061eb7eac6
2 changed files with 226 additions and 5 deletions

View File

@@ -1,7 +1,123 @@
#!/usr/bin/env python #!/usr/bin/env python
from aman.sys.aco.Colony import Colony from datetime import datetime, timedelta
import math
import numpy as np
import random
import bisect
import itertools
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
# This class implements a single ant of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Ant: class Ant:
def __init__(self, colony : Colony): def __init__(self, pheromoneTable : np.array, configuration : Configuration):
self.Colony = colony self.Configuration = configuration
self.RunwayManager = RunwayManager(self.Configuration)
self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
self.PheromoneMatrix = pheromoneTable
self.SequenceDelay = timedelta(seconds = 0)
self.Sequence = None
# Implements function (5), but adapted to the following logic:
# An adaption of the heuristic function is used:
# - Calculates the unused runway time (time between two consecutive landings)
# - Calculates a ratio between the inbound delay and the unused runway time
# - Adds the current overal sequence delay to the heuristic function
def heuristicInformation(self, preceeding : int, current : int):
rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
if 0.0 > inboundDelay.total_seconds():
inboundDelay = timedelta(seconds = 0)
# calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
fraction += self.SequenceDelay.total_seconds()
fraction /= 60.0
return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
# Implements functions (3), (6)
def selectNextLandingIndex(self, preceedingIndex : int):
q = float(random.randint(0, 100)) / 100
weights = []
if q <= self.Configuration.PseudoRandomSelectionRate:
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(preceedingIndex, i))
else:
# roulette selection strategy
pheromoneScale = 0.0
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
pheromoneScale += self.heuristicInformation(preceedingIndex, i)
# fallback
if 0.0 >= pheromoneScale:
pheromoneScale = 1.0
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(preceedingIndex, i) / pheromoneScale)
total = sum(weights)
cumdist = list(itertools.accumulate(weights)) + [total]
candidateIndex = bisect.bisect(cumdist, random.random() * total)
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
if 0 == candidateIndex:
return i
else:
candidateIndex -= 1
return None
def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime):
# prepare the statistics
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
eta = max(earliestArrivalTime, eta)
inbound.PlannedRunway = rwy
inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
inbound.PlannedArrivalTime = eta
inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
self.RunwayManager.RunwayInbounds[rwy.Name] = inbound
delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
if 0.0 < delay.total_seconds():
return delay
else:
return timedelta(seconds = 0)
def findSolution(self, first : int):
self.Sequence = []
# select the first inbound
self.InboundSelected[first] = True
self.Sequence.append(first)
self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
while True:
index = self.selectNextLandingIndex(self.Sequence[-1])
if None == index:
break
self.InboundSelected[index] = True
self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
self.Sequence.append(index)
# update the local pheromone
update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
# validate that nothing went wrong
if len(self.Sequence) != len(self.Configuration.Inbounds):
self.SequenceDelay = None
self.Sequence = None

View File

@@ -1,12 +1,117 @@
#!/usr/bin/env python #!/usr/bin/env python
from datetime import datetime, timedelta
import numpy as np import numpy as np
import random
import sys
import pytz
from aman.sys.aco.Ant import Ant from aman.sys.aco.Ant import Ant
from aman.sys.aco.Configuration import Configuration from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.CostFunction import CostFunction from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony: class Colony:
def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
eta = max(earliestArrivalTime, eta)
inbound.PlannedRunway = rwy
inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
inbound.PlannedArrivalTime = eta
inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
rwyManager.RunwayInbounds[rwy.Name] = inbound
def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
overallDelay = timedelta(seconds = 0)
# assume that the inbounds are sorted in FCFS order
print('FCFS-Sequence:')
tmp = datetime.now().replace(tzinfo = pytz.UTC) + timedelta(seconds = 50 * len(inbounds))
for inbound in inbounds:
for runway in inbound.ArrivalCandidates:
inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp
inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp
tmp += timedelta(seconds = 20)
Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
print(' ' + inbound.Report.aircraft.callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) +
' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds()))
return overallDelay
def __init__(self, configuration : Configuration): def __init__(self, configuration : Configuration):
self.Configuration = configuration self.Configuration = configuration
self.PheromoneMatrix = np.zeros(( len(configuration.Inbounds), len(configuration.Inbounds) ), dtype=float) self.ResultDelay = None
self.Result = None
rwyManager = RunwayManager(self.Configuration)
delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
self.FcfsDelay = delay
# check if FCFS is the ideal solution
if 0.0 >= delay.total_seconds():
self.Result = self.Configuration.Inbounds
self.ResultDelay = delay
return
# initial value for the optimization
self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
def optimize(self):
# FCFS is the best solution
if None != self.Result:
return
# define the tracking variables
bestSequence = None
# run the optimization loops
for _ in range(0, self.Configuration.ExplorationRuns):
# select the first inbound
index = random.randint(1, len(self.Configuration.Inbounds)) - 1
candidates = []
for _ in range(0, self.Configuration.AntCount):
# let the ant find a solution
ant = Ant(self.PheromoneMatrix, self.Configuration)
ant.findSolution(index)
# fallback to check if findSolution was successful
if None == ant.SequenceDelay or None == ant.Sequence:
sys.stderr.write('Invalid ANT run detected!')
sys.exit(-1)
candidates.append([ ant.SequenceDelay, ant.Sequence ])
# find the best solution in all candidates of this generation
bestCandidate = None
for candidate in candidates:
if None == bestCandidate or candidate[0] < bestCandidate[0]:
bestCandidate = candidate
dTheta = 1.0 / (candidate[0].total_seconds() / 60.0)
for i in range(1, len(candidate[1])):
update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
# check if we find a new best candidate
if None != bestCandidate:
if None == bestSequence or bestCandidate[0] < bestSequence[0]:
bestSequence = bestCandidate
print(self.PheromoneMatrix)
# create the final sequence
if None != bestSequence:
# create the resulting sequence
self.ResultDelay = bestSequence[0]
self.Result = []
# finalize the sequence
rwyManager = RunwayManager(self.Configuration)
for i in range(0, len(bestSequence[1])):
self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)