receive aircraft reports

This commit is contained in:
Sven Czarnian
2021-08-17 17:42:21 +02:00
parent 4e8e8f15e4
commit c7738346bb

View File

@@ -5,11 +5,12 @@ import glob
import os import os
import sys import sys
import threading import threading
import time
import zmq import zmq
import zmq.auth import zmq.auth
from zmq.auth import Authenticator
from aman.com import AircraftReport_pb2
from aman.config import Server from aman.config import Server
class ReceiverThread(threading.Thread): class ReceiverThread(threading.Thread):
@@ -18,17 +19,18 @@ class ReceiverThread(threading.Thread):
self.socket = socket self.socket = socket
def run(self): def run(self):
try:
# create the poller to wait with a timeout to receive data
poller = zmq.Poller()
poller.register(self.socket, zmq.POLLIN)
while True: while True:
# wait 1s to receive data try:
events = poller.poll(1000) msg = self.socket.recv(zmq.NOBLOCK)
if self.socket in events and events[self.socket] == zmq.POLLIN:
msg = self.socket.recv() # parse the received message
finally: report = AircraftReport_pb2.AircraftReport()
report.ParseFromString(msg)
except zmq.ZMQError as error:
if zmq.EAGAIN == error.errno:
time.sleep(0.5)
pass
else:
return return
def threadId(self): def threadId(self):
@@ -51,15 +53,6 @@ class Euroscope:
def __init__(self, config : Server.Server): def __init__(self, config : Server.Server):
self.context = zmq.Context() self.context = zmq.Context()
# initialize the authentication module
authLocation = (
str(config.ClientKeyPath)
)
self.auth = Authenticator(context = self.context)
self.auth.configure_curve(domain='*', location = authLocation)
self.auth.allow('127.0.0.1')
self.auth.start()
# read the certificates # read the certificates
keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret')) keyPairPath = glob.glob(os.path.join(config.ServerKeyPath, '*.key_secret'))
if 1 != len(keyPairPath): if 1 != len(keyPairPath):
@@ -68,22 +61,26 @@ class Euroscope:
keyPair = zmq.auth.load_certificate(keyPairPath[0]) keyPair = zmq.auth.load_certificate(keyPairPath[0])
# initialize the receiver # initialize the receiver
self.receiverSocket = self.context.socket(zmq.ROUTER) self.receiverSocket = zmq.Socket(self.context, zmq.SUB)
self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0]) self.receiverSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1]) self.receiverSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True) self.receiverSocket.setsockopt(zmq.CURVE_SERVER, True)
self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver)) self.receiverSocket.bind('tcp://' + config.Address + ':' + str(config.PortReceiver))
self.receiverSocket.setsockopt(zmq.SUBSCRIBE, b'')
self.receiverThread = ReceiverThread(self.receiverSocket) self.receiverThread = ReceiverThread(self.receiverSocket)
self.receiverThread.start() self.receiverThread.start()
print('Listening at tcp://' + config.Address + ':' + str(config.PortReceiver))
# initialize the notification # initialize the notification
self.notificationSocket = self.context.socket(zmq.DEALER) self.notificationSocket = zmq.Socket(self.context, zmq.PUB)
self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0]) self.notificationSocket.setsockopt(zmq.CURVE_PUBLICKEY, keyPair[0])
self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1]) self.notificationSocket.setsockopt(zmq.CURVE_SECRETKEY, keyPair[1])
self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True) self.notificationSocket.setsockopt(zmq.CURVE_SERVER, True)
self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification)) self.notificationSocket.bind('tcp://' + config.Address + ':' + str(config.PortNotification))
print('Publishing at tcp://' + config.Address + ':' + str(config.PortNotification))
def __del__(self): def __del__(self):
self.auth.stop()
self.receiverThread.stopThread() self.receiverThread.stopThread()
self.receiverThread.join() self.receiverThread.join()
self.receiverSocket.close()
self.notificationSocket.close()