Colony.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import numpy as np
  4. import random
  5. import sys
  6. import pytz
  7. from aman.sys.aco.Ant import Ant
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements the ant colony of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Colony:
  14. def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
  15. rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
  16. eta = max(earliestArrivalTime, eta)
  17. inbound.PlannedRunway = rwy
  18. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  19. inbound.PlannedArrivalTime = eta
  20. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  21. rwyManager.RunwayInbounds[rwy.Name] = inbound
  22. def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
  23. overallDelay = timedelta(seconds = 0)
  24. # assume that the inbounds are sorted in FCFS order
  25. print('FCFS-Sequence:')
  26. for inbound in inbounds:
  27. Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
  28. overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  29. print(' ' + inbound.Callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) +
  30. ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds()))
  31. return overallDelay
  32. def __init__(self, configuration : Configuration):
  33. self.Configuration = configuration
  34. self.ResultDelay = None
  35. self.Result = None
  36. rwyManager = RunwayManager(self.Configuration)
  37. delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
  38. self.FcfsDelay = delay
  39. # check if FCFS is the ideal solution
  40. if 0.0 >= delay.total_seconds():
  41. self.Result = self.Configuration.Inbounds
  42. self.ResultDelay = delay
  43. return
  44. # initial value for the optimization
  45. self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
  46. self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
  47. def optimize(self):
  48. # FCFS is the best solution
  49. if None != self.Result:
  50. return
  51. # define the tracking variables
  52. bestSequence = None
  53. # run the optimization loops
  54. for _ in range(0, self.Configuration.ExplorationRuns):
  55. # select the first inbound
  56. index = random.randint(1, len(self.Configuration.Inbounds)) - 1
  57. candidates = []
  58. for _ in range(0, self.Configuration.AntCount):
  59. # let the ant find a solution
  60. ant = Ant(self.PheromoneMatrix, self.Configuration)
  61. ant.findSolution(index)
  62. # fallback to check if findSolution was successful
  63. if None == ant.SequenceDelay or None == ant.Sequence or None == ant.SequenceScore:
  64. sys.stderr.write('Invalid ANT run detected!')
  65. sys.exit(-1)
  66. candidates.append(
  67. [
  68. ant.SequenceDelay,
  69. ant.Sequence,
  70. ant.SequenceScore,
  71. ant.SequenceDelay.total_seconds() / ant.SequenceScore
  72. ]
  73. )
  74. # find the best solution in all candidates of this generation
  75. bestCandidate = None
  76. for candidate in candidates:
  77. if None == bestCandidate or candidate[3] < bestCandidate[3]:
  78. bestCandidate = candidate
  79. dTheta = 1.0 / ((candidate[0].total_seconds() / 60.0) or 1.0)
  80. for i in range(1, len(candidate[1])):
  81. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
  82. self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
  83. # check if we find a new best candidate
  84. if None != bestCandidate:
  85. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  86. bestSequence = bestCandidate
  87. # create the final sequence
  88. if None != bestSequence:
  89. # create the resulting sequence
  90. self.ResultDelay = bestSequence[0]
  91. self.Result = []
  92. # finalize the sequence
  93. rwyManager = RunwayManager(self.Configuration)
  94. for i in range(0, len(bestSequence[1])):
  95. self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
  96. Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)