Ant.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import math
  4. import numpy as np
  5. import random
  6. import bisect
  7. import itertools
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements a single ant of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Ant:
  14. def __init__(self, pheromoneTable : np.array, configuration : Configuration):
  15. self.Configuration = configuration
  16. self.RunwayManager = RunwayManager(self.Configuration)
  17. self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
  18. self.InboundScore = np.zeros([ len(self.Configuration.Inbounds), 1 ])
  19. self.PheromoneMatrix = pheromoneTable
  20. self.SequenceDelay = timedelta(seconds = 0)
  21. self.Sequence = None
  22. def qualifyDelay(delay, inbound, runway):
  23. if 0.0 > delay.total_seconds():
  24. delay = timedelta(seconds = 0)
  25. # calculate the heuristic scaling to punish increased delays for single inbounds
  26. scaledDelay = delay.total_seconds() / inbound.ArrivalCandidates[runway.Name].MaximumTimeToLose.total_seconds()
  27. return max(1.0 / (99.0 * (scaledDelay ** 2) + 1), 0.01)
  28. # Implements function (5), but adapted to the following logic:
  29. # An adaption of the heuristic function is used:
  30. # - Calculates the unused runway time (time between two consecutive landings)
  31. # - Calculates a ratio between the inbound delay and the unused runway time
  32. # - Weight the overall ratio based on maximum time to lose to punish high time to lose rates while other flights are gaining time
  33. def heuristicInformation(self, preceeding : int, current : int):
  34. rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
  35. inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
  36. if 0.0 > inboundDelay.total_seconds():
  37. inboundDelay = timedelta(seconds = 0)
  38. # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
  39. fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
  40. fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
  41. fraction += self.SequenceDelay.total_seconds()
  42. fraction /= 60.0
  43. # calculate the heuristic scaling to punish increased delays for single inbounds
  44. weight = Ant.qualifyDelay(inboundDelay, self.Configuration.Inbounds[current], rwy)
  45. return weight * self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
  46. # Implements functions (3), (6)
  47. def selectNextLandingIndex(self, preceedingIndex : int):
  48. q = float(random.randint(0, 100)) / 100
  49. weights = []
  50. if q <= self.Configuration.PseudoRandomSelectionRate:
  51. for i in range(0, len(self.InboundSelected)):
  52. if False == self.InboundSelected[i]:
  53. weights.append(self.heuristicInformation(preceedingIndex, i))
  54. else:
  55. # roulette selection strategy
  56. pheromoneScale = 0.0
  57. for i in range(0, len(self.InboundSelected)):
  58. if False == self.InboundSelected[i]:
  59. pheromoneScale += self.heuristicInformation(preceedingIndex, i)
  60. for i in range(0, len(self.InboundSelected)):
  61. if False == self.InboundSelected[i]:
  62. weights.append(self.heuristicInformation(preceedingIndex, i) / (pheromoneScale or 1))
  63. total = sum(weights)
  64. cumdist = list(itertools.accumulate(weights)) + [total]
  65. candidateIndex = bisect.bisect(cumdist, random.random() * total)
  66. for i in range(0, len(self.InboundSelected)):
  67. if False == self.InboundSelected[i]:
  68. if 0 == candidateIndex:
  69. return i
  70. else:
  71. candidateIndex -= 1
  72. return None
  73. def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime):
  74. # prepare the statistics
  75. rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
  76. eta = max(earliestArrivalTime, eta)
  77. inbound.PlannedRunway = rwy
  78. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  79. inbound.PlannedArrivalTime = eta
  80. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  81. self.RunwayManager.RunwayInbounds[rwy.Name] = inbound
  82. delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  83. if 0.0 < delay.total_seconds():
  84. return delay, rwy
  85. else:
  86. return timedelta(seconds = 0), rwy
  87. def findSolution(self, first : int):
  88. self.Sequence = []
  89. # select the first inbound
  90. self.InboundSelected[first] = True
  91. delay, rwy = self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
  92. self.InboundScore[0] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[first], rwy)
  93. self.Sequence.append(first)
  94. self.SequenceDelay += delay
  95. while 1:
  96. index = self.selectNextLandingIndex(self.Sequence[-1])
  97. if None == index:
  98. break
  99. self.InboundSelected[index] = True
  100. delay, rwy = self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
  101. self.SequenceDelay += delay
  102. self.InboundScore[len(self.Sequence)] = Ant.qualifyDelay(delay, self.Configuration.Inbounds[index], rwy)
  103. self.Sequence.append(index)
  104. # update the local pheromone
  105. update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
  106. update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
  107. self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
  108. # validate that nothing went wrong
  109. if len(self.Sequence) != len(self.Configuration.Inbounds):
  110. self.SequenceDelay = None
  111. self.SequenceScore = None
  112. self.Sequence = None
  113. else:
  114. self.SequenceScore = np.median(self.InboundScore)