From c7738346bb6cbbe60013c6eaf9621aee7fec7fc7 Mon Sep 17 00:00:00 2001 From: Sven Czarnian Date: Tue, 17 Aug 2021 17:42:21 +0200 Subject: [PATCH] receive aircraft reports --- aman/com/Euroscope.py | 45 ++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/aman/com/Euroscope.py b/aman/com/Euroscope.py index 9255be6..a494653 100644 --- a/aman/com/Euroscope.py +++ b/aman/com/Euroscope.py @@ -5,11 +5,12 @@ import glob import os import sys import threading +import time import zmq import zmq.auth -from zmq.auth import Authenticator +from aman.com import AircraftReport_pb2 from aman.config import Server class ReceiverThread(threading.Thread): @@ -18,18 +19,19 @@ class ReceiverThread(threading.Thread): self.socket = socket def run(self): - try: - # create the poller to wait with a timeout to receive data - poller = zmq.Poller() - poller.register(self.socket, zmq.POLLIN) + while True: + try: + msg = self.socket.recv(zmq.NOBLOCK) - while True: - # wait 1s to receive data - events = poller.poll(1000) - if self.socket in events and events[self.socket] == zmq.POLLIN: - msg = self.socket.recv() - finally: - return + # parse the received message + report = AircraftReport_pb2.AircraftReport() + report.ParseFromString(msg) + except zmq.ZMQError as error: + if zmq.EAGAIN == error.errno: + time.sleep(0.5) + pass + else: + return def threadId(self): if hasattr(self, '_thread_id'): @@ -51,15 +53,6 @@ class Euroscope: def __init__(self, config : Server.Server): self.context = zmq.Context() - # initialize the authentication module - authLocation = ( - str(config.ClientKeyPath) - ) - self.auth = Authenticator(context = self.context) - self.auth.configure_curve(domain='*', location = authLocation) - self.auth.allow('127.0.0.1') - self.auth.start() - # read the certificates keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret')) if 1 != len(keyPairPath): @@ -68,22 +61,26 @@ class Euroscope: keyPair = zmq.auth.load_certificate(keyPairPath[0]) # initialize the receiver - self.receiverSocket = self.context.socket(zmq.ROUTER) + self.receiverSocket = zmq.Socket(self.context, zmq.SUB) self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0]) self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1]) self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True) self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver)) + self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'') self.receiverThread = ReceiverThread(self.receiverSocket) self.receiverThread.start() + print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver)) # initialize the notification - self.notificationSocket = self.context.socket(zmq.DEALER) + self.notificationSocket = zmq.Socket(self.context, zmq.PUB) self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0]) self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1]) self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True) self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification)) + print('Publishing at tcp://' + config.Address + ':' + str(config.PortNotification)) def __del__(self): - self.auth.stop() self.receiverThread.stopThread() self.receiverThread.join() + self.receiverSocket.close() + self.notificationSocket.close()