#!/usr/bin/env python import zmq from zmq.asyncio import Context import zmq.auth from zmq.auth.asyncio import AsyncioAuthenticator from pathlib import Path from typing import Union, Optional # @brief Receives the information of EuroScope def EuroscopeReceiver(): # @brief Initializes the ZMQ socket # @param[in] address The server address # @param[in] serverKeyPath Path to the server's keypair # @param[in] clientKeys Path to the client's keypairs def __init__(self, address: str, serverKeyPath: Union[str, Path], clientKeys: Union[str, Path]): self.context = Context.instance() # initialize the authentication module authLocation = ( str(clientKeys) ) self.auth = AsyncioAuthenticator(context = self.context) self.auth.configure_curve(domain='*', location = authLocation) self.auth.allow('127.0.0.1') self.auth.start() # initialize the socket self.socket = self.context.socket(zmq.REP) keys = zmq.auth.load_certificate(serverKeyPath) self.socket.setsockopt(zmq.CURVE_PUBLICKEY, keys[0]) self.socket.setsockopt(zmq.CURVE_SECRETKEY, keys[1]) self.socket.setsockopt(zmq.CURVE_SERVER, True) self.socket.bind(address) def __exit__(self, *_exc): self.auth.stop()