Colony.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. #!/usr/bin/env python
  2. from datetime import datetime, timedelta
  3. import numpy as np
  4. import random
  5. import sys
  6. import pytz
  7. from aman.sys.aco.Ant import Ant
  8. from aman.sys.aco.Configuration import Configuration
  9. from aman.sys.aco.RunwayManager import RunwayManager
  10. from aman.types.Inbound import Inbound
  11. # This class implements the ant colony of the following paper:
  12. # https://sci-hub.mksa.top/10.1109/cec.2019.8790135
  13. class Colony:
  14. def associateInbound(rwyManager : RunwayManager, inbound : Inbound, earliestArrivalTime : datetime, useITA : bool):
  15. rwy, eta, _ = rwyManager.selectArrivalRunway(inbound, useITA, earliestArrivalTime)
  16. eta = max(earliestArrivalTime, eta)
  17. inbound.PlannedRunway = rwy
  18. inbound.PlannedStar = inbound.ArrivalCandidates[rwy.Name].Star
  19. inbound.PlannedArrivalTime = eta
  20. inbound.InitialArrivalTime = inbound.ArrivalCandidates[rwy.Name].InitialArrivalTime
  21. rwyManager.RunwayInbounds[rwy.Name] = inbound
  22. def calculateInitialCosts(rwyManager : RunwayManager, inbounds, earliestArrivalTime : datetime):
  23. overallDelay = timedelta(seconds = 0)
  24. # assume that the inbounds are sorted in FCFS order
  25. print('FCFS-Sequence:')
  26. tmp = datetime.now().replace(tzinfo = pytz.UTC) + timedelta(seconds = 50 * len(inbounds))
  27. for inbound in inbounds:
  28. for runway in inbound.ArrivalCandidates:
  29. inbound.ArrivalCandidates[runway].InitialArrivalTime = tmp
  30. inbound.ArrivalCandidates[runway].EarliestArrivalTime = tmp
  31. tmp += timedelta(seconds = 20)
  32. Colony.associateInbound(rwyManager, inbound, earliestArrivalTime, False)
  33. overallDelay += inbound.PlannedArrivalTime - inbound.InitialArrivalTime
  34. print(' ' + inbound.Report.aircraft.callsign + ': ' + inbound.PlannedRunway.Name + ' @ ' + str(inbound.PlannedArrivalTime) +
  35. ' dt=' + str((inbound.PlannedArrivalTime - inbound.InitialArrivalTime).total_seconds()))
  36. return overallDelay
  37. def __init__(self, configuration : Configuration):
  38. self.Configuration = configuration
  39. self.ResultDelay = None
  40. self.Result = None
  41. rwyManager = RunwayManager(self.Configuration)
  42. delay = Colony.calculateInitialCosts(rwyManager, self.Configuration.Inbounds, self.Configuration.EarliestArrivalTime)
  43. self.FcfsDelay = delay
  44. # check if FCFS is the ideal solution
  45. if 0.0 >= delay.total_seconds():
  46. self.Result = self.Configuration.Inbounds
  47. self.ResultDelay = delay
  48. return
  49. # initial value for the optimization
  50. self.Configuration.ThetaZero = 1.0 / (len(self.Configuration.Inbounds) * (delay.total_seconds() / 60.0))
  51. self.PheromoneMatrix = np.ones(( len(self.Configuration.Inbounds), len(self.Configuration.Inbounds) ), dtype=float) * self.Configuration.ThetaZero
  52. def optimize(self):
  53. # FCFS is the best solution
  54. if None != self.Result:
  55. return
  56. # define the tracking variables
  57. bestSequence = None
  58. # run the optimization loops
  59. for _ in range(0, self.Configuration.ExplorationRuns):
  60. # select the first inbound
  61. index = random.randint(1, len(self.Configuration.Inbounds)) - 1
  62. candidates = []
  63. for _ in range(0, self.Configuration.AntCount):
  64. # let the ant find a solution
  65. ant = Ant(self.PheromoneMatrix, self.Configuration)
  66. ant.findSolution(index)
  67. # fallback to check if findSolution was successful
  68. if None == ant.SequenceDelay or None == ant.Sequence:
  69. sys.stderr.write('Invalid ANT run detected!')
  70. sys.exit(-1)
  71. candidates.append([ ant.SequenceDelay, ant.Sequence ])
  72. # find the best solution in all candidates of this generation
  73. bestCandidate = None
  74. for candidate in candidates:
  75. if None == bestCandidate or candidate[0] < bestCandidate[0]:
  76. bestCandidate = candidate
  77. dTheta = 1.0 / (candidate[0].total_seconds() / 60.0)
  78. for i in range(1, len(candidate[1])):
  79. update = (1.0 - self.Configuration.Epsilon) * self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] + dTheta
  80. self.PheromoneMatrix[candidate[1][i - 1], candidate[1][i]] = max(update, self.Configuration.ThetaZero)
  81. # check if we find a new best candidate
  82. if None != bestCandidate:
  83. if None == bestSequence or bestCandidate[0] < bestSequence[0]:
  84. bestSequence = bestCandidate
  85. print(self.PheromoneMatrix)
  86. # create the final sequence
  87. if None != bestSequence:
  88. # create the resulting sequence
  89. self.ResultDelay = bestSequence[0]
  90. self.Result = []
  91. # finalize the sequence
  92. rwyManager = RunwayManager(self.Configuration)
  93. for i in range(0, len(bestSequence[1])):
  94. self.Result.append(self.Configuration.Inbounds[bestSequence[1][i]])
  95. Colony.associateInbound(rwyManager, self.Result[-1], self.Configuration.EarliestArrivalTime, True)