Euroscope.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #!/usr/bin/env python
  2. import ctypes
  3. import glob
  4. import os
  5. import sys
  6. import threading
  7. import zmq
  8. import zmq.auth
  9. from zmq.auth import Authenticator
  10. from aman.config import Server
  11. class ReceiverThread(threading.Thread):
  12. def __init__(self, socket):
  13. threading.Thread.__init__(self)
  14. self.socket = socket
  15. def run(self):
  16. try:
  17. # create the poller to wait with a timeout to receive data
  18. poller = zmq.Poller()
  19. poller.register(self.socket, zmq.POLLIN)
  20. while True:
  21. # wait 1s to receive data
  22. events = poller.poll(1000)
  23. if self.socket in events and events[self.socket] == zmq.POLLIN:
  24. msg = self.socket.recv()
  25. finally:
  26. return
  27. def threadId(self):
  28. if hasattr(self, '_thread_id'):
  29. return self._thread_id
  30. for id, thread in threading._active.items():
  31. if thread is self:
  32. return id
  33. def stopThread(self):
  34. id = self.threadId()
  35. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
  36. if 1 < res:
  37. ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
  38. # @brief Receives and sends messages to EuroScope plugins
  39. class Euroscope:
  40. # @brief Initializes the ZMQ socket
  41. # @param[in] config The server configuration
  42. def __init__(self, config : Server.Server):
  43. self.context = zmq.Context()
  44. # initialize the authentication module
  45. authLocation = (
  46. str(config.ClientKeyPath)
  47. )
  48. self.auth = Authenticator(context = self.context)
  49. self.auth.configure_curve(domain='*', location = authLocation)
  50. self.auth.allow('127.0.0.1')
  51. self.auth.start()
  52. # read the certificates
  53. keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
  54. if 1 != len(keyPairPath):
  55. sys.stderr.write('No public-private keypair found for the server certificate')
  56. sys.exit(-1)
  57. keyPair = zmq.auth.load_certificate(keyPairPath[0])
  58. # initialize the receiver
  59. self.receiverSocket = self.context.socket(zmq.ROUTER)
  60. self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  61. self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  62. self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
  63. self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
  64. self.receiverThread = ReceiverThread(self.receiverSocket)
  65. self.receiverThread.start()
  66. # initialize the notification
  67. self.notificationSocket = self.context.socket(zmq.DEALER)
  68. self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  69. self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  70. self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
  71. self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
  72. def __del__(self):
  73. self.auth.stop()
  74. self.receiverThread.stopThread()
  75. self.receiverThread.join()