From cd5c21d099fa4a721cb0595296c4f261a10352ae Mon Sep 17 00:00:00 2001 From: Sven Czarnian Date: Thu, 16 Dec 2021 10:22:43 +0100 Subject: [PATCH] use the shortcut-path as the ITA for the optimization, but use the full path as the ITA itself --- aman/sys/aco/Ant.py | 55 +++++++++------------------------ aman/sys/aco/Colony.py | 50 +++++++++++++++++------------- aman/sys/aco/Node.py | 10 +----- aman/sys/aco/RunwayManager.py | 58 +++++++++++++---------------------- aman/types/ArrivalData.py | 30 +----------------- 5 files changed, 67 insertions(+), 136 deletions(-) diff --git a/aman/sys/aco/Ant.py b/aman/sys/aco/Ant.py index 5e7c8f1..324897c 100644 --- a/aman/sys/aco/Ant.py +++ b/aman/sys/aco/Ant.py @@ -24,55 +24,35 @@ class Ant: self.SequenceDelay = timedelta(seconds = 0) self.Sequence = None - def qualifyDelay(delay, node, runway): - if 0.0 > delay.total_seconds(): - delay = timedelta(seconds = 0) - - # calculate the heuristic scaling to punish increased delays for single inbounds - scaledDelay = delay.total_seconds() / node.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds() - return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01) - - # Implements function (5), but adapted to the following logic: - # An adaption of the heuristic function is used: - # - Calculates the unused runway time (time between two consecutive landings) - # - Calculates a ratio between the inbound delay and the unused runway time - # - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time - def heuristicInformation(self, preceeding : int, current : int): - rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Nodes[current], True, self.Configuration.EarliestArrivalTime) - inboundDelay = eta - self.Nodes[current].ArrivalCandidates[rwy.Name].InitialArrivalTime - if 0.0 > inboundDelay.total_seconds(): - inboundDelay = timedelta(seconds = 0) + # Implements function (5) + def heuristicInformation(self, current : int): + _, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime) + inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts - fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds() - fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds() - fraction += self.SequenceDelay.total_seconds() - fraction /= 60.0 - - # calculate the heuristic scaling to punish increased delays for single inbounds - weight = Ant.qualifyDelay(inboundDelay, self.Nodes[current], rwy) - - return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta) + heuristic = inboundDelay.total_seconds() / 60.0 + heuristic = (1.0 / (heuristic or 1)) ** self.Configuration.Beta + return heuristic # Implements functions (3), (6) - def selectNextLandingIndex(self, preceedingIndex : int): + def selectNextLandingIndex(self): q = float(random.randint(0, 100)) / 100 weights = [] if q <= self.Configuration.PseudoRandomSelectionRate: for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: - weights.append(self.heuristicInformation(preceedingIndex, i)) + weights.append(self.heuristicInformation(i)) else: # roulette selection strategy pheromoneScale = 0.0 for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: - pheromoneScale += self.heuristicInformation(preceedingIndex, i) + pheromoneScale += self.heuristicInformation(i) for i in range(0, len(self.InboundSelected)): if False == self.InboundSelected[i]: - weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1)) + weights.append(self.heuristicInformation(i) / (pheromoneScale or 1)) total = sum(weights) cumdist = list(itertools.accumulate(weights)) + [total] @@ -89,7 +69,7 @@ class Ant: def associateInbound(self, node : Node, earliestArrivalTime : datetime): # prepare the statistics - rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, True, self.Configuration.EarliestArrivalTime) + rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime) eta = max(earliestArrivalTime, eta) node.Inbound.PlannedRunway = rwy @@ -110,20 +90,18 @@ class Ant: # select the first inbound self.InboundSelected[first] = True - delay, rwy = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime) - self.InboundScore[0] = Ant.qualifyDelay(delay, self.Nodes[first], rwy) + delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime) self.Sequence.append(first) self.SequenceDelay += delay while 1: - index = self.selectNextLandingIndex(self.Sequence[-1]) + index = self.selectNextLandingIndex() if None == index: break self.InboundSelected[index] = True - delay, rwy = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime) + delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime) self.SequenceDelay += delay - self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Nodes[index], rwy) self.Sequence.append(index) # update the local pheromone @@ -134,7 +112,4 @@ class Ant: # validate that nothing went wrong if len(self.Sequence) != len(self.Nodes): self.SequenceDelay = None - self.SequenceScore = None self.Sequence = None - else: - self.SequenceScore = np.median(self.InboundScore) diff --git a/aman/sys/aco/Colony.py b/aman/sys/aco/Colony.py index c957b3e..b3526f6 100644 --- a/aman/sys/aco/Colony.py +++ b/aman/sys/aco/Colony.py @@ -17,8 +17,8 @@ from aman.types.Inbound import Inbound # This class implements the ant colony of the following paper: # https://sci-hub.mksa.top/10.1109/cec.2019.8790135 class Colony: - def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime, useITA : bool): - rwy, eta, _ = rwyManager.selectArrivalRunway(node, useITA, earliestArrivalTime) + def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime): + rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime) eta = max(earliestArrivalTime, eta) node.Inbound.PlannedRunway = rwy @@ -34,7 +34,7 @@ class Colony: # assume that the nodes are sorted in FCFS order for node in nodes: - Colony.associateInbound(rwyManager, node, earliestArrivalTime, False) + Colony.associateInbound(rwyManager, node, earliestArrivalTime) overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime return overallDelay @@ -62,6 +62,17 @@ class Colony: self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0)) self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero + def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node): + self.Result.append(node) + Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime) + + reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime + self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta + for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)): + prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1] + current = self.Result[-1].Inbound.PlannedArrivalRoute[i] + current.PTA = prev.PTA + (current.ETA - prev.ETA) + def optimize(self): # FCFS is the best solution if None != self.Result: @@ -82,23 +93,21 @@ class Colony: ant.findSolution(index) # fallback to check if findSolution was successful - if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore: + if None == ant.SequenceDelay or None == ant.Sequence: sys.stderr.write('Invalid ANT run detected!') sys.exit(-1) candidates.append( [ ant.SequenceDelay, - ant.Sequence, - ant.SequenceScore, - ant.SequenceDelay.total_seconds() / ant.SequenceScore + ant.Sequence ] ) # find the best solution in all candidates of this generation bestCandidate = None for candidate in candidates: - if None == bestCandidate or candidate[3] < bestCandidate[3]: + if None == bestCandidate or candidate[0] < bestCandidate[0]: bestCandidate = candidate dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0) @@ -112,20 +121,19 @@ class Colony: bestSequence = bestCandidate # create the final sequence - if None != bestSequence: + self.Result = [] + rwyManager = RunwayManager(self.Configuration) + + # use the optimized sequence + if None != bestSequence and self.FcfsDelay >= bestSequence[0]: # create the resulting sequence self.ResultDelay = bestSequence[0] - self.Result = [] # finalize the sequence - rwyManager = RunwayManager(self.Configuration) - for i in range(0, len(bestSequence[1])): - self.Result.append(self.Nodes[bestSequence[1][i]]) - Colony.associateInbound(rwyManager, self.Nodes[bestSequence[1][i]], self.Configuration.EarliestArrivalTime, True) - - reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime - self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta - for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)): - prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1] - current = self.Result[-1].Inbound.PlannedArrivalRoute[i] - current.PTA = prev.PTA + (current.ETA - prev.ETA) + for idx in bestSequence[1]: + self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx]) + # use the FCFS sequence + else: + self.ResultDelay = self.FcfsDelay + for node in self.Nodes: + self.sequenceAndPredictInbound(node) diff --git a/aman/sys/aco/Node.py b/aman/sys/aco/Node.py index 329cc2f..49ffbb2 100644 --- a/aman/sys/aco/Node.py +++ b/aman/sys/aco/Node.py @@ -193,11 +193,6 @@ class Node: if None != star: flightTime, trackmiles, arrivalRoute, flightTimeOnStar = self.arrivalEstimation(identifier.Runway, star, weatherModel) - # calculate average speed gain - avgSpeed = trackmiles / (flightTime.total_seconds() / 3600.0) - avgSpeedDecrease = avgSpeed * 0.80 - decreasedSpeedFlighttime = (trackmiles / avgSpeedDecrease) * 3600.0 # given in seconds - # use the the distance to the IAF for optimizations timeUntilIAF = flightTime - flightTimeOnStar if 0.0 > timeUntilIAF.total_seconds(): @@ -213,13 +208,10 @@ class Node: ttg = timedelta(seconds = timeUntilIAF.total_seconds() * ttgRatio) if (ttg.total_seconds() > ttgMax): ttg = timedelta(seconds = ttgMax) - ttl = timedelta(seconds = decreasedSpeedFlighttime - flightTime.total_seconds()) ita = self.Inbound.ReportTime + flightTime earliest = ita - ttg - latest = ita + ttl - self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest, - ttl = ttl, latest = latest, route = arrivalRoute, + self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(star = star, ita = earliest, route = arrivalRoute, trackmiles = trackmiles) ita = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime diff --git a/aman/sys/aco/RunwayManager.py b/aman/sys/aco/RunwayManager.py index 7d4cf03..f7f21ab 100644 --- a/aman/sys/aco/RunwayManager.py +++ b/aman/sys/aco/RunwayManager.py @@ -21,7 +21,7 @@ class RunwayManager: if not runway.Runway.Name in self.RunwayInbounds: self.RunwayInbounds[runway.Runway.Name] = None - def calculateEarliestArrivalTime(self, runway : str, node : Node, useETA : bool, earliestArrivalTime : datetime): + def calculateEarliestArrivalTime(self, runway : str, node : Node, earliestArrivalTime : datetime): constrainedETA = None if None != self.RunwayInbounds[runway]: @@ -42,40 +42,29 @@ class RunwayManager: if None == constrainedETA or candidate > constrainedETA: constrainedETA = candidate - # get the arrival time on the runway of the inbound - if True == useETA: - arrivalTime = node.ArrivalCandidates[runway].EarliestArrivalTime - else: - arrivalTime = node.ArrivalCandidates[runway].InitialArrivalTime - if None == constrainedETA: - eta = max(arrivalTime, earliestArrivalTime) + eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, earliestArrivalTime) else: - eta = max(arrivalTime, max(constrainedETA, earliestArrivalTime)) + eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, max(constrainedETA, earliestArrivalTime)) - return eta, eta - arrivalTime + return eta, eta - node.ArrivalCandidates[runway].InitialArrivalTime - def selectShallShouldMayArrivalRunway(self, node : Node, runways, useETA : bool, earliestArrivalTime : datetime): + def selectShallShouldMayArrivalRunway(self, node : Node, runways, earliestArrivalTime : datetime): candidate = None delay = None for runway in runways: - if True == useETA: - reference = node.ArrivalCandidates[runway.Runway.Name].EarliestArrivalTime - else: - reference = node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime - - eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime) + eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if None == delay: - delay = eta - reference + delay = eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime candidate = runway - elif delay > (eta - reference): - delay = eta- reference + elif delay > (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime): + delay = eta- node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime candidate = runway return candidate - def executeShallShouldMayAssignment(self, node : Node, useETA : bool, earliestArrivalTime : datetime): + def executeShallShouldMayAssignment(self, node : Node, earliestArrivalTime : datetime): shallRunways = [] shouldRunways = [] mayRunways = [] @@ -97,46 +86,41 @@ class RunwayManager: if node.Inbound.Report.plannedGate in runway.ShouldAssignments[RunwayAssignmentType.GateAssignment]: shouldRunways.append(runway) - if True == useETA: - reference = node.ArrivalCandidates[runway.Runway.Name].EarliestArrivalTime - else: - reference = node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime - # test the may assignments if RunwayAssignmentType.AircraftType in runway.MayAssignments: if node.Inbound.Report.aircraft.type in runway.MayAssignments[RunwayAssignmentType.AircraftType]: - eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime) - if (eta - reference) <= self.Configuration.AirportConfiguration.MaxDelayMay: + eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) + if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay: mayRunways.append(runway) if RunwayAssignmentType.GateAssignment in runway.MayAssignments: if node.Inbound.Report.plannedGate in runway.MayAssignments[RunwayAssignmentType.GateAssignment]: - eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime) - if (eta - reference) <= self.Configuration.AirportConfiguration.MaxDelayMay: + eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) + if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay: mayRunways.append(runway) - runway = self.selectShallShouldMayArrivalRunway(node, shallRunways, useETA, earliestArrivalTime) + runway = self.selectShallShouldMayArrivalRunway(node, shallRunways, earliestArrivalTime) if None != runway: return [ runway ] - runway = self.selectShallShouldMayArrivalRunway(node, shouldRunways, useETA, earliestArrivalTime) + runway = self.selectShallShouldMayArrivalRunway(node, shouldRunways, earliestArrivalTime) if None != runway: return [ runway ] - runway = self.selectShallShouldMayArrivalRunway(node, mayRunways, useETA, earliestArrivalTime) + runway = self.selectShallShouldMayArrivalRunway(node, mayRunways, earliestArrivalTime) if None != runway: return [ runway ] return self.Configuration.RunwayConstraints.ActiveArrivalRunways - def selectArrivalRunway(self, node : Node, useETA : bool, earliestArrivalTime : datetime): + def selectArrivalRunway(self, node : Node, earliestArrivalTime : datetime): availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways if True == self.Configuration.RunwayConstraints.UseShallShouldMay: - availableRunways = self.executeShallShouldMayAssignment(node, useETA, earliestArrivalTime) + availableRunways = self.executeShallShouldMayAssignment(node, earliestArrivalTime) else: availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways if 0 == len(availableRunways): runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0] - return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime) + return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) # start with the beginning selectedRunway = None @@ -145,7 +129,7 @@ class RunwayManager: # get the runway with the earliest ETA for runway in availableRunways: - candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime) + candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime) if None == eta or eta > candidate: selectedRunway = runway.Runway lostTime = delta diff --git a/aman/types/ArrivalData.py b/aman/types/ArrivalData.py index 8ff7e47..7afedfa 100644 --- a/aman/types/ArrivalData.py +++ b/aman/types/ArrivalData.py @@ -7,30 +7,12 @@ from aman.types.ArrivalRoute import ArrivalRoute class ArrivalData: def __init__(self, **kargs): self.Star = None - self.MaximumTimeToGain = None - self.MaximumTimeToLose = None self.InitialArrivalTime = None - self.EarliestArrivalTime = None - self.LatestArrivalTime = None self.ArrivalRoute = None self.Trackmiles = None for key, value in kargs.items(): - if 'ttg' == key: - if True == isinstance(value, timedelta): - self.MaximumTimeToGain = value - elif True == isinstance(value, (int, float)): - self.MaximumTimeToGain = timedelta(seconds = float(value)) - else: - raise Exception('Invalid type for ttg') - elif 'ttl' == key: - if True == isinstance(value, timedelta): - self.MaximumTimeToLose = value - elif True == isinstance(value, (int, float)): - self.MaximumTimeToLose = timedelta(seconds = float(value)) - else: - raise Exception('Invalid type for ttl') - elif 'star' == key: + if 'star' == key: if True == isinstance(value, ArrivalRoute): self.Star = value else: @@ -40,16 +22,6 @@ class ArrivalData: self.InitialArrivalTime = value else: raise Exception('Invalid type for ita') - elif 'earliest' == key: - if True == isinstance(value, datetime): - self.EarliestArrivalTime = value - else: - raise Exception('Invalid type for earliest') - elif 'latest' == key: - if True == isinstance(value, datetime): - self.LatestArrivalTime = value - else: - raise Exception('Invalid type for latest') elif 'route' == key: self.ArrivalRoute = value elif 'trackmiles' == key: