Ant.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import math
  4. import numpy as np
  5. import random
  6. import bisect
  7. import itertools
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.sys.aco.Node import Node
  11. # This class implements a single ant of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  class Ant:
      """A single ant that builds one landing sequence over the given nodes.

      Starting from a chosen first inbound, the ant repeatedly draws the next
      not-yet-sequenced inbound with a roulette wheel weighted by the delay
      heuristic, books it through its own ``RunwayManager`` and applies the
      local pheromone update to the pheromone matrix it was given (the matrix
      is modified in place).

      Attributes:
          Configuration: The configuration object passed to the constructor.
          Nodes: The candidate nodes; sequence entries are indices into it.
          RunwayManager: Runway bookkeeping created from the configuration.
          InboundSelected: One flag per node, True once it has been sequenced.
          InboundScore: Zero-initialised ``(len(nodes), 1)`` array.
          PheromoneMatrix: The pheromone table passed to the constructor.
          SequenceDelay: Accumulated delay of the sequence, or None when
              ``findSolution`` could not sequence every node.
          Sequence: Node indices in landing order, or None before/without a
              valid solution.
      """

      def __init__(self, pheromoneTable: np.ndarray, configuration: "Configuration", nodes):
          """Store the inputs and initialise the per-ant bookkeeping."""
          self.Configuration = configuration
          self.Nodes = nodes
          self.RunwayManager = RunwayManager(self.Configuration)
          self.InboundSelected = [False] * len(self.Nodes)
          self.InboundScore = np.zeros([len(self.Nodes), 1])
          self.PheromoneMatrix = pheromoneTable
          self.SequenceDelay = timedelta(seconds=0)
          self.Sequence = None

      # Implements function (5)
      def heuristicInformation(self, current: int):
          """Return the desirability of landing node ``current`` next.

          The value is ``(1 / delay_in_minutes) ** Beta`` where the delay is the
          difference between the ETA reported by the runway manager and the
          inbound's initial arrival time.

          Args:
              current: Index into ``self.Nodes``.

          Returns:
              A float in ``(0, 1]``; 1.0 means no relevant delay.
          """
          node = self.Nodes[current]
          _, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
          delayMinutes = (eta - node.Inbound.InitialArrivalTime).total_seconds() / 60.0

          # Clamp to one minute: this keeps the zero-delay result of 1.0, stops
          # sub-minute delays from scoring *better* than no delay at all and
          # prevents negative delays from producing negative or complex weights.
          return (1.0 / max(delayMinutes, 1.0)) ** self.Configuration.Beta

      # Implements functions (3), (6)
      def selectNextLandingIndex(self):
          """Draw the index of the next inbound to sequence.

          Returns:
              The index of a node that is not yet selected, or None when every
              node has already been sequenced.
          """
          candidates = [i for i, selected in enumerate(self.InboundSelected) if not selected]
          if not candidates:
              return None

          q = float(random.randint(0, 100)) / 100
          # one heuristic evaluation per candidate; each one queries the runway manager
          weights = [self.heuristicInformation(i) for i in candidates]

          # NOTE(review): both branches end in the same roulette draw below, so
          # PseudoRandomSelectionRate only decides whether the weights are
          # normalised first, and the pheromone matrix is not consulted here.
          # Confirm against functions (3) and (6) of the paper.
          if q > self.Configuration.PseudoRandomSelectionRate:
              # roulette selection strategy
              scale = sum(weights) or 1
              weights = [weight / scale for weight in weights]

          cumulative = list(itertools.accumulate(weights))
          total = cumulative[-1]
          if total <= 0.0:
              # degenerate wheel (all weights vanished): fall back to a uniform draw
              return random.choice(candidates)

          position = bisect.bisect(cumulative, random.random() * total)
          # rounding may push the position one past the end; never lose a candidate
          return candidates[min(position, len(candidates) - 1)]

      def associateInbound(self, node: "Node", earliestArrivalTime: datetime):
          """Book ``node`` on the runway chosen by the runway manager.

          Writes the planned runway, STAR, arrival time, arrival route and the
          initial arrival time of the chosen runway's candidate into
          ``node.Inbound`` and registers the node as that runway's latest inbound.

          Args:
              node: The node to book.
              earliestArrivalTime: Lower bound applied to the planned arrival time.

          Returns:
              A tuple ``(delay, runway)`` where ``delay`` is the planned minus the
              initial arrival time, never negative.
          """
          rwy, eta, _ = self.RunwayManager.selectArrivalRunway(node, self.Configuration.EarliestArrivalTime)
          eta = max(earliestArrivalTime, eta)
          candidate = node.ArrivalCandidates[rwy.Name]

          # prepare the statistics
          node.Inbound.PlannedRunway = rwy
          node.Inbound.PlannedStar = candidate.Star
          node.Inbound.PlannedArrivalTime = eta
          node.Inbound.PlannedArrivalRoute = candidate.ArrivalRoute
          node.Inbound.InitialArrivalTime = candidate.InitialArrivalTime
          self.RunwayManager.RunwayInbounds[rwy.Name] = node

          delay = node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
          return max(delay, timedelta(seconds=0)), rwy

      def _appendInbound(self, index: int):
          """Mark node ``index`` as selected, book it and extend the sequence."""
          self.InboundSelected[index] = True
          delay, _ = self.associateInbound(self.Nodes[index], self.Configuration.EarliestArrivalTime)
          self.SequenceDelay += delay
          self.Sequence.append(index)

      def findSolution(self, first: int):
          """Build a complete landing sequence that starts with node ``first``.

          On success ``self.Sequence`` holds every node index exactly once and
          ``self.SequenceDelay`` the accumulated delay. If not every node could
          be sequenced, both are set to None.

          Args:
              first: Index of the inbound that lands first.
          """
          self.Sequence = []

          # select the first inbound
          self._appendInbound(first)

          while True:
              index = self.selectNextLandingIndex()
              if index is None:
                  break

              previous = self.Sequence[-1]
              self._appendInbound(index)

              # update the local pheromone
              ratio = self.Configuration.propagationRatio
              thetaZero = self.Configuration.ThetaZero
              update = (1.0 - ratio) * self.PheromoneMatrix[previous, index]
              update += ratio * thetaZero
              self.PheromoneMatrix[previous, index] = max(thetaZero, update)

          # validate that nothing went wrong
          if len(self.Sequence) != len(self.Nodes):
              self.SequenceDelay = None
              self.Sequence = None