Euroscope.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #!/usr/bin/env python
  2. import ctypes
  3. import glob
  4. import os
  5. import sys
  6. import threading
  7. import zmq
  8. from zmq.asyncio import Context
  9. import zmq.auth
  10. from zmq.auth.asyncio import AsyncioAuthenticator
  11. from aman.config import Server
  12. class ReceiverThread(threading.Thread):
  13. def __init__(self, socket):
  14. threading.Thread.__init__(self)
  15. self.socket = socket
  16. def __del__(self):
  17. self.stopThread()
  18. def run(self):
  19. try:
  20. while True:
  21. True
  22. finally:
  23. return
  24. def threadId(self):
  25. if hasattr(self, '_thread_id'):
  26. return self._thread_id
  27. for id, thread in threading._active.items():
  28. if thread is self:
  29. return id
  30. def stopThread(self):
  31. id = self.threadId()
  32. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
  33. if 1 < res:
  34. ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
  35. # @brief Receives and sends messages to EuroScope plugins
  36. class Euroscope:
  37. # @brief Initializes the ZMQ socket
  38. # @param[in] config The server configuration
  39. def __init__(self, config : Server.Server):
  40. self.context = Context.instance()
  41. # initialize the authentication module
  42. authLocation = (
  43. str(config.ClientKeyPath)
  44. )
  45. self.auth = AsyncioAuthenticator(context = self.context)
  46. self.auth.configure_curve(domain='*', location = authLocation)
  47. self.auth.allow('127.0.0.1')
  48. self.auth.start()
  49. # read the certificates
  50. keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
  51. if 1 != len(keyPairPath):
  52. sys.stderr.write('No public-private keypair found for the server certificate')
  53. sys.exit(-1)
  54. keyPair = zmq.auth.load_certificate(keyPairPath[0])
  55. # initialize the receiver
  56. self.receiverSocket = self.context.socket(zmq.ROUTER)
  57. self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  58. self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  59. self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
  60. self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
  61. self.receiverThread = ReceiverThread(self.receiverSocket)
  62. self.receiverThread.start()
  63. # initialize the notification
  64. self.notificationSocket = self.context.socket(zmq.DEALER)
  65. self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  66. self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  67. self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
  68. self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
  69. def __exit__(self, *_exc):
  70. self.auth.stop()
  71. self.receiverThread.stopThread()
  72. self.receiverThread.join()