122 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import ctypes
 | |
| import glob
 | |
| import os
 | |
| import sys
 | |
| import time
 | |
| 
 | |
| import zmq
 | |
| import zmq.auth
 | |
| 
 | |
| from aman.com import AircraftReport_pb2
 | |
| from aman.config.Server import Server
 | |
| from threading import Thread, _active
 | |
| 
 | |
# @brief Background thread that polls a ZMQ socket for aircraft reports
class ReceiverThread(Thread):
    # @brief Stores the socket to poll and the consumer of the parsed reports
    # @param[in] socket The ZMQ socket that delivers serialized AircraftReport messages
    # @param[in] aman The object whose updateAircraftReport() is called for every report
    def __init__(self, socket, aman):
        Thread.__init__(self)
        self.socket = socket
        self.aman = aman

    # @brief Receives, parses and forwards reports
    # The loop ends as soon as the socket reports a ZMQ error other than EAGAIN
    def run(self):
        while True:
            try:
                # non-blocking receive: raises ZMQError(EAGAIN) if nothing is pending
                msg = self.socket.recv(zmq.NOBLOCK)

                # parse the received message
                report = AircraftReport_pb2.AircraftReport()
                report.ParseFromString(msg)

                # try to associate the received aircraft to an airport
                self.aman.updateAircraftReport(report)

            except zmq.ZMQError as error:
                # EAGAIN only signals an empty queue: wait and poll again
                if zmq.EAGAIN == error.errno:
                    time.sleep(0.5)
                    continue
                # every other ZMQ error terminates the thread
                return

    # @brief Returns the interpreter's identifier of this thread
    # @return The thread identifier or None if the thread is not running
    def threadId(self):
        if hasattr(self, '_thread_id'):
            return self._thread_id
        # identifiers can be recycled after a thread exits,
        # so only report the identifier of a running thread
        if not self.is_alive():
            return None
        return self.ident

    # @brief Requests the termination of the thread
    # A SystemExit exception is raised asynchronously inside the running thread.
    # Nothing happens if the thread is not running.
    def stopThread(self):
        ident = self.threadId()
        if ident is None:
            return

        # the identifier needs to be wrapped explicitly: ctypes passes plain
        # Python integers as C int, which truncates 64-bit thread identifiers
        # and lets the call miss the thread
        threadRef = ctypes.c_ulong(ident)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(threadRef, ctypes.py_object(SystemExit))
        if 1 < res:
            # more than one thread state was modified: revoke the pending exception
            ctypes.pythonapi.PyThreadState_SetAsyncExc(threadRef, None)
 | |
| 
 | |
# @brief Receives and sends messages to EuroScope plugins
class Euroscope:
    # @brief Creates an unconnected instance, call acquire() to open the sockets
    def __init__(self):
        self.context = None
        self.receiverSocket = None
        self.receiverThread = None
        self.notificationSocket = None

    # @brief Releases all communication resources when the instance is destroyed
    def __del__(self):
        self.release()

    # @brief Initializes the ZMQ sockets and starts the receiver thread
    # The process is terminated via sys.exit(-1) if the key directories or
    # the server key pair cannot be found.
    # @param[in] configPath The directory that contains the 'keys' directory
    # @param[in] config The server configuration
    # @param[in] aman The consumer of the received aircraft reports
    def acquire(self, configPath : str, config : Server, aman):
        # drop the resources of a previous acquire() call instead of leaking them
        self.release()

        self.context = zmq.Context()

        # find the key directories
        keyRoot = os.path.join(configPath, 'keys')
        serverKeyPath = os.path.join(keyRoot, 'server')
        if not os.path.isdir(serverKeyPath):
            sys.stderr.write('No directory for the server key found')
            sys.exit(-1)
        print('Path to the server key: ' + serverKeyPath)

        # NOTE(review): the client key directory is only checked for existence,
        # no authenticator that loads the client certificates is configured in
        # this class - confirm if client authentication is expected here
        clientKeyPath = os.path.join(keyRoot, 'clients')
        if not os.path.isdir(clientKeyPath):
            sys.stderr.write('No directory for the client keys found')
            sys.exit(-1)
        print('Path to the client keys: ' + clientKeyPath)

        # read the certificates, exactly one secret key file is accepted
        keyPairPath = glob.glob(os.path.join(serverKeyPath, '*.key_secret'))
        if 1 != len(keyPairPath):
            sys.stderr.write('No public-private keypair found for the server certificate')
            sys.exit(-1)
        keyPair = zmq.auth.load_certificate(keyPairPath[0])

        # initialize the receiver
        receiverUrl = 'tcp://' + config.Address + ':' + str(config.PortReceiver)
        self.receiverSocket = zmq.Socket(self.context, zmq.SUB)
        self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
        self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
        self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
        self.receiverSocket.bind(receiverUrl)
        # an empty prefix subscribes to every message
        self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
        self.receiverThread = ReceiverThread(self.receiverSocket, aman)
        self.receiverThread.start()
        print('Listening to ' + receiverUrl)

        # initialize the notification
        notificationUrl = 'tcp://' + config.Address + ':' + str(config.PortNotification)
        self.notificationSocket = zmq.Socket(self.context, zmq.PUB)
        self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
        self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
        self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
        self.notificationSocket.bind(notificationUrl)
        print('Publishing to ' + notificationUrl)

    # @brief Stops the receiver thread, closes the sockets and terminates the context
    # The function can be called multiple times and on an instance that was never acquired
    def release(self):
        # the thread uses the receiver socket and needs to be stopped first
        if self.receiverThread is not None and self.receiverThread.is_alive():
            self.receiverThread.stopThread()
            self.receiverThread.join()
        self.receiverThread = None

        # discard unsent messages (linger=0), otherwise the
        # termination of the context can block
        if self.receiverSocket is not None:
            self.receiverSocket.close(linger=0)
        self.receiverSocket = None

        if self.notificationSocket is not None:
            self.notificationSocket.close(linger=0)
        self.notificationSocket = None

        # the context owns an I/O thread and is leaked without an explicit termination
        if self.context is not None:
            self.context.term()
        self.context = None
 |