123 lines
4.9 KiB
Python
123 lines
4.9 KiB
Python
#!/usr/bin/env python
|
|
|
|
from datetime import datetime, timedelta
|
|
import math
|
|
import numpy as np
|
|
import random
|
|
import bisect
|
|
import itertools
|
|
|
|
from aman.sys.aco.Configuration import Configuration
|
|
from aman.sys.aco.RunwayManager import RunwayManager
|
|
from aman.sys.aco.Node import Node
|
|
|
|
# This class implements a single ant of the following paper:
|
|
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
|
|
class Ant:
    """One ant that builds a landing sequence over the given nodes.

    The ant repeatedly picks the next not-yet-sequenced inbound, registers it
    at the runway manager, accumulates the resulting delay and applies the
    local pheromone update on the shared pheromone matrix.
    """

    def __init__(self, pheromoneTable: np.array, configuration: "Configuration", nodes):
        """Prepare an ant without any sequenced inbound.

        Args:
            pheromoneTable: Pheromone matrix indexed as [previous, next]; it is
                stored by reference and modified in place by findSolution().
            configuration: ACO configuration; this class reads
                EarliestArrivalTime, Beta, PseudoRandomSelectionRate,
                PropagationRatio and ThetaZero from it.
            nodes: Indexable collection of the nodes to sequence.
        """
        self.Configuration = configuration
        self.Nodes = nodes
        # every ant owns its own runway manager
        self.RunwayManager = RunwayManager(self.Configuration)
        # InboundSelected[i] is True as soon as node i is part of the sequence
        self.InboundSelected = [False] * len(self.Nodes)
        # not used inside this class; kept because it is a public attribute
        self.InboundScore = np.zeros([len(self.Nodes), 1])
        self.PheromoneMatrix = pheromoneTable
        # accumulated delay of the sequence, None if no valid sequence was found
        self.SequenceDelay = timedelta(seconds=0)
        # node indices in landing order, None before/after an unsuccessful run
        self.Sequence = None

    # Implements function (5)
    def heuristicInformation(self, current: int):
        """Return the heuristic weight of the node with index `current`.

        The weight is (1 / delay in minutes) ** Beta, where the delay is the
        difference between the ETA reported by the runway manager and the
        inbound's initial arrival time. A delay of exactly zero is replaced
        by 1 to avoid a division by zero.

        Returns:
            The weight, or -1.0 if the runway manager did not provide an ETA.
        """
        _, _, _, eta, _ = self.RunwayManager.selectArrivalRunway(
            self.Nodes[current], self.Configuration.EarliestArrivalTime
        )
        if eta is None:
            return -1.0

        inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime
        delayMinutes = inboundDelay.total_seconds() / 60.0

        # NOTE(review): a negative delay yields a negative weight (or a complex
        # number for a fractional Beta) and a delay of exactly -1 minute with
        # an odd Beta collides with the -1.0 error value - confirm that the
        # ETA can never be earlier than the initial arrival time.
        return (1.0 / (delayMinutes or 1)) ** self.Configuration.Beta

    # Implements functions (3), (6)
    def selectNextLandingIndex(self):
        """Draw the index of the next inbound that joins the sequence.

        All not-yet-selected nodes are candidates and one of them is drawn
        with a probability proportional to its heuristic weight.

        Returns:
            The node index, or None if no candidate is left or the runway
            selection failed for at least one candidate.
        """
        candidates = [index for index, selected in enumerate(self.InboundSelected) if not selected]
        if not candidates:
            return None

        q = float(random.randint(0, 100)) / 100

        # evaluate every candidate exactly once
        heuristics = [self.heuristicInformation(index) for index in candidates]

        # something was wrong in the runway selection
        # (checked on the raw values: after the normalization below the error
        # value -1.0 would not be recognizable anymore)
        if -1.0 in heuristics:
            return None

        if q <= self.Configuration.PseudoRandomSelectionRate:
            weights = heuristics
        else:
            # roulette selection strategy
            pheromoneScale = sum(heuristics) or 1
            weights = [heuristic / pheromoneScale for heuristic in heuristics]

        cumdist = list(itertools.accumulate(weights))
        position = bisect.bisect(cumdist, random.random() * sum(weights))

        # floating-point rounding may place the draw behind the last bucket;
        # clamp it instead of reporting a failure although candidates exist
        return candidates[min(position, len(candidates) - 1)]

    def associateInbound(self, node: "Node", earliestArrivalTime: datetime):
        """Plan `node` on the runway chosen by the runway manager.

        Writes the planned runway, STAR, arrival time, arrival route and the
        initial arrival time into node.Inbound and registers the node at the
        runway manager.

        Args:
            node: The node to plan.
            earliestArrivalTime: Lower bound of the planned arrival time.

        Returns:
            Tuple of (delay, runway); the delay is never negative.
        """
        # prepare the statistics
        # NOTE(review): the runway manager is queried with the configured
        # earliest arrival time, the parameter is only used as lower bound.
        _, _, rwy, eta, _ = self.RunwayManager.selectArrivalRunway(
            node, self.Configuration.EarliestArrivalTime
        )
        # NOTE(review): max() raises a TypeError if no ETA (None) was returned
        eta = max(earliestArrivalTime, eta)

        candidate = node.ArrivalCandidates[rwy.Name]
        node.Inbound.PlannedRunway = rwy
        node.Inbound.PlannedStar = candidate.Star
        node.Inbound.PlannedArrivalTime = eta
        node.Inbound.PlannedArrivalRoute = candidate.ArrivalRoute
        node.Inbound.InitialArrivalTime = candidate.InitialArrivalTime
        self.RunwayManager.registerNode(node, rwy.Name)

        delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
        # early arrivals do not count as negative delay
        return max(delay, timedelta(seconds=0)), rwy

    def findSolution(self, first: int):
        """Build a complete sequence that starts with the node `first`.

        On success self.Sequence holds the node indices in landing order and
        self.SequenceDelay the accumulated delay. If not every node could be
        sequenced, both are set to None.
        """
        self.Sequence = []

        # select the first inbound
        self.InboundSelected[first] = True
        delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
        self.Sequence.append(first)
        self.SequenceDelay += delay

        while True:
            index = self.selectNextLandingIndex()
            if index is None:
                break

            self.InboundSelected[index] = True
            delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
            self.SequenceDelay += delay
            self.Sequence.append(index)

            # update the local pheromone on the edge that was just used;
            # the value never drops below ThetaZero
            previous, current = self.Sequence[-2], self.Sequence[-1]
            ratio = self.Configuration.PropagationRatio
            thetaZero = self.Configuration.ThetaZero
            update = (1.0 - ratio) * self.PheromoneMatrix[previous, current] + ratio * thetaZero
            self.PheromoneMatrix[previous, current] = max(thetaZero, update)

        # validate that nothing went wrong
        if len(self.Sequence) != len(self.Nodes):
            self.SequenceDelay = None
            self.Sequence = None
|