use the shortcut-path as the ITA for the optimization, but use the full path as the ITA itself

This commit is contained in:
Sven Czarnian
2021-12-16 10:22:43 +01:00
vanhempi 6fd4f4da1b
commit cd5c21d099
5 muutettua tiedostoa jossa 67 lisäystä ja 136 poistoa

Näytä tiedosto

@@ -24,55 +24,35 @@ class Ant:
self.SequenceDelay = timedelta(seconds = 0)
self.Sequence = None
def qualifyDelay(delay, node, runway):
if 0.0 > delay.total_seconds():
delay = timedelta(seconds = 0)
# calculate the heuristic scaling to punish increased delays for single inbounds
scaledDelay = delay.total_seconds() / node.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01)
# Implements function (5), but adapted to the following logic:
# An adaption of the heuristic function is used:
# - Calculates the unused runway time (time between two consecutive landings)
# - Calculates a ratio between the inbound delay and the unused runway time
# - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time
def heuristicInformation(self, preceeding : int, current : int):
rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Nodes[current], True, self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Nodes[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
if 0.0 > inboundDelay.total_seconds():
inboundDelay = timedelta(seconds = 0)
# Implements function (5)
def heuristicInformation(self, current : int):
_, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime
# calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
fraction += self.SequenceDelay.total_seconds()
fraction /= 60.0
# calculate the heuristic scaling to punish increased delays for single inbounds
weight = Ant.qualifyDelay(inboundDelay, self.Nodes[current], rwy)
return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
heuristic = inboundDelay.total_seconds() / 60.0
heuristic = (1.0 / (heuristic or 1)) ** self.Configuration.Beta
return heuristic
# Implements functions (3), (6)
def selectNextLandingIndex(self, preceedingIndex : int):
def selectNextLandingIndex(self):
q = float(random.randint(0, 100)) / 100
weights = []
if q <= self.Configuration.PseudoRandomSelectionRate:
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(preceedingIndex, i))
weights.append(self.heuristicInformation(i))
else:
# roulette selection strategy
pheromoneScale = 0.0
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
pheromoneScale += self.heuristicInformation(preceedingIndex, i)
pheromoneScale += self.heuristicInformation(i)
for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1))
weights.append(self.heuristicInformation(i) / (pheromoneScale or 1))
total = sum(weights)
cumdist = list(itertools.accumulate(weights)) + [total]
@@ -89,7 +69,7 @@ class Ant:
def associateInbound(self, node : Node, earliestArrivalTime : datetime):
# prepare the statistics
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, True, self.Configuration.EarliestArrivalTime)
rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
eta = max(earliestArrivalTime, eta)
node.Inbound.PlannedRunway = rwy
@@ -110,20 +90,18 @@ class Ant:
# select the first inbound
self.InboundSelected[first] = True
delay, rwy = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
self.InboundScore[0] = Ant.qualifyDelay(delay, self.Nodes[first], rwy)
delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
self.Sequence.append(first)
self.SequenceDelay += delay
while 1:
index = self.selectNextLandingIndex(self.Sequence[-1])
index = self.selectNextLandingIndex()
if None == index:
break
self.InboundSelected[index] = True
delay, rwy = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
self.SequenceDelay += delay
self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Nodes[index], rwy)
self.Sequence.append(index)
# update the local pheromone
@@ -134,7 +112,4 @@ class Ant:
# validate that nothing went wrong
if len(self.Sequence) != len(self.Nodes):
self.SequenceDelay = None
self.SequenceScore = None
self.Sequence = None
else:
self.SequenceScore = np.median(self.InboundScore)

Näytä tiedosto

@@ -17,8 +17,8 @@ from aman.types.Inbound import Inbound
# This class implements the ant colony of the following paper:
# https://sci-hub.mksa.top/10.1109/cec.2019.8790135
class Colony:
def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime, useITA : bool):
rwy, eta, _ = rwyManager.selectArrivalRunway(node, useITA, earliestArrivalTime)
def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime):
rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
eta = max(earliestArrivalTime, eta)
node.Inbound.PlannedRunway = rwy
@@ -34,7 +34,7 @@ class Colony:
# assume that the nodes are sorted in FCFS order
for node in nodes:
Colony.associateInbound(rwyManager, node, earliestArrivalTime, False)
Colony.associateInbound(rwyManager, node, earliestArrivalTime)
overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
return overallDelay
@@ -62,6 +62,17 @@ class Colony:
self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node):
self.Result.append(node)
Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)
reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
current.PTA = prev.PTA + (current.ETA - prev.ETA)
def optimize(self):
# FCFS is the best solution
if None != self.Result:
@@ -82,23 +93,21 @@ class Colony:
ant.findSolution(index)
# fallback to check if findSolution was successful
if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
if None == ant.SequenceDelay or None == ant.Sequence:
sys.stderr.write('Invalid ANT run detected!')
sys.exit(-1)
candidates.append(
[
ant.SequenceDelay,
ant.Sequence,
ant.SequenceScore,
ant.SequenceDelay.total_seconds() / ant.SequenceScore
ant.Sequence
]
)
# find the best solution in all candidates of this generation
bestCandidate = None
for candidate in candidates:
if None == bestCandidate or candidate[3] < bestCandidate[3]:
if None == bestCandidate or candidate[0] < bestCandidate[0]:
bestCandidate = candidate
dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
@@ -112,20 +121,19 @@ class Colony:
bestSequence = bestCandidate
# create the final sequence
if None != bestSequence:
self.Result = []
rwyManager = RunwayManager(self.Configuration)
# use the optimized sequence
if None != bestSequence and self.FcfsDelay >= bestSequence[0]:
# create the resulting sequence
self.ResultDelay = bestSequence[0]
self.Result = []
# finalize the sequence
rwyManager = RunwayManager(self.Configuration)
for i in range(0, len(bestSequence[1])):
self.Result.append(self.Nodes[bestSequence[1][i]])
Colony.associateInbound(rwyManager, self.Nodes[bestSequence[1][i]], self.Configuration.EarliestArrivalTime, True)
reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
current.PTA = prev.PTA + (current.ETA - prev.ETA)
for idx in bestSequence[1]:
self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
# use the FCFS sequence
else:
self.ResultDelay = self.FcfsDelay
for node in self.Nodes:
self.sequenceAndPredictInbound(node)

Näytä tiedosto

@@ -193,11 +193,6 @@ class Node:
if None != star:
flightTime, trackmiles, arrivalRoute, flightTimeOnStar = self.arrivalEstimation(identifier.Runway, star, weatherModel)
# calculate average speed gain
avgSpeed = trackmiles / (flightTime.total_seconds() / 3600.0)
avgSpeedDecrease = avgSpeed * 0.80
decreasedSpeedFlighttime = (trackmiles / avgSpeedDecrease) * 3600.0 # given in seconds
# use the the distance to the IAF for optimizations
timeUntilIAF = flightTime - flightTimeOnStar
if 0.0 > timeUntilIAF.total_seconds():
@@ -213,13 +208,10 @@ class Node:
ttg = timedelta(seconds = timeUntilIAF.total_seconds() * ttgRatio)
if (ttg.total_seconds() > ttgMax):
ttg = timedelta(seconds = ttgMax)
ttl = timedelta(seconds = decreasedSpeedFlighttime - flightTime.total_seconds())
ita = self.Inbound.ReportTime + flightTime
earliest = ita - ttg
latest = ita + ttl
self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(ttg = ttg, star = star, ita = ita, earliest = earliest,
ttl = ttl, latest = latest, route = arrivalRoute,
self.ArrivalCandidates[identifier.Runway.Name] = ArrivalData(star = star, ita = earliest, route = arrivalRoute,
trackmiles = trackmiles)
ita = self.ArrivalCandidates[identifier.Runway.Name].InitialArrivalTime

Näytä tiedosto

@@ -21,7 +21,7 @@ class RunwayManager:
if not runway.Runway.Name in self.RunwayInbounds:
self.RunwayInbounds[runway.Runway.Name] = None
def calculateEarliestArrivalTime(self, runway : str, node : Node, useETA : bool, earliestArrivalTime : datetime):
def calculateEarliestArrivalTime(self, runway : str, node : Node, earliestArrivalTime : datetime):
constrainedETA = None
if None != self.RunwayInbounds[runway]:
@@ -42,40 +42,29 @@ class RunwayManager:
if None == constrainedETA or candidate > constrainedETA:
constrainedETA = candidate
# get the arrival time on the runway of the inbound
if True == useETA:
arrivalTime = node.ArrivalCandidates[runway].EarliestArrivalTime
else:
arrivalTime = node.ArrivalCandidates[runway].InitialArrivalTime
if None == constrainedETA:
eta = max(arrivalTime, earliestArrivalTime)
eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, earliestArrivalTime)
else:
eta = max(arrivalTime, max(constrainedETA, earliestArrivalTime))
eta = max(node.ArrivalCandidates[runway].InitialArrivalTime, max(constrainedETA, earliestArrivalTime))
return eta, eta - arrivalTime
return eta, eta - node.ArrivalCandidates[runway].InitialArrivalTime
def selectShallShouldMayArrivalRunway(self, node : Node, runways, useETA : bool, earliestArrivalTime : datetime):
def selectShallShouldMayArrivalRunway(self, node : Node, runways, earliestArrivalTime : datetime):
candidate = None
delay = None
for runway in runways:
if True == useETA:
reference = node.ArrivalCandidates[runway.Runway.Name].EarliestArrivalTime
else:
reference = node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime)
if None == delay:
delay = eta - reference
delay = eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime
candidate = runway
elif delay > (eta - reference):
delay = eta- reference
elif delay > (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime):
delay = eta- node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime
candidate = runway
return candidate
def executeShallShouldMayAssignment(self, node : Node, useETA : bool, earliestArrivalTime : datetime):
def executeShallShouldMayAssignment(self, node : Node, earliestArrivalTime : datetime):
shallRunways = []
shouldRunways = []
mayRunways = []
@@ -97,46 +86,41 @@ class RunwayManager:
if node.Inbound.Report.plannedGate in runway.ShouldAssignments[RunwayAssignmentType.GateAssignment]:
shouldRunways.append(runway)
if True == useETA:
reference = node.ArrivalCandidates[runway.Runway.Name].EarliestArrivalTime
else:
reference = node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime
# test the may assignments
if RunwayAssignmentType.AircraftType in runway.MayAssignments:
if node.Inbound.Report.aircraft.type in runway.MayAssignments[RunwayAssignmentType.AircraftType]:
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
if (eta - reference) <= self.Configuration.AirportConfiguration.MaxDelayMay:
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime)
if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay:
mayRunways.append(runway)
if RunwayAssignmentType.GateAssignment in runway.MayAssignments:
if node.Inbound.Report.plannedGate in runway.MayAssignments[RunwayAssignmentType.GateAssignment]:
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
if (eta - reference) <= self.Configuration.AirportConfiguration.MaxDelayMay:
eta, _ = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime)
if (eta - node.ArrivalCandidates[runway.Runway.Name].InitialArrivalTime) <= self.Configuration.AirportConfiguration.MaxDelayMay:
mayRunways.append(runway)
runway = self.selectShallShouldMayArrivalRunway(node, shallRunways, useETA, earliestArrivalTime)
runway = self.selectShallShouldMayArrivalRunway(node, shallRunways, earliestArrivalTime)
if None != runway:
return [ runway ]
runway = self.selectShallShouldMayArrivalRunway(node, shouldRunways, useETA, earliestArrivalTime)
runway = self.selectShallShouldMayArrivalRunway(node, shouldRunways, earliestArrivalTime)
if None != runway:
return [ runway ]
runway = self.selectShallShouldMayArrivalRunway(node, mayRunways, useETA, earliestArrivalTime)
runway = self.selectShallShouldMayArrivalRunway(node, mayRunways, earliestArrivalTime)
if None != runway:
return [ runway ]
return self.Configuration.RunwayConstraints.ActiveArrivalRunways
def selectArrivalRunway(self, node : Node, useETA : bool, earliestArrivalTime : datetime):
def selectArrivalRunway(self, node : Node, earliestArrivalTime : datetime):
availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways
if True == self.Configuration.RunwayConstraints.UseShallShouldMay:
availableRunways = self.executeShallShouldMayAssignment(node, useETA, earliestArrivalTime)
availableRunways = self.executeShallShouldMayAssignment(node, earliestArrivalTime)
else:
availableRunways = self.Configuration.RunwayConstraints.ActiveArrivalRunways
if 0 == len(availableRunways):
runway = self.Configuration.RunwayConstraints.ActiveArrivalRunways[0]
return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
return runway, self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime)
# start with the beginning
selectedRunway = None
@@ -145,7 +129,7 @@ class RunwayManager:
# get the runway with the earliest ETA
for runway in availableRunways:
candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, useETA, earliestArrivalTime)
candidate, delta = self.calculateEarliestArrivalTime(runway.Runway.Name, node, earliestArrivalTime)
if None == eta or eta > candidate:
selectedRunway = runway.Runway
lostTime = delta

Näytä tiedosto

@@ -7,30 +7,12 @@ from aman.types.ArrivalRoute import ArrivalRoute
class ArrivalData:
def __init__(self, **kargs):
self.Star = None
self.MaximumTimeToGain = None
self.MaximumTimeToLose = None
self.InitialArrivalTime = None
self.EarliestArrivalTime = None
self.LatestArrivalTime = None
self.ArrivalRoute = None
self.Trackmiles = None
for key, value in kargs.items():
if 'ttg' == key:
if True == isinstance(value, timedelta):
self.MaximumTimeToGain = value
elif True == isinstance(value, (int, float)):
self.MaximumTimeToGain = timedelta(seconds = float(value))
else:
raise Exception('Invalid type for ttg')
elif 'ttl' == key:
if True == isinstance(value, timedelta):
self.MaximumTimeToLose = value
elif True == isinstance(value, (int, float)):
self.MaximumTimeToLose = timedelta(seconds = float(value))
else:
raise Exception('Invalid type for ttl')
elif 'star' == key:
if 'star' == key:
if True == isinstance(value, ArrivalRoute):
self.Star = value
else:
@@ -40,16 +22,6 @@ class ArrivalData:
self.InitialArrivalTime = value
else:
raise Exception('Invalid type for ita')
elif 'earliest' == key:
if True == isinstance(value, datetime):
self.EarliestArrivalTime = value
else:
raise Exception('Invalid type for earliest')
elif 'latest' == key:
if True == isinstance(value, datetime):
self.LatestArrivalTime = value
else:
raise Exception('Invalid type for latest')
elif 'route' == key:
self.ArrivalRoute = value
elif 'trackmiles' == key: