Euroscope.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #!/usr/bin/env python
  2. import ctypes
  3. import glob
  4. import os
  5. import sys
  6. import threading
  7. import time
  8. import zmq
  9. import zmq.auth
  10. from aman.com import AircraftReport_pb2
  11. from aman.config import Server
  12. class ReceiverThread(threading.Thread):
  13. def __init__(self, socket):
  14. threading.Thread.__init__(self)
  15. self.socket = socket
  16. def run(self):
  17. while True:
  18. try:
  19. msg = self.socket.recv(zmq.NOBLOCK)
  20. # parse the received message
  21. report = AircraftReport_pb2.AircraftReport()
  22. report.ParseFromString(msg)
  23. except zmq.ZMQError as error:
  24. if zmq.EAGAIN == error.errno:
  25. time.sleep(0.5)
  26. continue
  27. else:
  28. return
  29. def threadId(self):
  30. if hasattr(self, '_thread_id'):
  31. return self._thread_id
  32. for id, thread in threading._active.items():
  33. if thread is self:
  34. return id
  35. def stopThread(self):
  36. id = self.threadId()
  37. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
  38. if 1 < res:
  39. ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
  40. # @brief Receives and sends messages to EuroScope plugins
  41. class Euroscope:
  42. # @brief Initializes the ZMQ socket
  43. # @param[in] config The server configuration
  44. def __init__(self, config : Server.Server):
  45. self.context = zmq.Context()
  46. # read the certificates
  47. keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
  48. if 1 != len(keyPairPath):
  49. sys.stderr.write('No public-private keypair found for the server certificate')
  50. sys.exit(-1)
  51. keyPair = zmq.auth.load_certificate(keyPairPath[0])
  52. # initialize the receiver
  53. self.receiverSocket = zmq.Socket(self.context, zmq.SUB)
  54. self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  55. self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  56. self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
  57. self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
  58. self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
  59. self.receiverThread = ReceiverThread(self.receiverSocket)
  60. self.receiverThread.start()
  61. print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver))
  62. # initialize the notification
  63. self.notificationSocket = zmq.Socket(self.context, zmq.PUB)
  64. self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  65. self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  66. self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
  67. self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
  68. print('Publishing at tcp://' + config.Address + ':' + str(config.PortNotification))
  69. def __del__(self):
  70. self.receiverThread.stopThread()
  71. self.receiverThread.join()
  72. self.receiverSocket.close()
  73. self.notificationSocket.close()