123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- #!/usr/bin/env python
- from datetime import datetime as dt
- from datetime import datetime, timedelta
- import numpy as np
- import pytz
- import random
- import sys
- import time
- from aman.sys.aco.Ant import Ant
- from aman.sys.aco.Configuration import Configuration
- from aman.sys.aco.Node import Node
- from aman.sys.aco.RunwayManager import RunwayManager
- from aman.types.Inbound import Inbound
- # This class implements the ant colony of the following paper:
- # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
- class Colony:
- def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime):
- type, expectedRwy, rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
- if None == eta:
- return False
- eta = max(earliestArrivalTime, eta)
- node.Inbound.PlannedRunway = rwy
- node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
- node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
- node.Inbound.PlannedArrivalTime = eta
- node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
- node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
- node.Inbound.AssignmentMode = type
- node.Inbound.ExpectedRunway = expectedRwy
- rwyManager.registerNode(node, rwy.Name)
- return True
- def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
- overallDelay = timedelta(seconds = 0)
- # assume that the nodes are sorted in FCFS order
- for node in nodes:
- if False == Colony.associateInbound(rwyManager, node, earliestArrivalTime):
- return None
- overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
- return overallDelay
- def __init__(self, inbounds, configuration : Configuration):
- self.Configuration = configuration
- self.ResultDelay = None
- self.FcfsDelay = None
- self.Result = None
- self.Nodes = []
- # create the new planning instances
- currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
- for inbound in inbounds:
- self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.AirportConfiguration, self.Configuration.RunwayConstraints))
- rwyManager = RunwayManager(self.Configuration)
- delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
- if None == delay:
- return
- self.FcfsDelay = delay
- # run the optimization in every cycle to ensure optimal spacings based on TTG
- if 0.0 >= delay.total_seconds():
- delay = timedelta(seconds = 1.0)
- # initial value for the optimization
- self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
- self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
- def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node):
- self.Result.append(node)
- Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)
- reqTimeDelta = self.Result[-1].Inbound.EnrouteArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
- self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
- for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
- prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
- current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
- current.PTA = prev.PTA + (current.ETA - prev.ETA)
- def optimize(self):
- if None == self.FcfsDelay:
- return False
- # define the tracking variables
- bestSequence = None
- # run the optimization loops
- for _ in range(0, self.Configuration.ExplorationRuns):
- # select the first inbound
- index = random.randint(1, len(self.Nodes)) - 1
- candidates = []
- for _ in range(0, self.Configuration.AntCount):
- # let the ant find a solution
- ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
- ant.findSolution(index)
- # fallback to check if findSolution was successful
- if None == ant.SequenceDelay or None == ant.Sequence:
- sys.stderr.write('Invalid ANT run detected!')
- break
- candidates.append(
- [
- ant.SequenceDelay,
- ant.Sequence
- ]
- )
- # find the best solution in all candidates of this generation
- bestCandidate = None
- for candidate in candidates:
- if None == bestCandidate or candidate[0] < bestCandidate[0]:
- bestCandidate = candidate
- if None != bestSequence:
- dTheta = 1.0 / ((bestSequence[0].total_seconds() / 60.0) or 1.0)
- for i in range(1, len(bestSequence[1])):
- update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] + self.Configuration.Epsilon * dTheta
- self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] = max(update, self.Configuration.ThetaZero)
- # check if we find a new best candidate
- if None != bestCandidate:
- if None == bestSequence or bestCandidate[0] < bestSequence[0]:
- bestSequence = bestCandidate
- # found the optimal solution
- if 1 >= bestSequence[0].total_seconds():
- break
- # create the final sequence
- self.Result = []
- rwyManager = RunwayManager(self.Configuration)
- # use the optimized sequence
- if None != bestSequence and self.FcfsDelay >= bestSequence[0]:
- # create the resulting sequence
- self.ResultDelay = bestSequence[0]
- # finalize the sequence
- for idx in bestSequence[1]:
- self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
- # use the FCFS sequence
- else:
- self.ResultDelay = self.FcfsDelay
- for node in self.Nodes:
- self.sequenceAndPredictInbound(rwyManager, node)
- return True
|