Colony.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. #!/usr/bin/env python
  2. from datetime import datetime as dt
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. import pytz
  6. import random
  7. import sys
  8. import time
  9. from aman.sys.aco.Ant import Ant
  10. from aman.sys.aco.Configuration import Configuration
  11. from aman.sys.aco.Node import Node
  12. from aman.sys.aco.RunwayManager import RunwayManager
  13. from aman.types.Inbound import Inbound
  14. # This class implements the ant colony of the following paper:
  15. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  16. class Colony:
  17. def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime):
  18. rwy, eta, _ = rwyManager.selectArrivalRunway(node, earliestArrivalTime)
  19. eta = max(earliestArrivalTime, eta)
  20. node.Inbound.PlannedRunway = rwy
  21. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  22. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  23. node.Inbound.PlannedArrivalTime = eta
  24. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  25. node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
  26. rwyManager.RunwayInbounds[rwy.Name] = node
  27. def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
  28. overallDelay = timedelta(seconds = 0)
  29. # assume that the nodes are sorted in FCFS order
  30. for node in nodes:
  31. Colony.associateInbound(rwyManager, node, earliestArrivalTime)
  32. overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  33. return overallDelay
  34. def __init__(self, inbounds, configuration : Configuration):
  35. self.Configuration = configuration
  36. self.ResultDelay = None
  37. self.Result = None
  38. self.Nodes = []
  39. # create the new planning instances
  40. currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
  41. for inbound in inbounds:
  42. self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.AirportConfiguration, self.Configuration.RunwayConstraints))
  43. rwyManager = RunwayManager(self.Configuration)
  44. delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
  45. self.FcfsDelay = delay
  46. # run the optimization in every cycle to ensure optimal spacings based on TTG
  47. if 0.0 >= delay.total_seconds():
  48. delay = timedelta(seconds = 1.0)
  49. # initial value for the optimization
  50. self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
  51. self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
  52. def sequenceAndPredictInbound(self, rwyManager : RunwayManager, node : Node):
  53. self.Result.append(node)
  54. Colony.associateInbound(rwyManager, node, self.Configuration.EarliestArrivalTime)
  55. reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
  56. self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
  57. for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
  58. prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
  59. current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
  60. current.PTA = prev.PTA + (current.ETA - prev.ETA)
  61. def optimize(self):
  62. # FCFS is the best solution
  63. if None != self.Result:
  64. return
  65. # define the tracking variables
  66. bestSequence = None
  67. # run the optimization loops
  68. for _ in range(0, self.Configuration.ExplorationRuns):
  69. # select the first inbound
  70. index = random.randint(1, len(self.Nodes)) - 1
  71. candidates = []
  72. for _ in range(0, self.Configuration.AntCount):
  73. # let the ant find a solution
  74. ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
  75. ant.findSolution(index)
  76. # fallback to check if findSolution was successful
  77. if None == ant.SequenceDelay or None == ant.Sequence:
  78. sys.stderr.write('Invalid ANT run detected!')
  79. sys.exit(-1)
  80. candidates.append(
  81. [
  82. ant.SequenceDelay,
  83. ant.Sequence
  84. ]
  85. )
  86. # find the best solution in all candidates of this generation
  87. bestCandidate = None
  88. for candidate in candidates:
  89. if None == bestCandidate or candidate[0] < bestCandidate[0]:
  90. bestCandidate = candidate
  91. if None != bestSequence:
  92. dTheta = 1.0 / ((bestSequence[0].total_seconds() / 60.0) or 1.0)
  93. for i in range(1, len(bestSequence[1])):
  94. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] + self.Configuration.Epsilon * dTheta
  95. self.PheromoneMatrix[bestSequence[1][i - 1], bestSequence[1][i]] = max(update, self.Configuration.ThetaZero)
  96. # check if we find a new best candidate
  97. if None != bestCandidate:
  98. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  99. bestSequence = bestCandidate
  100. # found the optimal solution
  101. if 1 >= bestSequence[0].total_seconds():
  102. break
  103. # create the final sequence
  104. self.Result = []
  105. rwyManager = RunwayManager(self.Configuration)
  106. # use the optimized sequence
  107. if None != bestSequence and self.FcfsDelay >= bestSequence[0]:
  108. # create the resulting sequence
  109. self.ResultDelay = bestSequence[0]
  110. # finalize the sequence
  111. for idx in bestSequence[1]:
  112. self.sequenceAndPredictInbound(rwyManager, self.Nodes[idx])
  113. # use the FCFS sequence
  114. else:
  115. self.ResultDelay = self.FcfsDelay
  116. for node in self.Nodes:
  117. self.sequenceAndPredictInbound(node)