Colony.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/env python
  2. from datetime import datetime as dt
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. import pytz
  6. import random
  7. import sys
  8. import time
  9. from aman.sys.aco.Ant import Ant
  10. from aman.sys.aco.Configuration import Configuration
  11. from aman.sys.aco.Node import Node
  12. from aman.sys.aco.RunwayManager import RunwayManager
  13. from aman.types.Inbound import Inbound
  14. # This class implements the ant colony of the following paper:
  15. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  16. class Colony:
  17. def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime):
  18. rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
  19. if None == eta:
  20. return False
  21. eta = max(earliestArrivalTime, eta)
  22. node.Inbound.PlannedRunway = rwy
  23. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  24. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  25. node.Inbound.PlannedArrivalTime = eta
  26. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  27. node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
  28. rwyManager.RunwayInbounds[rwy.Name] = node
  29. return True
  30. def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
  31. overallDelay = timedelta(seconds = 0)
  32. # assume that the nodes are sorted in FCFS order
  33. for node in nodes:
  34. if False == Colony.associateInbound(rwyManager, node, earliestArrivalTime):
  35. return None
  36. overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  37. return overallDelay
  38. def __init__(self, inbounds, configuration : Configuration):
  39. self.Configuration = configuration
  40. self.ResultDelay = None
  41. self.FcfsDelay = None
  42. self.Result = None
  43. self.Nodes = []
  44. # create the new planning instances
  45. currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
  46. for inbound in inbounds:
  47. self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.AirportConfiguration, self.Configuration.RunwayConstraints))
  48. rwyManager = RunwayManager(self.Configuration)
  49. delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
  50. if None == delay:
  51. return
  52. self.FcfsDelay = delay
  53. # run the optimization in every cycle to ensure optimal spacings based on TTG
  54. if 0.0 >= delay.total_seconds():
  55. delay = timedelta(seconds = 1.0)
  56. # initial value for the optimization
  57. self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
  58. self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
  59. def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node):
  60. self.Result.append(node)
  61. Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)
  62. reqTimeDelta = self.Result[-1].Inbound.EnrouteArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
  63. self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
  64. for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
  65. prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
  66. current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
  67. current.PTA = prev.PTA + (current.ETA - prev.ETA)
  68. def optimize(self):
  69. if None == self.FcfsDelay:
  70. return False
  71. # define the tracking variables
  72. bestSequence = None
  73. # run the optimization loops
  74. for _ in range(0, self.Configuration.ExplorationRuns):
  75. # select the first inbound
  76. index = random.randint(1, len(self.Nodes)) - 1
  77. candidates = []
  78. for _ in range(0, self.Configuration.AntCount):
  79. # let the ant find a solution
  80. ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
  81. ant.findSolution(index)
  82. # fallback to check if findSolution was successful
  83. if None == ant.SequenceDelay or None == ant.Sequence:
  84. sys.stderr.write('Invalid ANT run detected!')
  85. break
  86. candidates.append(
  87. [
  88. ant.SequenceDelay,
  89. ant.Sequence
  90. ]
  91. )
  92. # find the best solution in all candidates of this generation
  93. bestCandidate = None
  94. for candidate in candidates:
  95. if None == bestCandidate or candidate[0] < bestCandidate[0]:
  96. bestCandidate = candidate
  97. if None != bestSequence:
  98. dTheta = 1.0 / ((bestSequence[0].total_seconds() / 60.0) or 1.0)
  99. for i in range(1, len(bestSequence[1])):
  100. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] + self.Configuration.Epsilon * dTheta
  101. self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] = max(update, self.Configuration.ThetaZero)
  102. # check if we find a new best candidate
  103. if None != bestCandidate:
  104. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  105. bestSequence = bestCandidate
  106. # found the optimal solution
  107. if 1 >= bestSequence[0].total_seconds():
  108. break
  109. # create the final sequence
  110. self.Result = []
  111. rwyManager = RunwayManager(self.Configuration)
  112. # use the optimized sequence
  113. if None != bestSequence and self.FcfsDelay >= bestSequence[0]:
  114. # create the resulting sequence
  115. self.ResultDelay = bestSequence[0]
  116. # finalize the sequence
  117. for idx in bestSequence[1]:
  118. self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
  119. # use the FCFS sequence
  120. else:
  121. self.ResultDelay = self.FcfsDelay
  122. for node in self.Nodes:
  123. self.sequenceAndPredictInbound(rwyManager, node)
  124. return True