1234567891011121314151617181920212223242526272829303132333435363738 |
- import zmq
- from zmq.asyncio import Context
- import zmq.auth
- from zmq.auth.asyncio import AsyncioAuthenticator
- from pathlib import Path
- from typing import Union, Optional
class EuroscopeReceiver:
    """CURVE-secured ZeroMQ REP endpoint bound to a caller-supplied address.

    Construction starts an ``AsyncioAuthenticator`` on the shared
    ``Context.instance()`` context, configures CURVE authentication for
    every domain (``'*'``) from the keys found at ``clientKeys``, allows
    connections from ``127.0.0.1``, and binds a ``zmq.REP`` socket acting as
    the CURVE server.

    The object can be used as a context manager; leaving the ``with`` block
    (or calling :meth:`close`) closes the socket and stops the
    authenticator. The shared context itself is never terminated here
    because other code may be using it.
    """

    def __init__(
        self,
        address: str,
        serverKeyPath: Union[str, Path],
        clientKeys: Union[str, Path],
    ) -> None:
        """Start the authenticator and bind the secured REP socket.

        Args:
            address: Endpoint passed verbatim to ``socket.bind``.
            serverKeyPath: Certificate file read with
                ``zmq.auth.load_certificate``; it must contain both the
                public and the secret key of the server.
            clientKeys: Location handed to ``configure_curve`` (converted
                to ``str``) -- presumably a directory of authorised client
                public keys; confirm against the deployment layout.

        Raises:
            ValueError: If the server certificate has no secret key.
        """
        self.context = Context.instance()
        self.socket = None

        # Load the certificate before starting the authenticator so that a
        # bad path or incomplete certificate fails without leaving the
        # authenticator running.
        publicKey, secretKey = zmq.auth.load_certificate(serverKeyPath)
        if secretKey is None:
            raise ValueError(
                f"server certificate {serverKeyPath} does not contain a secret key"
            )

        self.auth = AsyncioAuthenticator(context=self.context)
        self.auth.configure_curve(domain='*', location=str(clientKeys))
        self.auth.allow('127.0.0.1')
        self.auth.start()

        try:
            self.socket = self.context.socket(zmq.REP)
            self.socket.setsockopt(zmq.CURVE_PUBLICKEY, publicKey)
            self.socket.setsockopt(zmq.CURVE_SECRETKEY, secretKey)
            self.socket.setsockopt(zmq.CURVE_SERVER, True)
            self.socket.bind(address)
        except Exception:
            # Do not leak the running authenticator (or a half-configured
            # socket) when socket setup or bind fails.
            self.close()
            raise

    def close(self) -> None:
        """Close the REP socket (if one was created) and stop the authenticator."""
        try:
            if self.socket is not None:
                self.socket.close()
                self.socket = None
        finally:
            # Stop the authenticator even if closing the socket raised.
            self.auth.stop()

    def __enter__(self):
        """Return the receiver itself so it can be bound with ``with ... as``."""
        return self

    def __exit__(self, *_exc):
        """Release the socket and authenticator; exceptions are not suppressed."""
        self.close()
|