Colony.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import numpy as np
  4. import random
  5. import sys
  6. import pytz
  7. from aman.sys.aco.Ant import Ant
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements the ant colony of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Colony:
  14. def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
  15. rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
  16. eta = max(earliestArrivalTime, eta)
  17. inbound.PlannedRunway = rwy
  18. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  19. inbound.PlannedArrivalRoute = inbound.ArrivalCandidates[rwy.Name].ArrivalRoute
  20. inbound.PlannedArrivalTime = eta
  21. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  22. inbound.PlannedTrackmiles = inbound.ArrivalCandidates[rwy.Name].Trackmiles
  23. rwyManager.RunwayInbounds[rwy.Name] = inbound
  24. def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
  25. overallDelay = timedelta(seconds = 0)
  26. # assume that the inbounds are sorted in FCFS order
  27. print('FCFS-Sequence:')
  28. for inbound in inbounds:
  29. Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
  30. overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  31. print(' ' + inbound.Callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) +
  32. ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds()))
  33. return overallDelay
  34. def __init__(self, configuration : Configuration):
  35. self.Configuration = configuration
  36. self.ResultDelay = None
  37. self.Result = None
  38. rwyManager = RunwayManager(self.Configuration)
  39. delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
  40. self.FcfsDelay = delay
  41. # check if FCFS is the ideal solution
  42. if 0.0 >= delay.total_seconds():
  43. self.Result = self.Configuration.Inbounds
  44. self.ResultDelay = delay
  45. return
  46. # initial value for the optimization
  47. self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
  48. self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
  49. def optimize(self):
  50. # FCFS is the best solution
  51. if None != self.Result:
  52. return
  53. # define the tracking variables
  54. bestSequence = None
  55. # run the optimization loops
  56. for _ in range(0, self.Configuration.ExplorationRuns):
  57. # select the first inbound
  58. index = random.randint(1, len(self.Configuration.Inbounds)) - 1
  59. candidates = []
  60. for _ in range(0, self.Configuration.AntCount):
  61. # let the ant find a solution
  62. ant = Ant(self.PheromoneMatrix, self.Configuration)
  63. ant.findSolution(index)
  64. # fallback to check if findSolution was successful
  65. if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
  66. sys.stderr.write('Invalid ANT run detected!')
  67. sys.exit(-1)
  68. candidates.append(
  69. [
  70. ant.SequenceDelay,
  71. ant.Sequence,
  72. ant.SequenceScore,
  73. ant.SequenceDelay.total_seconds() / ant.SequenceScore
  74. ]
  75. )
  76. # find the best solution in all candidates of this generation
  77. bestCandidate = None
  78. for candidate in candidates:
  79. if None == bestCandidate or candidate[3] < bestCandidate[3]:
  80. bestCandidate = candidate
  81. dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
  82. for i in range(1, len(candidate[1])):
  83. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
  84. self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
  85. # check if we find a new best candidate
  86. if None != bestCandidate:
  87. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  88. bestSequence = bestCandidate
  89. # create the final sequence
  90. if None != bestSequence:
  91. # create the resulting sequence
  92. self.ResultDelay = bestSequence[0]
  93. self.Result = []
  94. # finalize the sequence
  95. rwyManager = RunwayManager(self.Configuration)
  96. for i in range(0, len(bestSequence[1])):
  97. self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
  98. Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)