39 lines
		
	
	
		
			1.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			39 lines
		
	
	
		
			1.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import zmq
 | |
| from zmq.asyncio import Context
 | |
| import zmq.auth
 | |
| from zmq.auth.asyncio import AsyncioAuthenticator
 | |
| 
 | |
| from pathlib import Path
 | |
| from typing import Union, Optional
 | |
| 
 | |
| # @brief Receives the information of EuroScope
 | |
class EuroscopeReceiver:
    # Owns a CURVE-secured zmq.REP socket plus the authenticator that checks
    # connecting clients. Usable as a context manager: leaving the ``with``
    # block closes the socket and stops the authenticator.

    # @brief Initializes the ZMQ socket
    # @param[in] address The server address the REP socket is bound to
    # @param[in] serverKeyPath Path to the server's keypair
    # @param[in] clientKeys Path to the client's keypairs
    def __init__(self, address: str, serverKeyPath: Union[str, Path], clientKeys: Union[str, Path]):
        self.context = Context.instance()

        # read the server certificate before anything is started, so that a
        # missing or unreadable key file does not leave an authenticator running
        publicKey, secretKey = zmq.auth.load_certificate(serverKeyPath)

        # initialize the authentication module
        self.auth = AsyncioAuthenticator(context=self.context)
        self.auth.configure_curve(domain='*', location=str(clientKeys))
        self.auth.allow('127.0.0.1')
        self.auth.start()

        # initialize the socket; on any failure release what was acquired so
        # far, because __exit__ is never reached when __init__ raises
        self.socket = self.context.socket(zmq.REP)
        try:
            self.socket.setsockopt(zmq.CURVE_PUBLICKEY, publicKey)
            self.socket.setsockopt(zmq.CURVE_SECRETKEY, secretKey)
            self.socket.setsockopt(zmq.CURVE_SERVER, True)
            self.socket.bind(address)
        except Exception:
            self.socket.close()
            self.auth.stop()
            raise

    # @brief Enters the context manager
    # @return The receiver itself
    def __enter__(self):
        return self

    # @brief Leaves the context manager and releases the ZMQ resources
    # @param[in] _exc Exception information of the with-block (ignored, exceptions propagate)
    def __exit__(self, *_exc):
        self.socket.close()
        self.auth.stop()