rename the sequence receiver
This commit is contained in:
		| @@ -1,6 +1,6 @@ | ||||
| /*
 | ||||
|  * @brief Defines the aircraft scheduling notifier module to receive plans from the backend | ||||
|  * @file aman/com/AircraftScheduler.h | ||||
|  * @file aman/com/BackendReceiver.h | ||||
|  * @author Sven Czarnian <devel@svcz.de> | ||||
|  * @copyright Copyright 2021 Sven Czarnian | ||||
|  * @license This project is published under the GNU General Public License v3 (GPLv3) | ||||
| @@ -9,12 +9,13 @@ | ||||
| #pragma once | ||||
| 
 | ||||
| #include <thread> | ||||
| #include <memory> | ||||
| #include <zmq.hpp> | ||||
| 
 | ||||
| #include <aman/types/Communication.h> | ||||
| 
 | ||||
| #pragma warning(push, 0) | ||||
| #include "protobuf/AircraftSchedule.pb.h" | ||||
| #include "protobuf/Communication.pb.h" | ||||
| #pragma warning(pop) | ||||
| 
 | ||||
| namespace aman { | ||||
| @@ -22,13 +23,15 @@ namespace aman { | ||||
|      * @brief Defines the aircraft scheduling notification class to receive scheduling sequences | ||||
|      * @ingroup com | ||||
|      */ | ||||
|     class AircraftScheduler { | ||||
|     class BackendReceiver { | ||||
|     private: | ||||
|         std::unique_ptr<zmq::socket_t> m_socket; | ||||
|         std::thread                    m_receiverThread; | ||||
|         std::atomic_bool               m_stopReceiver; | ||||
|         std::unique_ptr<zmq::socket_t>                     m_socket; | ||||
|         std::thread                                        m_receiverThread; | ||||
|         std::atomic_bool                                   m_stopReceiver; | ||||
|         std::list<std::shared_ptr<aman::AircraftSequence>> m_sequences; | ||||
|         std::mutex                                         m_sequencesLock; | ||||
| 
 | ||||
|         AircraftScheduler() noexcept; | ||||
|         BackendReceiver() noexcept; | ||||
| 
 | ||||
|         template <typename T> | ||||
|         bool setSocketKey(const std::string& key, T entry) { | ||||
| @@ -40,13 +43,14 @@ namespace aman { | ||||
|                 return false; | ||||
|             } | ||||
|         } | ||||
|         void receiveSequence(); | ||||
|         void run(); | ||||
| 
 | ||||
|     public: | ||||
|         AircraftScheduler(const AircraftScheduler&) = delete; | ||||
|         AircraftScheduler(AircraftScheduler&&) = delete; | ||||
|         AircraftScheduler& operator=(const AircraftScheduler&) = delete; | ||||
|         AircraftScheduler& operator=(AircraftScheduler&&) = delete; | ||||
|         BackendReceiver(const BackendReceiver&) = delete; | ||||
|         BackendReceiver(BackendReceiver&&) = delete; | ||||
|         BackendReceiver& operator=(const BackendReceiver&) = delete; | ||||
|         BackendReceiver& operator=(BackendReceiver&&) = delete; | ||||
| 
 | ||||
|         /**
 | ||||
|          * @brief Initializes the aircraft scheduler | ||||
| @@ -64,14 +68,14 @@ namespace aman { | ||||
|          */ | ||||
|         bool initialized() const noexcept; | ||||
|         /**
 | ||||
|          * @brief Receives a new scheduling message | ||||
|          * @return The new scheduling message or a nullptr | ||||
|          * @brief Returns the current sequence of the receiver queue | ||||
|          * @return The sequence queue | ||||
|          */ | ||||
|         std::unique_ptr<aman::AircraftSchedule> receive(); | ||||
|         std::shared_ptr<aman::AircraftSequence> receive(); | ||||
|         /**
 | ||||
|          * @brief Returns the scheduling instance | ||||
|          * @return The system-wide instance | ||||
|          */ | ||||
|         static AircraftScheduler& instance() noexcept; | ||||
|         static BackendReceiver& instance() noexcept; | ||||
|     }; | ||||
| } | ||||
| @@ -32,8 +32,8 @@ SET(SOURCE_FILES | ||||
| ) | ||||
|  | ||||
| SET(SOURCE_COM_FILES | ||||
|     com/BackendReceiver.cpp | ||||
|     com/BackendNotification.cpp | ||||
|     com/AircraftScheduler.cpp | ||||
|     com/ZmqContext.cpp | ||||
|     com/ZmqContext.h | ||||
| ) | ||||
| @@ -51,8 +51,8 @@ SET(SOURCE_FILES_RES | ||||
| ) | ||||
|  | ||||
| SET(INCLUDE_COM_FILES | ||||
|     ${CMAKE_SOURCE_DIR}/include/aman/com/BackendReceiver.h | ||||
|     ${CMAKE_SOURCE_DIR}/include/aman/com/BackendNotification.h | ||||
|     ${CMAKE_SOURCE_DIR}/include/aman/com/AircraftScheduler.h | ||||
| ) | ||||
|  | ||||
| SET(INCLUDE_CONFIG_FILES | ||||
|   | ||||
| @@ -15,7 +15,7 @@ | ||||
| #include <Shlwapi.h> | ||||
| #include <Windows.h> | ||||
|  | ||||
| #include <aman/com/AircraftScheduler.h> | ||||
| #include <aman/com/BackendReceiver.h> | ||||
| #include <aman/com/BackendNotification.h> | ||||
| #include <aman/config/CommunicationFileFormat.h> | ||||
| #include <aman/config/IdentifierFileFormat.h> | ||||
| @@ -62,20 +62,20 @@ PlugIn::PlugIn() : | ||||
|  | ||||
|     ZmqContext::instance().initialize(); | ||||
|  | ||||
|     if (false == AircraftReporter::instance().initialize(this->m_configuration)) { | ||||
|     if (false == BackendNotification::instance().initialize(this->m_configuration)) { | ||||
|         this->DisplayUserMessage(PLUGIN_NAME, "ERROR", "Unable to initialize the reporter-connection to the backend", true, true, true, true, true); | ||||
|         return; | ||||
|     } | ||||
|  | ||||
|     if (false == AircraftScheduler::instance().initialize(this->m_configuration)) { | ||||
|     if (false == BackendReceiver::instance().initialize(this->m_configuration)) { | ||||
|         this->DisplayUserMessage(PLUGIN_NAME, "ERROR", "Unable to initialize the scheduling-connection to the backend", true, true, true, true, true); | ||||
|         AircraftReporter::instance().deinitialize(); | ||||
|         BackendNotification::instance().deinitialize(); | ||||
|         return; | ||||
|     } | ||||
| } | ||||
|  | ||||
| PlugIn::~PlugIn() noexcept { | ||||
|     AircraftScheduler::instance().deinitialize(); | ||||
|     BackendReceiver::instance().deinitialize(); | ||||
|     BackendNotification::instance().deinitialize(); | ||||
|     ZmqContext::instance().deinitialize(); | ||||
|     google::protobuf::ShutdownProtobufLibrary(); | ||||
|   | ||||
| @@ -2,26 +2,28 @@ | ||||
|  * Author: | ||||
|  *   Sven Czarnian <devel@svcz.de> | ||||
|  * Brief: | ||||
|  *   Implements the scheduling notification | ||||
|  *   Implements the backend receiver | ||||
|  * Copyright: | ||||
|  *   2021 Sven Czarnian | ||||
|  * License: | ||||
|  *   GNU General Public License v3 (GPLv3) | ||||
|  */ | ||||
| 
 | ||||
| #include <aman/com/AircraftScheduler.h> | ||||
| #include <aman/com/BackendReceiver.h> | ||||
| 
 | ||||
| #include "ZmqContext.h" | ||||
| 
 | ||||
| using namespace aman; | ||||
| using namespace std::chrono; | ||||
| 
 | ||||
| AircraftScheduler::AircraftScheduler() noexcept : | ||||
| BackendReceiver::BackendReceiver() noexcept : | ||||
|         m_socket(), | ||||
|         m_receiverThread(), | ||||
|         m_stopReceiver(false) { } | ||||
|         m_stopReceiver(false), | ||||
|         m_sequences(), | ||||
|         m_sequencesLock() { } | ||||
| 
 | ||||
| bool AircraftScheduler::initialize(const Communication& configuration) { | ||||
| bool BackendReceiver::initialize(const Communication& configuration) { | ||||
|     if (nullptr != this->m_socket) | ||||
|         return true; | ||||
|     if (false == configuration.valid) | ||||
| @@ -47,12 +49,12 @@ bool AircraftScheduler::initialize(const Communication& configuration) { | ||||
|     } | ||||
| 
 | ||||
|     this->m_stopReceiver = false; | ||||
|     this->m_receiverThread = std::thread(&AircraftScheduler::run, this); | ||||
|     this->m_receiverThread = std::thread(&BackendReceiver::run, this); | ||||
| 
 | ||||
|     return true; | ||||
| } | ||||
| 
 | ||||
| bool AircraftScheduler::deinitialize() { | ||||
| bool BackendReceiver::deinitialize() { | ||||
|     if (nullptr == this->m_socket) | ||||
|         return true; | ||||
| 
 | ||||
| @@ -64,42 +66,56 @@ bool AircraftScheduler::deinitialize() { | ||||
|     return true; | ||||
| } | ||||
| 
 | ||||
| bool AircraftScheduler::initialized() const noexcept { | ||||
| bool BackendReceiver::initialized() const noexcept { | ||||
|     return nullptr != this->m_socket; | ||||
| } | ||||
| 
 | ||||
| std::unique_ptr<aman::AircraftSchedule> AircraftScheduler::receive() { | ||||
| void BackendReceiver::receiveSequence() { | ||||
|     zmq::message_t message; | ||||
| 
 | ||||
|     if (nullptr == this->m_socket) | ||||
|         return std::unique_ptr<aman::AircraftSchedule>(); | ||||
|         return; | ||||
| 
 | ||||
|     try { | ||||
|         auto result = this->m_socket->recv(message, zmq::recv_flags::dontwait); | ||||
|         if (false == result.has_value() || 0 == result.value()) | ||||
|             return std::unique_ptr<aman::AircraftSchedule>(); | ||||
|             return; | ||||
|     } | ||||
|     catch (zmq::error_t&) { | ||||
|         return std::unique_ptr<aman::AircraftSchedule>(); | ||||
|         return; | ||||
|     } | ||||
| 
 | ||||
|     std::unique_ptr<aman::AircraftSchedule> retval = std::make_unique<aman::AircraftSchedule>(); | ||||
|     std::unique_ptr<aman::AircraftSequence> retval = std::make_unique<aman::AircraftSequence>(); | ||||
|     retval->ParseFromString(reinterpret_cast<const char*>(message.data())); | ||||
| 
 | ||||
|     return retval; | ||||
|     std::lock_guard guard(this->m_sequencesLock); | ||||
|     this->m_sequences.push_back(std::move(retval)); | ||||
| } | ||||
| 
 | ||||
| void AircraftScheduler::run() { | ||||
| void BackendReceiver::run() { | ||||
|     while (false == this->m_stopReceiver) { | ||||
|         auto message = this->receive(); | ||||
|         if (nullptr == message) { | ||||
|             std::this_thread::sleep_for(500ms); | ||||
|             continue; | ||||
|         } | ||||
| 
 | ||||
|         this->receiveSequence(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| AircraftScheduler& AircraftScheduler::instance() noexcept { | ||||
|     static AircraftScheduler __instance; | ||||
| std::shared_ptr<aman::AircraftSequence> BackendReceiver::receive() { | ||||
|     std::lock_guard guard(this->m_sequencesLock); | ||||
|     if (0 != this->m_sequences.size()) { | ||||
|         auto retval = this->m_sequences.front(); | ||||
|         this->m_sequences.pop_front(); | ||||
|         return retval; | ||||
|     } | ||||
| 
 | ||||
|     return nullptr; | ||||
| } | ||||
| 
 | ||||
| BackendReceiver& BackendReceiver::instance() noexcept { | ||||
|     static BackendReceiver __instance; | ||||
|     return __instance; | ||||
| } | ||||
		Reference in New Issue
	
	Block a user