Files
aman-sys/aman/sys/aco/Colony.py
2021-11-29 17:56:28 +01:00

132 lines
6.0 KiB
Python

#!/usr/bin/env python
from datetime import datetime as dt
from datetime import datetime, timedelta
import numpy as np
import pytz
import random
import sys
import time
from aman.sys.aco.Ant import Ant
from aman.sys.aco.Configuration import Configuration
from aman.sys.aco.Node import Node
from aman.sys.aco.RunwayManager import RunwayManager
from aman.types.Inbound import Inbound
# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony:
def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime, useITA : bool):
rwy, eta, _ = rwyManager.selectArrivalRunway(node, useITA, earliestArrivalTime)
eta = max(earliestArrivalTime, eta)
node.Inbound.PlannedRunway = rwy
node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
node.Inbound.PlannedArrivalTime = eta
node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
rwyManager.RunwayInbounds[rwy.Name] = node
def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
overallDelay = timedelta(seconds = 0)
# assume that the nodes are sorted in FCFS order
for node in nodes:
Colony.associateInbound(rwyManager, node, earliestArrivalTime, False)
overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
return overallDelay
def __init__(self, inbounds, configuration : Configuration):
self.Configuration = configuration
self.ResultDelay = None
self.Result = None
self.Nodes = []
# create the new planning instances
currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
for inbound in inbounds:
self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.NavData, self.Configuration.RunwayConstraints))
rwyManager = RunwayManager(self.Configuration)
delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
self.FcfsDelay = delay
# run the optimization in every cycle to ensure optimal spacings based on TTG
if 0.0 >= delay.total_seconds():
delay = timedelta(seconds = 1.0)
# initial value for the optimization
self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
def optimize(self):
# FCFS is the best solution
if None != self.Result:
return
# define the tracking variables
bestSequence = None
# run the optimization loops
for _ in range(0, self.Configuration.ExplorationRuns):
# select the first inbound
index = random.randint(1, len(self.Nodes)) - 1
candidates = []
for _ in range(0, self.Configuration.AntCount):
# let the ant find a solution
ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
ant.findSolution(index)
# fallback to check if findSolution was successful
if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
sys.stderr.write('Invalid ANT run detected!')
sys.exit(-1)
candidates.append(
[
ant.SequenceDelay,
ant.Sequence,
ant.SequenceScore,
ant.SequenceDelay.total_seconds() / ant.SequenceScore
]
)
# find the best solution in all candidates of this generation
bestCandidate = None
for candidate in candidates:
if None == bestCandidate or candidate[3] < bestCandidate[3]:
bestCandidate = candidate
dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
for i in range(1, len(candidate[1])):
update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
# check if we find a new best candidate
if None != bestCandidate:
if None == bestSequence or bestCandidate[0] < bestSequence[0]:
bestSequence = bestCandidate
# create the final sequence
if None != bestSequence:
# create the resulting sequence
self.ResultDelay = bestSequence[0]
self.Result = []
# finalize the sequence
rwyManager = RunwayManager(self.Configuration)
for i in range(0, len(bestSequence[1])):
self.Result.append(self.Nodes[bestSequence[1][i]])
Colony.associateInbound(rwyManager, self.Nodes[bestSequence[1][i]], self.Configuration.EarliestArrivalTime, True)
reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
current.PTA = prev.PTA + (current.ETA - prev.ETA)