Ant.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import math
  4. import numpy as np
  5. import random
  6. import bisect
  7. import itertools
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.sys.aco.Node import Node
  11. # This class implements a single ant of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Ant:
  14. def __init__(self, pheromoneTable : np.array, configuration : Configuration, nodes):
  15. self.Configuration = configuration
  16. self.Nodes = nodes
  17. self.RunwayManager = RunwayManager(self.Configuration)
  18. self.InboundSelected = [ False ] * len(self.Nodes)
  19. self.InboundScore = np.zeros([ len(self.Nodes), 1 ])
  20. self.PheromoneMatrix = pheromoneTable
  21. self.SequenceDelay = timedelta(seconds = 0)
  22. self.Sequence = None
  23. # Implements function (5)
  24. def heuristicInformation(self, current : int):
  25. _, _, _, eta, _ = self.RunwayManager.selectArrivalRunway(self.Nodes[current], self.Configuration.EarliestArrivalTime)
  26. if None == eta:
  27. return -1.0
  28. inboundDelay = eta - self.Nodes[current].Inbound.InitialArrivalTime
  29. # calculate the fraction with a mix of the unused runway time and the delay of single aircrafts
  30. heuristic = inboundDelay.total_seconds() / 60.0
  31. heuristic = (1.0 / (heuristic or 1)) ** self.Configuration.Beta
  32. return heuristic
  33. # Implements functions (3), (6)
  34. def selectNextLandingIndex(self):
  35. q = float(random.randint(0, 100)) / 100
  36. weights = []
  37. if q <= self.Configuration.PseudoRandomSelectionRate:
  38. for i in range(0, len(self.InboundSelected)):
  39. if False == self.InboundSelected[i]:
  40. weights.append(self.heuristicInformation(i))
  41. else:
  42. # roulette selection strategy
  43. pheromoneScale = 0.0
  44. for i in range(0, len(self.InboundSelected)):
  45. if False == self.InboundSelected[i]:
  46. pheromoneScale += self.heuristicInformation(i)
  47. for i in range(0, len(self.InboundSelected)):
  48. if False == self.InboundSelected[i]:
  49. weights.append(self.heuristicInformation(i) / (pheromoneScale or 1))
  50. # something was wrong in the runway selection
  51. if -1.0 in weights:
  52. return None
  53. total = sum(weights)
  54. cumdist = list(itertools.accumulate(weights)) + [total]
  55. candidateIndex = bisect.bisect(cumdist, random.random() * total)
  56. for i in range(0, len(self.InboundSelected)):
  57. if False == self.InboundSelected[i]:
  58. if 0 == candidateIndex:
  59. return i
  60. else:
  61. candidateIndex -= 1
  62. return None
  63. def associateInbound(self, node : Node, earliestArrivalTime : datetime):
  64. # prepare the statistics
  65. _, _, rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
  66. eta = max(earliestArrivalTime, eta)
  67. node.Inbound.PlannedRunway = rwy
  68. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  69. node.Inbound.PlannedArrivalTime = eta
  70. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  71. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  72. self.RunwayManager.registerNode(node, rwy.Name)
  73. delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  74. if 0.0 < delay.total_seconds():
  75. return delay, rwy
  76. else:
  77. return timedelta(seconds = 0), rwy
  78. def findSolution(self, first : int):
  79. self.Sequence = []
  80. # select the first inbound
  81. self.InboundSelected[first] = True
  82. delay, _ = self.associateInbound(self.Nodes[first], self.Configuration.EarliestArrivalTime)
  83. self.Sequence.append(first)
  84. self.SequenceDelay += delay
  85. while 1:
  86. index = self.selectNextLandingIndex()
  87. if None == index:
  88. break
  89. self.InboundSelected[index] = True
  90. delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
  91. self.SequenceDelay += delay
  92. self.Sequence.append(index)
  93. # update the local pheromone
  94. update = (1.0 - self.Configuration.PropagationRatio) * self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]]
  95. update += self.Configuration.PropagationRatio * self.Configuration.ThetaZero
  96. self.PheromoneMatrix[self.Sequence[-2], self.Sequence[-1]] = max(self.Configuration.ThetaZero, update)
  97. # validate that nothing went wrong
  98. if len(self.Sequence) != len(self.Nodes):
  99. self.SequenceDelay = None
  100. self.Sequence = None