Euroscope.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. #!/usr/bin/env python
  2. import ctypes
  3. import glob
  4. import os
  5. import sys
  6. import threading
  7. import zmq
  8. from zmq.asyncio import Context
  9. import zmq.auth
  10. from zmq.auth.asyncio import AsyncioAuthenticator
  11. from aman.config import Server
  12. class ReceiverThread(threading.Thread):
  13. def __init__(self, socket):
  14. threading.Thread.__init__(self)
  15. self.socket = socket
  16. def run(self):
  17. try:
  18. while True:
  19. True
  20. finally:
  21. return
  22. def threadId(self):
  23. if hasattr(self, '_thread_id'):
  24. return self._thread_id
  25. for id, thread in threading._active.items():
  26. if thread is self:
  27. return id
  28. def stopThread(self):
  29. id = self.threadId()
  30. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(id, ctypes.py_object(SystemExit))
  31. if 1 < res:
  32. ctypes.pythonapi.PyThreadState_SetAsyncExc(id, 0)
  33. # @brief Receives and sends messages to EuroScope plugins
  34. class Euroscope:
  35. # @brief Initializes the ZMQ socket
  36. # @param[in] config The server configuration
  37. def __init__(self, config : Server.Server):
  38. self.context = Context.instance()
  39. # initialize the authentication module
  40. authLocation = (
  41. str(config.ClientKeyPath)
  42. )
  43. self.auth = AsyncioAuthenticator(context = self.context)
  44. self.auth.configure_curve(domain='*', location = authLocation)
  45. self.auth.allow('127.0.0.1')
  46. self.auth.start()
  47. # read the certificates
  48. keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
  49. if 1 != len(keyPairPath):
  50. sys.stderr.write('No public-private keypair found for the server certificate')
  51. sys.exit(-1)
  52. keyPair = zmq.auth.load_certificate(keyPairPath[0])
  53. # initialize the receiver
  54. self.receiverSocket = self.context.socket(zmq.ROUTER)
  55. self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  56. self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  57. self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
  58. self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
  59. self.receiverThread = ReceiverThread(self.receiverSocket)
  60. self.receiverThread.start()
  61. # initialize the notification
  62. self.notificationSocket = self.context.socket(zmq.DEALER)
  63. self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
  64. self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
  65. self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
  66. self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
  67. def __del__(self):
  68. self.auth.stop()
  69. self.receiverThread.stopThread()
  70. self.receiverThread.join()