Euroscope.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. #!/usr/bin/env python
  2. import ctypes
  3. import glob
  4. import os
  5. import sys
  6. import threading
  7. import time
  8. import zmq
  9. import zmq.auth
  10. from aman.com import AircraftReport_pb2
  11. from aman.config.Server import Server
  12. class ReceiverThread(threading.Thread):
  13. def __init__(self, socket, aman):
  14. threading.Thread.__init__(self)
  15. self.socket = socket
  16. self.aman = aman
  17. def run(self):
  18. while True:
  19. try:
  20. msg = self.socket.recv(zmq.NOBLOCK)
  21. # parse the received message
  22. report = AircraftReport_pb2.AircraftReport()
  23. report.ParseFromString(msg)
  24. # try to associate the received aircraft to an airport
  25. self.aman.updateAircraftReport(report)
  26. except zmq.ZMQError as error:
  27. if zmq.EAGAIN == error.errno:
  28. time.sleep(0.5)
  29. continue
  30. else:
  31. return
  32. def threadId(self):
  33. if hasattr(self, '_thread_id'):
  34. return self._thread_id
  35. for id, thread in threading._active.items():
  36. if thread is self:
  37. return id
  38. def stopThread(self):
  39. id = self.threadId()
  40. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
  41. if 1 < res:
  42. ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
  43. # @brief Receives and sends messages to EuroScope plugins
  44. class Euroscope:
  45. # @brief Initializes the ZMQ socket
  46. # @param[in] config The server configuration
  47. def __init__(self, configPath : str, config : Server, aman):
  48. self.context = zmq.Context()
  49. # find the key directories
  50. serverKeyPath = os.path.join(os.path.join(configPath, 'keys'), 'server')
  51. if False == os.path.isdir(serverKeyPath):
  52. sys.stderr.write('No directory for the server key found')
  53. sys.exit(-1)
  54. print('Path to the server key: ' + serverKeyPath)
  55. clientKeyPath = os.path.join(os.path.join(configPath, 'keys'), 'clients')
  56. if False == os.path.isdir(clientKeyPath):
  57. sys.stderr.write('No directory for the client keys found')
  58. sys.exit(-1)
  59. print('Path to the client keys: ' + clientKeyPath)
  60. # read the certificates
  61. keyPairPath = glob.glob(os.path.join(serverKeyPath, '*.key_secret'))
  62. if 1 != len(keyPairPath):
  63. sys.stderr.write('No public-private keypair found for the server certificate')
  64. sys.exit(-1)
  65. keyPair = zmq.auth.load_certificate(keyPairPath[0])
  66. # initialize the receiver
  67. self.receiverSocket = zmq.Socket(self.context, zmq.SUB)
  68. self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  69. self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  70. self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
  71. self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
  72. self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
  73. self.receiverThread = ReceiverThread(self.receiverSocket, aman)
  74. self.receiverThread.start()
  75. print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver))
  76. # initialize the notification
  77. self.notificationSocket = zmq.Socket(self.context, zmq.PUB)
  78. self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  79. self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  80. self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
  81. self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
  82. print('Publishing at tcp://' + config.Address + ':' + str(config.PortNotification))
  83. def __del__(self):
  84. self.receiverThread.stopThread()
  85. self.receiverThread.join()
  86. self.receiverSocket.close()
  87. self.notificationSocket.close()