diff --git a/include/aman/com/AircraftScheduler.h b/include/aman/com/BackendReceiver.h similarity index 61% rename from include/aman/com/AircraftScheduler.h rename to include/aman/com/BackendReceiver.h index 68c32e4..680945e 100644 --- a/include/aman/com/AircraftScheduler.h +++ b/include/aman/com/BackendReceiver.h @@ -1,6 +1,6 @@ /* * @brief Defines the aircraft scheduling notifier module to receive plans from the backend - * @file aman/com/AircraftScheduler.h + * @file aman/com/BackendReceiver.h * @author Sven Czarnian * @copyright Copyright 2021 Sven Czarnian * @license This project is published under the GNU General Public License v3 (GPLv3) @@ -9,12 +9,13 @@ #pragma once #include +#include #include #include #pragma warning(push, 0) -#include "protobuf/AircraftSchedule.pb.h" +#include "protobuf/Communication.pb.h" #pragma warning(pop) namespace aman { @@ -22,13 +23,15 @@ namespace aman { * @brief Defines the aircraft scheduling notification class to receive scheduling sequences * @ingroup com */ - class AircraftScheduler { + class BackendReceiver { private: - std::unique_ptr m_socket; - std::thread m_receiverThread; - std::atomic_bool m_stopReceiver; + std::unique_ptr m_socket; + std::thread m_receiverThread; + std::atomic_bool m_stopReceiver; + std::list> m_sequences; + std::mutex m_sequencesLock; - AircraftScheduler() noexcept; + BackendReceiver() noexcept; template bool setSocketKey(const std::string& key, T entry) { @@ -40,13 +43,14 @@ namespace aman { return false; } } + void receiveSequence(); void run(); public: - AircraftScheduler(const AircraftScheduler&) = delete; - AircraftScheduler(AircraftScheduler&&) = delete; - AircraftScheduler& operator=(const AircraftScheduler&) = delete; - AircraftScheduler& operator=(AircraftScheduler&&) = delete; + BackendReceiver(const BackendReceiver&) = delete; + BackendReceiver(BackendReceiver&&) = delete; + BackendReceiver& operator=(const BackendReceiver&) = delete; + BackendReceiver& operator=(BackendReceiver&&) = delete; /** * @brief Initializes the aircraft scheduler @@ -64,14 +68,14 @@ namespace aman { */ bool initialized() const noexcept; /** - * @brief Receives a new scheduling message - * @return The new scheduling message or a nullptr + * @brief Returns the current sequence of the receiver queue + * @return The sequence queue */ - std::unique_ptr receive(); + std::shared_ptr receive(); /** * @brief Returns the scheduling instance * @return The system-wide instance */ - static AircraftScheduler& instance() noexcept; + static BackendReceiver& instance() noexcept; }; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5345ccc..33f0e79 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -32,8 +32,8 @@ SET(SOURCE_FILES ) SET(SOURCE_COM_FILES + com/BackendReceiver.cpp com/BackendNotification.cpp - com/AircraftScheduler.cpp com/ZmqContext.cpp com/ZmqContext.h ) @@ -51,8 +51,8 @@ SET(SOURCE_FILES_RES ) SET(INCLUDE_COM_FILES + ${CMAKE_SOURCE_DIR}/include/aman/com/BackendReceiver.h ${CMAKE_SOURCE_DIR}/include/aman/com/BackendNotification.h - ${CMAKE_SOURCE_DIR}/include/aman/com/AircraftScheduler.h ) SET(INCLUDE_CONFIG_FILES diff --git a/src/PlugIn.cpp b/src/PlugIn.cpp index 206db73..582b04b 100644 --- a/src/PlugIn.cpp +++ b/src/PlugIn.cpp @@ -15,7 +15,7 @@ #include #include -#include +#include #include #include #include @@ -62,20 +62,20 @@ PlugIn::PlugIn() : ZmqContext::instance().initialize(); - if (false == AircraftReporter::instance().initialize(this->m_configuration)) { + if (false == BackendNotification::instance().initialize(this->m_configuration)) { this->DisplayUserMessage(PLUGIN_NAME, "ERROR", "Unable to initialize the reporter-connection to the backend", true, true, true, true, true); return; } - if (false == AircraftScheduler::instance().initialize(this->m_configuration)) { + if (false == BackendReceiver::instance().initialize(this->m_configuration)) { this->DisplayUserMessage(PLUGIN_NAME, "ERROR", "Unable to initialize the scheduling-connection to the backend", true, true, true, true, true); - AircraftReporter::instance().deinitialize(); + BackendNotification::instance().deinitialize(); return; } } PlugIn::~PlugIn() noexcept { - AircraftScheduler::instance().deinitialize(); + BackendReceiver::instance().deinitialize(); BackendNotification::instance().deinitialize(); ZmqContext::instance().deinitialize(); google::protobuf::ShutdownProtobufLibrary(); diff --git a/src/com/AircraftScheduler.cpp b/src/com/BackendReceiver.cpp similarity index 63% rename from src/com/AircraftScheduler.cpp rename to src/com/BackendReceiver.cpp index b756da8..7576d7e 100644 --- a/src/com/AircraftScheduler.cpp +++ b/src/com/BackendReceiver.cpp @@ -2,26 +2,28 @@ * Author: * Sven Czarnian * Brief: - * Implements the scheduling notification + * Implements the backend receiver * Copyright: * 2021 Sven Czarnian * License: * GNU General Public License v3 (GPLv3) */ -#include +#include #include "ZmqContext.h" using namespace aman; using namespace std::chrono; -AircraftScheduler::AircraftScheduler() noexcept : +BackendReceiver::BackendReceiver() noexcept : m_socket(), m_receiverThread(), - m_stopReceiver(false) { } + m_stopReceiver(false), + m_sequences(), + m_sequencesLock() { } -bool AircraftScheduler::initialize(const Communication& configuration) { +bool BackendReceiver::initialize(const Communication& configuration) { if (nullptr != this->m_socket) return true; if (false == configuration.valid) @@ -47,12 +49,12 @@ bool AircraftScheduler::initialize(const Communication& configuration) { } this->m_stopReceiver = false; - this->m_receiverThread = std::thread(&AircraftScheduler::run, this); + this->m_receiverThread = std::thread(&BackendReceiver::run, this); return true; } -bool AircraftScheduler::deinitialize() { +bool BackendReceiver::deinitialize() { if (nullptr == this->m_socket) return true; @@ -64,42 +66,56 @@ bool AircraftScheduler::deinitialize() { return true; } -bool AircraftScheduler::initialized() const noexcept { +bool BackendReceiver::initialized() const noexcept { return nullptr != this->m_socket; } -std::unique_ptr AircraftScheduler::receive() { +void BackendReceiver::receiveSequence() { zmq::message_t message; if (nullptr == this->m_socket) - return std::unique_ptr(); + return; try { auto result = this->m_socket->recv(message, zmq::recv_flags::dontwait); if (false == result.has_value() || 0 == result.value()) - return std::unique_ptr(); + return; } catch (zmq::error_t&) { - return std::unique_ptr(); + return; } - std::unique_ptr retval = std::make_unique(); + std::unique_ptr retval = std::make_unique(); retval->ParseFromString(reinterpret_cast(message.data())); - return retval; + std::lock_guard guard(this->m_sequencesLock); + this->m_sequences.push_back(std::move(retval)); } -void AircraftScheduler::run() { +void BackendReceiver::run() { while (false == this->m_stopReceiver) { auto message = this->receive(); if (nullptr == message) { std::this_thread::sleep_for(500ms); continue; } + + this->receiveSequence(); } } -AircraftScheduler& AircraftScheduler::instance() noexcept { - static AircraftScheduler __instance; +std::shared_ptr BackendReceiver::receive() { + std::lock_guard guard(this->m_sequencesLock); + if (0 != this->m_sequences.size()) { + auto retval = this->m_sequences.front(); + this->m_sequences.pop_front(); + return retval; + } + + return nullptr; +} + +BackendReceiver& BackendReceiver::instance() noexcept { + static BackendReceiver __instance; return __instance; }