#!/usr/bin/env python

import ctypes
import glob
import os
import sys
import threading

import zmq
import zmq.auth
from zmq.auth import Authenticator

from aman.config import Server


# @brief Background thread that drains incoming messages from a ZMQ socket
class ReceiverThread(threading.Thread):
    # @brief Stores the socket that is polled by the thread
    # @param[in] socket The ZMQ socket to receive from
    def __init__(self, socket):
        threading.Thread.__init__(self)
        self.socket = socket

    # @brief Polls the socket until the thread is stopped via stopThread()
    def run(self):
        try:
            # create the poller to wait with a timeout to receive data
            poller = zmq.Poller()
            poller.register(self.socket, zmq.POLLIN)

            while True:
                # wait 1s to receive data; poll() returns a list of
                # (socket, event-mask) pairs, so convert it into a mapping
                events = dict(poller.poll(1000))

                # the event mask is a bit field, test the POLLIN bit only
                if events.get(self.socket, 0) & zmq.POLLIN:
                    # the received frame is read to drain the socket and is
                    # not processed any further here
                    msg = self.socket.recv()
        except SystemExit:
            # raised asynchronously by stopThread() to end the loop
            return
        except zmq.ZMQError as error:
            # report socket failures instead of hiding them silently
            sys.stderr.write('Receiver thread stopped: ' + str(error) + '\n')
            return

    # @brief Returns the identifier of this thread
    # @return The thread identifier or None if the thread was never started
    def threadId(self):
        # an explicitly assigned '_thread_id' attribute takes precedence
        if hasattr(self, '_thread_id'):
            return self._thread_id
        # Thread.ident is the public accessor of the key that is used by
        # the interpreter's table of active threads
        return self.ident

    # @brief Requests the termination of the thread
    # SystemExit is raised asynchronously inside the thread. It is delivered
    # as soon as the thread executes Python bytecode again, i.e. at the
    # latest after the 1s poll timeout in run().
    def stopThread(self):
        threadId = self.threadId()

        # nothing to stop if the thread never started or already finished;
        # identifiers of finished threads can be reused by other threads
        if threadId is None or not self.is_alive():
            return

        # the C function expects an unsigned long; a plain Python int would
        # be converted to a C int and truncated on 64-bit platforms
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(threadId),
                                                         ctypes.py_object(SystemExit))
        if 1 < res:
            # more than one thread state was modified: revert the request
            # by passing a NULL exception
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(threadId), None)


# @brief Receives and sends messages to EuroScope plugins
class Euroscope:
    # @brief Initializes the ZMQ socket
    # Starts the CURVE authenticator, binds the receiver (ROUTER) and the
    # notification (DEALER) socket and starts the receiver thread.
    # The process is terminated with exit code -1 if not exactly one
    # '*.key_secret' file exists in the server key path.
    # @param[in] config The server configuration
    def __init__(self, config : Server.Server):
        self.context = zmq.Context()

        # initialize the authentication module
        authLocation = (
            str(config.ClientKeyPath)
        )
        self.auth = Authenticator(context = self.context)
        self.auth.configure_curve(domain='*', location = authLocation)
        self.auth.allow('127.0.0.1')
        self.auth.start()

        # read the certificates
        keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
        if 1 != len(keyPairPath):
            sys.stderr.write('No public-private keypair found for the server certificate')
            sys.exit(-1)
        keyPair = zmq.auth.load_certificate(keyPairPath[0])

        # initialize the receiver
        self.receiverSocket = self.context.socket(zmq.ROUTER)
        self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
        self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
        self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
        self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
        self.receiverThread = ReceiverThread(self.receiverSocket)
        self.receiverThread.start()

        # initialize the notification
        self.notificationSocket = self.context.socket(zmq.DEALER)
        self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
        self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
        self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
        self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))

    # @brief Stops the authenticator and the receiver thread
    def __del__(self):
        # __init__ may have aborted before all members were created,
        # so only shut down what actually exists
        auth = getattr(self, 'auth', None)
        if auth is not None:
            auth.stop()

        receiverThread = getattr(self, 'receiverThread', None)
        if receiverThread is not None:
            receiverThread.stopThread()
            receiverThread.join()