Ant.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import math
  4. import numpy as np
  5. import random
  6. import bisect
  7. import itertools
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.sys.aco.Node import Node
  11. # This class implements a single ant of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Ant:
  14. def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes):
  15. self.Configuration = configuration
  16. self.Nodes = nodes
  17. self.RunwayManager = RunwayManager(self.Configuration)
  18. self.InboundSelected = [ False ] * len(self.Nodes)
  19. self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
  20. self.PheromoneMatrix = pheromoneTable
  21. self.SequenceDelay = timedelta(seconds = 0)
  22. self.Sequence = None
  23. def qualifyDelay(delay, node, runway):
  24. if 0.0 > delay.total_seconds():
  25. delay = timedelta(seconds = 0)
  26. # calculate the heuristic scaling to punish increased delays for single inbounds
  27. scaledDelay = delay.total_seconds() / node.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
  28. return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01)
  29. # Implements function (5), but adapted to the following logic:
  30. # An adaption of the heuristic function is used:
  31. # - Calculates the unused runway time (time between two consecutive landings)
  32. # - Calculates a ratio between the inbound delay and the unused runway time
  33. # - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time
  34. def heuristicInformation(self, preceeding : int, current : int):
  35. rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Nodes[current], True, self.Configuration.EarliestArrivalTime)
  36. inboundDelay = eta - self.Nodes[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
  37. if 0.0 > inboundDelay.total_seconds():
  38. inboundDelay = timedelta(seconds = 0)
  39. # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
  40. fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
  41. fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
  42. fraction += self.SequenceDelay.total_seconds()
  43. fraction /= 60.0
  44. # calculate the heuristic scaling to punish increased delays for single inbounds
  45. weight = Ant.qualifyDelay(inboundDelay, self.Nodes[current], rwy)
  46. return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
  47. # Implements functions (3), (6)
  48. def selectNextLandingIndex(self, preceedingIndex : int):
  49. q = float(random.randint(0, 100)) / 100
  50. weights = []
  51. if q <= self.Configuration.PseudoRandomSelectionRate:
  52. for i in range(0, len(self.InboundSelected)):
  53. if False == self.InboundSelected[i]:
  54. weights.append(self.heuristicInformation(preceedingIndex, i))
  55. else:
  56. # roulette selection strategy
  57. pheromoneScale = 0.0
  58. for i in range(0, len(self.InboundSelected)):
  59. if False == self.InboundSelected[i]:
  60. pheromoneScale += self.heuristicInformation(preceedingIndex, i)
  61. for i in range(0, len(self.InboundSelected)):
  62. if False == self.InboundSelected[i]:
  63. weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1))
  64. total = sum(weights)
  65. cumdist = list(itertools.accumulate(weights)) + [total]
  66. candidateIndex = bisect.bisect(cumdist, random.random() * total)
  67. for i in range(0, len(self.InboundSelected)):
  68. if False == self.InboundSelected[i]:
  69. if 0 == candidateIndex:
  70. return i
  71. else:
  72. candidateIndex -= 1
  73. return None
  74. def associateInbound(self, node : Node, earliestArrivalTime : datetime):
  75. # prepare the statistics
  76. rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, True, self.Configuration.EarliestArrivalTime)
  77. eta = max(earliestArrivalTime, eta)
  78. node.Inbound.PlannedRunway = rwy
  79. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  80. node.Inbound.PlannedArrivalTime = eta
  81. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  82. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  83. self.RunwayManager.RunwayInbounds[rwy.Name] = node
  84. delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  85. if 0.0 < delay.total_seconds():
  86. return delay, rwy
  87. else:
  88. return timedelta(seconds = 0), rwy
  89. def findSolution(self, first : int):
  90. self.Sequence = []
  91. # select the first inbound
  92. self.InboundSelected[first] = True
  93. delay, rwy = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
  94. self.InboundScore[0] = Ant.qualifyDelay(delay, self.Nodes[first], rwy)
  95. self.Sequence.append(first)
  96. self.SequenceDelay += delay
  97. while 1:
  98. index = self.selectNextLandingIndex(self.Sequence[-1])
  99. if None == index:
  100. break
  101. self.InboundSelected[index] = True
  102. delay, rwy = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
  103. self.SequenceDelay += delay
  104. self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Nodes[index], rwy)
  105. self.Sequence.append(index)
  106. # update the local pheromone
  107. update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
  108. update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
  109. self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
  110. # validate that nothing went wrong
  111. if len(self.Sequence) != len(self.Nodes):
  112. self.SequenceDelay = None
  113. self.SequenceScore = None
  114. self.Sequence = None
  115. else:
  116. self.SequenceScore = np.median(self.InboundScore)