Files
aman-sys/aman/sys/aco/Colony.py
2021-12-22 13:28:56 +01:00

154 lines
6.5 KiB
Python

#!/usr/bin/env python
from datetime import datetime as dt
from datetime import datetime, timedelta
import numpy as np
import pytz
import random
import sys
import time
from aman.sys.aco.Ant import Ant
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Node import Node
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony:
    """Ant colony that searches for an arrival sequence with less delay than FCFS.

    The constructor wraps every inbound into a planning ``Node`` and computes
    the first-come-first-served (FCFS) delay as the baseline. ``optimize``
    then lets ants build alternative sequences and keeps the best one if it
    is not worse than the FCFS baseline.

    Attributes:
        Configuration: the configuration passed to the constructor. Its
            ``ThetaZero`` member is overwritten by the constructor.
        Nodes: one planning node per inbound, in the order of ``inbounds``.
        FcfsDelay: accumulated delay of the FCFS sequence, or None if the
            FCFS sequence could not be planned.
        Result: list of sequenced nodes after ``optimize``; None before.
        ResultDelay: accumulated delay of ``Result``; None before ``optimize``.
        PheromoneMatrix: square numpy matrix (node count x node count), or
            None if the FCFS sequence could not be planned.
    """

    @staticmethod
    def associateInbound(rwyManager: "RunwayManager", node: "Node", earliestArrivalTime: datetime):
        """Plan the inbound of ``node`` on the runway chosen by ``rwyManager``.

        On success the planned runway, STAR, arrival route, arrival times and
        track miles are copied from the matching arrival candidate into
        ``node.Inbound`` and the node is registered at the runway manager.

        Returns:
            False if the runway manager delivers no ETA, True otherwise.
        """
        rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
        if eta is None:
            return False

        # never plan an arrival before the earliest allowed arrival time
        eta = max(earliestArrivalTime, eta)

        candidate = node.ArrivalCandidates[rwy.Name]
        node.Inbound.PlannedRunway = rwy
        node.Inbound.PlannedStar = candidate.Star
        node.Inbound.PlannedArrivalRoute = candidate.ArrivalRoute
        node.Inbound.PlannedArrivalTime = eta
        node.Inbound.InitialArrivalTime = candidate.InitialArrivalTime
        node.Inbound.PlannedTrackmiles = candidate.Trackmiles
        rwyManager.registerNode(node, rwy.Name)

        return True

    @staticmethod
    def calculateInitialCosts(rwyManager: "RunwayManager", nodes, earliestArrivalTime: datetime):
        """Plan ``nodes`` in the given order and sum up their delays.

        The delay of a node is its planned arrival time minus its initial
        arrival time.

        Returns:
            The accumulated delay as ``timedelta`` (zero for no nodes), or
            None as soon as one node cannot be associated with a runway.
        """
        overallDelay = timedelta(seconds=0)

        # assume that the nodes are sorted in FCFS order
        for node in nodes:
            if not Colony.associateInbound(rwyManager, node, earliestArrivalTime):
                return None
            overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime

        return overallDelay

    def __init__(self, inbounds, configuration: "Configuration"):
        """Create the planning nodes and the FCFS baseline for ``inbounds``.

        Args:
            inbounds: iterable of inbounds; assumed to be in FCFS order.
            configuration: optimization configuration. ``ThetaZero`` of this
                object is overwritten with the initial pheromone value.
        """
        self.Configuration = configuration
        self.ResultDelay = None
        self.FcfsDelay = None
        self.Result = None
        self.Nodes = []
        # defined up-front so that the attribute exists on every exit path
        self.PheromoneMatrix = None

        # create the new planning instances
        # (current UTC time truncated to full seconds, timezone-aware)
        currentTime = dt.fromtimestamp(int(time.time()), tz=pytz.UTC)
        for inbound in inbounds:
            self.Nodes.append(
                Node(
                    inbound,
                    currentTime,
                    self.Configuration.WeatherModel,
                    self.Configuration.AirportConfiguration,
                    self.Configuration.RunwayConstraints,
                )
            )

        rwyManager = RunwayManager(self.Configuration)
        delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
        if delay is None:
            return
        self.FcfsDelay = delay

        nodeCount = len(self.Nodes)
        if nodeCount == 0:
            # nothing to sequence: skip the ThetaZero calculation, which
            # would divide by zero, and keep an empty pheromone matrix
            self.PheromoneMatrix = np.ones((0, 0), dtype=float)
            return

        # run the optimization in every cycle to ensure optimal spacings based on TTG
        # (a non-positive delay is replaced by one second to keep ThetaZero finite)
        if delay.total_seconds() <= 0.0:
            delay = timedelta(seconds=1.0)

        # initial value for the optimization
        self.Configuration.ThetaZero = 1.0 / (nodeCount * (delay.total_seconds() / 60.0))
        self.PheromoneMatrix = np.ones((nodeCount, nodeCount), dtype=float) * self.Configuration.ThetaZero

    def sequenceAndPredictInbound(self, rwyManager: "RunwayManager", node: "Node"):
        """Append ``node`` to the result and update the PTAs along its route.

        The difference between the enroute arrival time and the planned
        arrival time is subtracted from the ETA of the first route entry; all
        following entries keep their ETA spacing to the previous entry.
        """
        self.Result.append(node)
        # the return value is intentionally not evaluated here (as before):
        # the node stays in the result even if no runway could be associated
        Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)

        inbound = node.Inbound
        reqTimeDelta = inbound.EnrouteArrivalTime - inbound.PlannedArrivalTime

        route = inbound.PlannedArrivalRoute
        route[0].PTA = route[0].ETA - reqTimeDelta
        for i in range(1, len(route)):
            prev = route[i - 1]
            current = route[i]
            current.PTA = prev.PTA + (current.ETA - prev.ETA)

    def optimize(self):
        """Run the ant colony optimization and build ``Result``.

        The best sequence found by the ants is used if its delay is not
        larger than the FCFS delay; otherwise the nodes are sequenced in
        their original (FCFS) order.

        Returns:
            False if no FCFS baseline exists (nothing is changed), True
            otherwise. ``Result`` and ``ResultDelay`` are set on success.
        """
        if self.FcfsDelay is None:
            return False

        # nothing to sequence: random.randint(1, 0) below would raise
        if not self.Nodes:
            self.Result = []
            self.ResultDelay = self.FcfsDelay
            return True

        # define the tracking variables
        bestSequence = None

        # run the optimization loops
        for _ in range(self.Configuration.ExplorationRuns):
            # select the first inbound
            index = random.randint(1, len(self.Nodes)) - 1
            candidates = []

            for _ in range(self.Configuration.AntCount):
                # let the ant find a solution
                ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
                ant.findSolution(index)

                # fallback to check if findSolution was successful
                if ant.SequenceDelay is None or ant.Sequence is None:
                    sys.stderr.write('Invalid ANT run detected!')
                    break

                candidates.append((ant.SequenceDelay, ant.Sequence))

            # find the best solution in all candidates of this generation
            # (min() keeps the first candidate on ties, like a strict '<' scan)
            bestCandidate = min(candidates, key=lambda entry: entry[0], default=None)

            # reinforce the edges of the best sequence known so far
            # NOTE(review): the flattened source does not show whether this
            # update ran once per generation or once per candidate; it is
            # applied once per generation here - confirm against the original.
            if bestSequence is not None:
                epsilon = self.Configuration.Epsilon
                dTheta = 1.0 / ((bestSequence[0].total_seconds() / 60.0) or 1.0)
                sequence = bestSequence[1]
                for i in range(1, len(sequence)):
                    edge = (sequence[i - 1], sequence[i])
                    update = (1.0 - epsilon) * self.PheromoneMatrix[edge] + epsilon * dTheta
                    # the pheromone level never drops below the initial value
                    self.PheromoneMatrix[edge] = max(update, self.Configuration.ThetaZero)

            # check if we find a new best candidate
            if bestCandidate is not None:
                if bestSequence is None or bestCandidate[0] < bestSequence[0]:
                    bestSequence = bestCandidate

            # found the optimal solution
            # (bestSequence is still None if every ant failed so far)
            if bestSequence is not None and bestSequence[0].total_seconds() <= 1:
                break

        # create the final sequence
        self.Result = []
        rwyManager = RunwayManager(self.Configuration)

        if bestSequence is not None and self.FcfsDelay >= bestSequence[0]:
            # use the optimized sequence
            self.ResultDelay = bestSequence[0]
            for idx in bestSequence[1]:
                self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
        else:
            # use the FCFS sequence
            self.ResultDelay = self.FcfsDelay
            for node in self.Nodes:
                self.sequenceAndPredictInbound(rwyManager, node)

        return True