Colony.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import numpy as np
  4. import random
  5. import sys
  6. import pytz
  7. from aman.sys.aco.Ant import Ant
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements the ant colony of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Colony:
  14. def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
  15. rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
  16. eta = max(earliestArrivalTime, eta)
  17. inbound.PlannedRunway = rwy
  18. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  19. inbound.PlannedArrivalTime = eta
  20. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  21. rwyManager.RunwayInbounds[rwy.Name] = inbound
  22. def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
  23. overallDelay = timedelta(seconds = 0)
  24. # assume that the inbounds are sorted in FCFS order
  25. print('FCFS-Sequence:')
  26. # TODO remove this after testing and optimization
  27. tmp = datetime.now().replace(tzinfo = pytz.UTC) + timedelta(seconds = 50 * len(inbounds))
  28. for inbound in inbounds:
  29. # TODO remove this after testing and optimization
  30. for runway in inbound.ArrivalCandidates:
  31. inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp
  32. inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp - inbound.ArrivalCandidates[runway].MaximumTimeToGain
  33. inbound.ArrivalCandidates[runway].LatestArrivalTime = tmp + inbound.ArrivalCandidates[runway].MaximumTimeToLose
  34. tmp += timedelta(seconds = 20)
  35. Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
  36. overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  37. print(' ' + inbound.Callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) +
  38. ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds()))
  39. return overallDelay
  40. def __init__(self, configuration : Configuration):
  41. self.Configuration = configuration
  42. self.ResultDelay = None
  43. self.Result = None
  44. rwyManager = RunwayManager(self.Configuration)
  45. delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
  46. self.FcfsDelay = delay
  47. # check if FCFS is the ideal solution
  48. if 0.0 >= delay.total_seconds():
  49. self.Result = self.Configuration.Inbounds
  50. self.ResultDelay = delay
  51. return
  52. # initial value for the optimization
  53. self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
  54. self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
  55. def optimize(self):
  56. # FCFS is the best solution
  57. if None != self.Result:
  58. return
  59. # define the tracking variables
  60. bestSequence = None
  61. # run the optimization loops
  62. for _ in range(0, self.Configuration.ExplorationRuns):
  63. # select the first inbound
  64. index = random.randint(1, len(self.Configuration.Inbounds)) - 1
  65. candidates = []
  66. for _ in range(0, self.Configuration.AntCount):
  67. # let the ant find a solution
  68. ant = Ant(self.PheromoneMatrix, self.Configuration)
  69. ant.findSolution(index)
  70. # fallback to check if findSolution was successful
  71. if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
  72. sys.stderr.write('Invalid ANT run detected!')
  73. sys.exit(-1)
  74. candidates.append(
  75. [
  76. ant.SequenceDelay,
  77. ant.Sequence,
  78. ant.SequenceScore,
  79. ant.SequenceDelay.total_seconds() / ant.SequenceScore
  80. ]
  81. )
  82. # find the best solution in all candidates of this generation
  83. bestCandidate = None
  84. for candidate in candidates:
  85. if None == bestCandidate or candidate[3] < bestCandidate[3]:
  86. bestCandidate = candidate
  87. dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
  88. for i in range(1, len(candidate[1])):
  89. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
  90. self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
  91. # check if we find a new best candidate
  92. if None != bestCandidate:
  93. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  94. bestSequence = bestCandidate
  95. # create the final sequence
  96. if None != bestSequence:
  97. # create the resulting sequence
  98. self.ResultDelay = bestSequence[0]
  99. self.Result = []
  100. # finalize the sequence
  101. rwyManager = RunwayManager(self.Configuration)
  102. for i in range(0, len(bestSequence[1])):
  103. self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
  104. Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)