Colony.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. #!/usr/bin/env python
  2. from datetime import datetime as dt
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. import pytz
  6. import random
  7. import sys
  8. import time
  9. from aman.sys.aco.Ant import Ant
  10. from aman.sys.aco.Configuration import Configuration
  11. from aman.sys.aco.Node import Node
  12. from aman.sys.aco.RunwayManager import RunwayManager
  13. from aman.types.Inbound import Inbound
  14. # This class implements the ant colony of the following paper:
  15. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  16. class Colony:
  17. def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime):
  18. type, expectedRwy, rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
  19. if None == eta:
  20. return False
  21. eta = max(earliestArrivalTime, eta)
  22. node.Inbound.PlannedRunway = rwy
  23. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  24. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  25. node.Inbound.PlannedArrivalTime = eta
  26. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  27. node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
  28. node.Inbound.AssignmentMode = type
  29. node.Inbound.ExpectedRunway = expectedRwy
  30. rwyManager.registerNode(node, rwy.Name)
  31. return True
  32. def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
  33. overallDelay = timedelta(seconds = 0)
  34. # assume that the nodes are sorted in FCFS order
  35. for node in nodes:
  36. if False == Colony.associateInbound(rwyManager, node, earliestArrivalTime):
  37. return None
  38. overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  39. return overallDelay
  40. def __init__(self, inbounds, configuration : Configuration):
  41. self.Configuration = configuration
  42. self.ResultDelay = None
  43. self.FcfsDelay = None
  44. self.Result = None
  45. self.Nodes = []
  46. # create the new planning instances
  47. currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
  48. for inbound in inbounds:
  49. self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.AirportConfiguration, self.Configuration.RunwayConstraints))
  50. rwyManager = RunwayManager(self.Configuration)
  51. delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
  52. if None == delay:
  53. return
  54. self.FcfsDelay = delay
  55. # run the optimization in every cycle to ensure optimal spacings based on TTG
  56. if 0.0 >= delay.total_seconds():
  57. delay = timedelta(seconds = 1.0)
  58. # initial value for the optimization
  59. self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
  60. self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
  61. def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node):
  62. self.Result.append(node)
  63. Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)
  64. reqTimeDelta = self.Result[-1].Inbound.EnrouteArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
  65. self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
  66. for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
  67. prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
  68. current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
  69. current.PTA = prev.PTA + (current.ETA - prev.ETA)
  70. def optimize(self):
  71. if None == self.FcfsDelay:
  72. return False
  73. # define the tracking variables
  74. bestSequence = None
  75. # run the optimization loops
  76. for _ in range(0, self.Configuration.ExplorationRuns):
  77. # select the first inbound
  78. index = random.randint(1, len(self.Nodes)) - 1
  79. candidates = []
  80. for _ in range(0, self.Configuration.AntCount):
  81. # let the ant find a solution
  82. ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
  83. ant.findSolution(index)
  84. # fallback to check if findSolution was successful
  85. if None == ant.SequenceDelay or None == ant.Sequence:
  86. sys.stderr.write('Invalid ANT run detected!')
  87. break
  88. candidates.append(
  89. [
  90. ant.SequenceDelay,
  91. ant.Sequence
  92. ]
  93. )
  94. # find the best solution in all candidates of this generation
  95. bestCandidate = None
  96. for candidate in candidates:
  97. if None == bestCandidate or candidate[0] < bestCandidate[0]:
  98. bestCandidate = candidate
  99. if None != bestSequence:
  100. dTheta = 1.0 / ((bestSequence[0].total_seconds() / 60.0) or 1.0)
  101. for i in range(1, len(bestSequence[1])):
  102. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] + self.Configuration.Epsilon * dTheta
  103. self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] = max(update, self.Configuration.ThetaZero)
  104. # check if we find a new best candidate
  105. if None != bestCandidate:
  106. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  107. bestSequence = bestCandidate
  108. # found the optimal solution
  109. if 1 >= bestSequence[0].total_seconds():
  110. break
  111. # create the final sequence
  112. self.Result = []
  113. rwyManager = RunwayManager(self.Configuration)
  114. # use the optimized sequence
  115. if None != bestSequence and self.FcfsDelay >= bestSequence[0]:
  116. # create the resulting sequence
  117. self.ResultDelay = bestSequence[0]
  118. # finalize the sequence
  119. for idx in bestSequence[1]:
  120. self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
  121. # use the FCFS sequence
  122. else:
  123. self.ResultDelay = self.FcfsDelay
  124. for node in self.Nodes:
  125. self.sequenceAndPredictInbound(rwyManager, node)
  126. return True