Files
aman-sys/aman/sys/aco/Ant.py

116 lines
4.8 KiB
Python

#!/usr/bin/env python
from datetime import datetime, timedelta
import math
import numpy as np
import random
import bisect
import itertools
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.RunwayManager import RunwayManager
from aman.sys.aco.Node import Node
# This class implements a single ant of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Ant:
def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes):
self.Configuration = configuration
self.Nodes = nodes
self.RunwayManager = RunwayManager(self.Configuration)
self.InboundSelected = [ False ] * len(self.Nodes)
self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
self.PheromoneMatrix = pheromoneTable
self.SequenceDelay = timedelta(seconds = 0)
self.Sequence = None
# Implements function (5)
def heuristicInformation(self, current : int):
_, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime
# calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
heuristic = inboundDelay.total_seconds() / 60.0
heuristic = (1.0 / (heuristic or 1)) ** self.Configuration.Beta
return heuristic
# Implements functions (3), (6)
def selectNextLandingIndex(self):
q = float(random.randint(0, 100)) / 100
weights = []
if q <= self.Configuration.PseudoRandomSelectionRate:
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(i))
else:
# roulette selection strategy
pheromoneScale = 0.0
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
pheromoneScale += self.heuristicInformation(i)
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(i) / (pheromoneScale or 1))
total = sum(weights)
cumdist = list(itertools.accumulate(weights)) + [total]
candidateIndex = bisect.bisect(cumdist, random.random() * total)
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
if 0 == candidateIndex:
return i
else:
candidateIndex -= 1
return None
def associateInbound(self, node : Node, earliestArrivalTime : datetime):
# prepare the statistics
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
eta = max(earliestArrivalTime, eta)
node.Inbound.PlannedRunway = rwy
node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
node.Inbound.PlannedArrivalTime = eta
node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
self.RunwayManager.RunwayInbounds[rwy.Name] = node
delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
if 0.0 < delay.total_seconds():
return delay, rwy
else:
return timedelta(seconds = 0), rwy
def findSolution(self, first : int):
self.Sequence = []
# select the first inbound
self.InboundSelected[first] = True
delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
self.Sequence.append(first)
self.SequenceDelay += delay
while 1:
index = self.selectNextLandingIndex()
if None == index:
break
self.InboundSelected[index] = True
delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
self.SequenceDelay += delay
self.Sequence.append(index)
# update the local pheromone
update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
# validate that nothing went wrong
if len(self.Sequence) != len(self.Nodes):
self.SequenceDelay = None
self.Sequence = None