Ant.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import math
  4. import numpy as np
  5. import random
  6. import bisect
  7. import itertools
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements a single ant of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Ant:
  14. def __init__(self, pheromoneTable : np.array, configuration : Configuration):
  15. self.Configuration = configuration
  16. self.RunwayManager = RunwayManager(self.Configuration)
  17. self.InboundSelected = [ False ] * len(self.Configuration.Inbounds)
  18. self.PheromoneMatrix = pheromoneTable
  19. self.SequenceDelay = timedelta(seconds = 0)
  20. self.Sequence = None
  21. # Implements function (5), but adapted to the following logic:
  22. # An adaption of the heuristic function is used:
  23. # - Calculates the unused runway time (time between two consecutive landings)
  24. # - Calculates a ratio between the inbound delay and the unused runway time
  25. # - Adds the current overal sequence delay to the heuristic function
  26. def heuristicInformation(self, preceeding : int, current : int):
  27. rwy, eta, unusedRunwayTime = self.RunwayManager.selectArrivalRunway(self.Configuration.Inbounds[current], True, self.Configuration.EarliestArrivalTime)
  28. inboundDelay = eta - self.Configuration.Inbounds[current].ArrivalCandidates[rwy.Name].InitialArrivalTime
  29. if 0.0 > inboundDelay.total_seconds():
  30. inboundDelay = timedelta(seconds = 0)
  31. # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
  32. fraction = self.Configuration.RunwayOccupasionRatio * unusedRunwayTime.total_seconds()
  33. fraction += (1.0 - self.Configuration.RunwayOccupasionRatio) * inboundDelay.total_seconds()
  34. fraction += self.SequenceDelay.total_seconds()
  35. fraction /= 60.0
  36. return self.PheromoneMatrix[preceeding, current] * ((1.0 / (fraction or 1)) ** self.Configuration.Beta)
  37. # Implements functions (3), (6)
  38. def selectNextLandingIndex(self, preceedingIndex : int):
  39. q = float(random.randint(0, 100)) / 100
  40. weights = []
  41. if q <= self.Configuration.PseudoRandomSelectionRate:
  42. for i in range(0, len(self.InboundSelected)):
  43. if False == self.InboundSelected[i]:
  44. weights.append(self.heuristicInformation(preceedingIndex, i))
  45. else:
  46. # roulette selection strategy
  47. pheromoneScale = 0.0
  48. for i in range(0, len(self.InboundSelected)):
  49. if False == self.InboundSelected[i]:
  50. pheromoneScale += self.heuristicInformation(preceedingIndex, i)
  51. # fallback
  52. if 0.0 >= pheromoneScale:
  53. pheromoneScale = 1.0
  54. for i in range(0, len(self.InboundSelected)):
  55. if False == self.InboundSelected[i]:
  56. weights.append(self.heuristicInformation(preceedingIndex, i) / pheromoneScale)
  57. total = sum(weights)
  58. cumdist = list(itertools.accumulate(weights)) + [total]
  59. candidateIndex = bisect.bisect(cumdist, random.random() * total)
  60. for i in range(0, len(self.InboundSelected)):
  61. if False == self.InboundSelected[i]:
  62. if 0 == candidateIndex:
  63. return i
  64. else:
  65. candidateIndex -= 1
  66. return None
  67. def associateInbound(self, inbound : Inbound, earliestArrivalTime : datetime):
  68. # prepare the statistics
  69. rwy, eta, _ = self.RunwayManager.selectArrivalRunway(inbound, True, self.Configuration.EarliestArrivalTime)
  70. eta = max(earliestArrivalTime, eta)
  71. inbound.PlannedRunway = rwy
  72. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  73. inbound.PlannedArrivalTime = eta
  74. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  75. self.RunwayManager.RunwayInbounds[rwy.Name] = inbound
  76. delay = inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  77. if 0.0 < delay.total_seconds():
  78. return delay
  79. else:
  80. return timedelta(seconds = 0)
  81. def findSolution(self, first : int):
  82. self.Sequence = []
  83. # select the first inbound
  84. self.InboundSelected[first] = True
  85. self.Sequence.append(first)
  86. self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[first], self.Configuration.EarliestArrivalTime)
  87. while 1:
  88. index = self.selectNextLandingIndex(self.Sequence[-1])
  89. if None == index:
  90. break
  91. self.InboundSelected[index] = True
  92. self.SequenceDelay += self.associateInbound(self.Configuration.Inbounds[index], self.Configuration.EarliestArrivalTime)
  93. self.Sequence.append(index)
  94. # update the local pheromone
  95. update = (1.0 - self.Configuration.propagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
  96. update += self.Configuration.propagationRatio * self.Configuration.ThetaZero
  97. self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
  98. # validate that nothing went wrong
  99. if len(self.Sequence) != len(self.Configuration.Inbounds):
  100. self.SequenceDelay = None
  101. self.Sequence = None