use weights to find better sequence with TTG and TTL constraints

This commit is contained in:
Sven Czarnian
2021-11-10 22:45:30 +01:00
parent dd9e725fc2
commit 97a2f24f28
2 changed files with 39 additions and 16 deletions

View File

@@ -18,15 +18,24 @@ class Ant:
self.Configuration = configuration self.Configuration = configuration
self.RunwayManager = RunwayManager(self.Configuration) self.RunwayManager = RunwayManager(self.Configuration)
self.InboundSelected = [ False ] * len(self.Configuration.Inbounds) self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
self.InboundScore = np.zeros([ len(self.Configuration.Inbounds), 1 ])
self.PheromoneMatrix = pheromoneTable self.PheromoneMatrix = pheromoneTable
self.SequenceDelay = timedelta(seconds = 0) self.SequenceDelay = timedelta(seconds = 0)
self.Sequence = None self.Sequence = None
def qualifyDelay(delay, inbound, runway):
if 0.0 > delay.total_seconds():
delay = timedelta(seconds = 0)
# calculate the heuristic scaling to punish increased delays for single inbounds
scaledDelay = delay.total_seconds() / inbound.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01)
# Implements function (5), but adapted to the following logic: # Implements function (5), but adapted to the following logic:
# An adaption of the heuristic function is used: # An adaption of the heuristic function is used:
# - Calculates the unused runway time (time between two consecutive landings) # - Calculates the unused runway time (time between two consecutive landings)
# - Calculates a ratio between the inbound delay and the unused runway time # - Calculates a ratio between the inbound delay and the unused runway time
# - Adds the current overal sequence delay to the heuristic function # - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time
def heuristicInformation(self, preceeding : int, current : int): def heuristicInformation(self, preceeding : int, current : int):
rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime) rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
@@ -39,7 +48,10 @@ class Ant:
fraction += self.SequenceDelay.total_seconds() fraction += self.SequenceDelay.total_seconds()
fraction /= 60.0 fraction /= 60.0
return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta) # calculate the heuristic scaling to punish increased delays for single inbounds
weight = Ant.qualifyDelay(inboundDelay, self.Configuration.Inbounds[current], rwy)
return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
# Implements functions (3), (6) # Implements functions (3), (6)
def selectNextLandingIndex(self, preceedingIndex : int): def selectNextLandingIndex(self, preceedingIndex : int):
@@ -57,13 +69,9 @@ class Ant:
if False == self.InboundSelected[i]: if False == self.InboundSelected[i]:
pheromoneScale += self.heuristicInformation(preceedingIndex, i) pheromoneScale += self.heuristicInformation(preceedingIndex, i)
# fallback
if 0.0 >= pheromoneScale:
pheromoneScale = 1.0
for i in range(0, len(self.InboundSelected)): for i in range(0, len(self.InboundSelected)):
if False == self.InboundSelected[i]: if False == self.InboundSelected[i]:
weights.append(self.heuristicInformation(preceedingIndex, i) / pheromoneScale) weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1))
total = sum(weights) total = sum(weights)
cumdist = list(itertools.accumulate(weights)) + [total] cumdist = list(itertools.accumulate(weights)) + [total]
@@ -91,17 +99,19 @@ class Ant:
delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
if 0.0 < delay.total_seconds(): if 0.0 < delay.total_seconds():
return delay return delay, rwy
else: else:
return timedelta(seconds = 0) return timedelta(seconds = 0), rwy
def findSolution(self, first : int): def findSolution(self, first : int):
self.Sequence = [] self.Sequence = []
# select the first inbound # select the first inbound
self.InboundSelected[first] = True self.InboundSelected[first] = True
delay, rwy = self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
self.InboundScore[0] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[first], rwy)
self.Sequence.append(first) self.Sequence.append(first)
self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime) self.SequenceDelay += delay
while 1: while 1:
index = self.selectNextLandingIndex(self.Sequence[-1]) index = self.selectNextLandingIndex(self.Sequence[-1])
@@ -109,7 +119,9 @@ class Ant:
break break
self.InboundSelected[index] = True self.InboundSelected[index] = True
self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime) delay, rwy = self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
self.SequenceDelay += delay
self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[index], rwy)
self.Sequence.append(index) self.Sequence.append(index)
# update the local pheromone # update the local pheromone
@@ -120,4 +132,7 @@ class Ant:
# validate that nothing went wrong # validate that nothing went wrong
if len(self.Sequence) != len(self.Configuration.Inbounds): if len(self.Sequence) != len(self.Configuration.Inbounds):
self.SequenceDelay = None self.SequenceDelay = None
self.SequenceScore = None
self.Sequence = None self.Sequence = None
else:
self.SequenceScore = np.median(self.InboundScore)

View File

@@ -35,7 +35,8 @@ class Colony:
# TODO remove this after testing and optimization # TODO remove this after testing and optimization
for runway in inbound.ArrivalCandidates: for runway in inbound.ArrivalCandidates:
inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp
inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp - inbound.ArrivalCandidates[runway].MaximumTimeToGain
inbound.ArrivalCandidates[runway].LatestArrivalTime = tmp + inbound.ArrivalCandidates[runway].MaximumTimeToLose
tmp += timedelta(seconds = 20) tmp += timedelta(seconds = 20)
Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False) Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
@@ -83,19 +84,26 @@ class Colony:
ant.findSolution(index) ant.findSolution(index)
# fallback to check if findSolution was successful # fallback to check if findSolution was successful
if None == ant.SequenceDelay or None == ant.Sequence: if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
sys.stderr.write('Invalid ANT run detected!') sys.stderr.write('Invalid ANT run detected!')
sys.exit(-1) sys.exit(-1)
candidates.append([ ant.SequenceDelay, ant.Sequence ]) candidates.append(
[
ant.SequenceDelay,
ant.Sequence,
ant.SequenceScore,
ant.SequenceDelay.total_seconds() / ant.SequenceScore
]
)
# find the best solution in all candidates of this generation # find the best solution in all candidates of this generation
bestCandidate = None bestCandidate = None
for candidate in candidates: for candidate in candidates:
if None == bestCandidate or candidate[0] < bestCandidate[0]: if None == bestCandidate or candidate[3] < bestCandidate[3]:
bestCandidate = candidate bestCandidate = candidate
dTheta = 1.0 / (candidate[0].total_seconds() / 60.0) dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
for i in range(1, len(candidate[1])): for i in range(1, len(candidate[1])):
update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero) self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)