Colony.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #!/usr/bin/env python
  2. from datetime import datetime as dt
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. import pytz
  6. import random
  7. import sys
  8. import time
  9. from aman.sys.aco.Ant import Ant
  10. from aman.sys.aco.Configuration import Configuration
  11. from aman.sys.aco.Node import Node
  12. from aman.sys.aco.RunwayManager import RunwayManager
  13. from aman.types.Inbound import Inbound
  14. # This class implements the ant colony of the following paper:
  15. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  16. class Colony:
  17. def associateInbound(rwyManager : RunwayManager, node : Node, earliestArrivalTime : datetime, useITA : bool):
  18. rwy, eta, _ = rwyManager.selectArrivalRunway(node, useITA, earliestArrivalTime)
  19. eta = max(earliestArrivalTime, eta)
  20. node.Inbound.PlannedRunway = rwy
  21. node.Inbound.PlannedStar = node.ArrivalCandidates[rwy.Name].Star
  22. node.Inbound.PlannedArrivalRoute = node.ArrivalCandidates[rwy.Name].ArrivalRoute
  23. node.Inbound.PlannedArrivalTime = eta
  24. node.Inbound.InitialArrivalTime = node.ArrivalCandidates[rwy.Name].InitialArrivalTime
  25. node.Inbound.PlannedTrackmiles = node.ArrivalCandidates[rwy.Name].Trackmiles
  26. rwyManager.RunwayInbounds[rwy.Name] = node
  27. def calculateInitialCosts(rwyManager : RunwayManager, nodes, earliestArrivalTime : datetime):
  28. overallDelay = timedelta(seconds = 0)
  29. # assume that the nodes are sorted in FCFS order
  30. for node in nodes:
  31. Colony.associateInbound(rwyManager, node, earliestArrivalTime, False)
  32. overallDelay += node.Inbound.PlannedArrivalTime - node.Inbound.InitialArrivalTime
  33. return overallDelay
  34. def __init__(self, inbounds, configuration : Configuration):
  35. self.Configuration = configuration
  36. self.ResultDelay = None
  37. self.Result = None
  38. self.Nodes = []
  39. # create the new planning instances
  40. currentTime = dt.utcfromtimestamp(int(time.time())).replace(tzinfo = pytz.UTC)
  41. for inbound in inbounds:
  42. self.Nodes.append(Node(inbound, currentTime, self.Configuration.WeatherModel, self.Configuration.NavData, self.Configuration.RunwayConstraints))
  43. rwyManager = RunwayManager(self.Configuration)
  44. delay = Colony.calculateInitialCosts(rwyManager, self.Nodes, self.Configuration.EarliestArrivalTime)
  45. self.FcfsDelay = delay
  46. # run the optimization in every cycle to ensure optimal spacings based on TTG
  47. if 0.0 >= delay.total_seconds():
  48. delay = timedelta(seconds = 1.0)
  49. # initial value for the optimization
  50. self.Configuration.ThetaZero = 1.0 / (len(self.Nodes) * (delay.total_seconds() / 60.0))
  51. self.PheromoneMatrix = np.ones(( len(self.Nodes), len(self.Nodes) ), dtype=float) * self.Configuration.ThetaZero
  52. def optimize(self):
  53. # FCFS is the best solution
  54. if None != self.Result:
  55. return
  56. # define the tracking variables
  57. bestSequence = None
  58. # run the optimization loops
  59. for _ in range(0, self.Configuration.ExplorationRuns):
  60. # select the first inbound
  61. index = random.randint(1, len(self.Nodes)) - 1
  62. candidates = []
  63. for _ in range(0, self.Configuration.AntCount):
  64. # let the ant find a solution
  65. ant = Ant(self.PheromoneMatrix, self.Configuration, self.Nodes)
  66. ant.findSolution(index)
  67. # fallback to check if findSolution was successful
  68. if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
  69. sys.stderr.write('Invalid ANT run detected!')
  70. sys.exit(-1)
  71. candidates.append(
  72. [
  73. ant.SequenceDelay,
  74. ant.Sequence,
  75. ant.SequenceScore,
  76. ant.SequenceDelay.total_seconds() / ant.SequenceScore
  77. ]
  78. )
  79. # find the best solution in all candidates of this generation
  80. bestCandidate = None
  81. for candidate in candidates:
  82. if None == bestCandidate or candidate[3] < bestCandidate[3]:
  83. bestCandidate = candidate
  84. dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
  85. for i in range(1, len(candidate[1])):
  86. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
  87. self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
  88. # check if we find a new best candidate
  89. if None != bestCandidate:
  90. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  91. bestSequence = bestCandidate
  92. # create the final sequence
  93. if None != bestSequence:
  94. # create the resulting sequence
  95. self.ResultDelay = bestSequence[0]
  96. self.Result = []
  97. # finalize the sequence
  98. rwyManager = RunwayManager(self.Configuration)
  99. for i in range(0, len(bestSequence[1])):
  100. self.Result.append(self.Nodes[bestSequence[1][i]])
  101. Colony.associateInbound(rwyManager, self.Nodes[bestSequence[1][i]], self.Configuration.EarliestArrivalTime, True)
  102. reqTimeDelta = self.Result[-1].Inbound.InitialArrivalTime - self.Result[-1].Inbound.PlannedArrivalTime
  103. self.Result[-1].Inbound.PlannedArrivalRoute[0].PTA = self.Result[-1].Inbound.PlannedArrivalRoute[0].ETA - reqTimeDelta
  104. for i in range(1, len(self.Result[-1].Inbound.PlannedArrivalRoute)):
  105. prev = self.Result[-1].Inbound.PlannedArrivalRoute[i - 1]
  106. current = self.Result[-1].Inbound.PlannedArrivalRoute[i]
  107. current.PTA = prev.PTA + (current.ETA - prev.ETA)