123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743 |
- /*
- Copyright (c) 2016-2017 ZeroMQ community
- Copyright (c) 2016 VOCA AS / Harald Nøkland
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to
- deal in the Software without restriction, including without limitation the
- rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- sell copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
- */
- #ifndef __ZMQ_ADDON_HPP_INCLUDED__
- #define __ZMQ_ADDON_HPP_INCLUDED__
- #include "zmq.hpp"
- #include <deque>
- #include <iomanip>
- #include <sstream>
- #include <stdexcept>
- #ifdef ZMQ_CPP11
- #include <limits>
- #include <functional>
- #include <unordered_map>
- #endif
- namespace zmq
- {
- #ifdef ZMQ_CPP11
- namespace detail
- {
- template<bool CheckN, class OutputIt>
- recv_result_t
- recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
- {
- size_t msg_count = 0;
- message_t msg;
- while (true) {
- if (CheckN) {
- if (msg_count >= n)
- throw std::runtime_error(
- "Too many message parts in recv_multipart_n");
- }
- if (!s.recv(msg, flags)) {
- // zmq ensures atomic delivery of messages
- assert(msg_count == 0);
- return {};
- }
- ++msg_count;
- const bool more = msg.more();
- *out++ = std::move(msg);
- if (!more)
- break;
- }
- return msg_count;
- }
- inline bool is_little_endian()
- {
- const uint16_t i = 0x01;
- return *reinterpret_cast<const uint8_t *>(&i) == 0x01;
- }
- inline void write_network_order(unsigned char *buf, const uint32_t value)
- {
- if (is_little_endian()) {
- ZMQ_CONSTEXPR_VAR uint32_t mask = std::numeric_limits<std::uint8_t>::max();
- *buf++ = (value >> 24) & mask;
- *buf++ = (value >> 16) & mask;
- *buf++ = (value >> 8) & mask;
- *buf++ = value & mask;
- } else {
- std::memcpy(buf, &value, sizeof(value));
- }
- }
- inline uint32_t read_u32_network_order(const unsigned char *buf)
- {
- if (is_little_endian()) {
- return (static_cast<uint32_t>(buf[0]) << 24)
- + (static_cast<uint32_t>(buf[1]) << 16)
- + (static_cast<uint32_t>(buf[2]) << 8)
- + static_cast<uint32_t>(buf[3]);
- } else {
- uint32_t value;
- std::memcpy(&value, buf, sizeof(value));
- return value;
- }
- }
- } // namespace detail
- /* Receive a multipart message.
-
- Writes the zmq::message_t objects to OutputIterator out.
- The out iterator must handle an unspecified number of writes,
- e.g. by using std::back_inserter.
-
- Returns: the number of messages received or nullopt (on EAGAIN).
- Throws: if recv throws. Any exceptions thrown
- by the out iterator will be propagated and the message
- may have been only partially received with pending
- message parts. It is adviced to close this socket in that event.
- */
- template<class OutputIt>
- ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
- OutputIt out,
- recv_flags flags = recv_flags::none)
- {
- return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
- }
- /* Receive a multipart message.
-
- Writes at most n zmq::message_t objects to OutputIterator out.
- If the number of message parts of the incoming message exceeds n
- then an exception will be thrown.
-
- Returns: the number of messages received or nullopt (on EAGAIN).
- Throws: if recv throws. Throws std::runtime_error if the number
- of message parts exceeds n (exactly n messages will have been written
- to out). Any exceptions thrown
- by the out iterator will be propagated and the message
- may have been only partially received with pending
- message parts. It is adviced to close this socket in that event.
- */
- template<class OutputIt>
- ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
- OutputIt out,
- size_t n,
- recv_flags flags = recv_flags::none)
- {
- return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
- }
- /* Send a multipart message.
-
- The range must be a ForwardRange of zmq::message_t,
- zmq::const_buffer or zmq::mutable_buffer.
- The flags may be zmq::send_flags::sndmore if there are
- more message parts to be sent after the call to this function.
-
- Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
- Throws: if send throws. Any exceptions thrown
- by the msgs range will be propagated and the message
- may have been only partially sent. It is adviced to close this socket in that event.
- */
- template<class Range
- #ifndef ZMQ_CPP11_PARTIAL
- ,
- typename = typename std::enable_if<
- detail::is_range<Range>::value
- && (std::is_same<detail::range_value_t<Range>, message_t>::value
- || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
- #endif
- >
- send_result_t
- send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
- {
- using std::begin;
- using std::end;
- auto it = begin(msgs);
- const auto end_it = end(msgs);
- size_t msg_count = 0;
- while (it != end_it) {
- const auto next = std::next(it);
- const auto msg_flags =
- flags | (next == end_it ? send_flags::none : send_flags::sndmore);
- if (!s.send(*it, msg_flags)) {
- // zmq ensures atomic delivery of messages
- assert(it == begin(msgs));
- return {};
- }
- ++msg_count;
- it = next;
- }
- return msg_count;
- }
- /* Encode a multipart message.
- The range must be a ForwardRange of zmq::message_t. A
- zmq::multipart_t or STL container may be passed for encoding.
- Returns: a zmq::message_t holding the encoded multipart data.
- Throws: std::range_error is thrown if the size of any single part
- can not fit in an unsigned 32 bit integer.
- The encoding is compatible with that used by the CZMQ function
- zmsg_encode(), see https://rfc.zeromq.org/spec/50/.
- Each part consists of a size followed by the data.
- These are placed contiguously into the output message. A part of
- size less than 255 bytes will have a single byte size value.
- Larger parts will have a five byte size value with the first byte
- set to 0xFF and the remaining four bytes holding the size of the
- part's data.
- */
- template<class Range
- #ifndef ZMQ_CPP11_PARTIAL
- ,
- typename = typename std::enable_if<
- detail::is_range<Range>::value
- && (std::is_same<detail::range_value_t<Range>, message_t>::value
- || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
- #endif
- >
- message_t encode(const Range &parts)
- {
- size_t mmsg_size = 0;
- // First pass check sizes
- for (const auto &part : parts) {
- const size_t part_size = part.size();
- if (part_size > std::numeric_limits<std::uint32_t>::max()) {
- // Size value must fit into uint32_t.
- throw std::range_error("Invalid size, message part too large");
- }
- const size_t count_size =
- part_size < std::numeric_limits<std::uint8_t>::max() ? 1 : 5;
- mmsg_size += part_size + count_size;
- }
- message_t encoded(mmsg_size);
- unsigned char *buf = encoded.data<unsigned char>();
- for (const auto &part : parts) {
- const uint32_t part_size = part.size();
- const unsigned char *part_data =
- static_cast<const unsigned char *>(part.data());
- if (part_size < std::numeric_limits<std::uint8_t>::max()) {
- // small part
- *buf++ = (unsigned char) part_size;
- } else {
- // big part
- *buf++ = std::numeric_limits<uint8_t>::max();
- detail::write_network_order(buf, part_size);
- buf += sizeof(part_size);
- }
- std::memcpy(buf, part_data, part_size);
- buf += part_size;
- }
- assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
- return encoded;
- }
- /* Decode an encoded message to multiple parts.
- The given output iterator must be a ForwardIterator to a container
- holding zmq::message_t such as a zmq::multipart_t or various STL
- containers.
- Returns the ForwardIterator advanced once past the last decoded
- part.
- Throws: a std::out_of_range is thrown if the encoded part sizes
- lead to exceeding the message data bounds.
- The decoding assumes the message is encoded in the manner
- performed by zmq::encode(), see https://rfc.zeromq.org/spec/50/.
- */
- template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
- {
- const unsigned char *source = encoded.data<unsigned char>();
- const unsigned char *const limit = source + encoded.size();
- while (source < limit) {
- size_t part_size = *source++;
- if (part_size == std::numeric_limits<std::uint8_t>::max()) {
- if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
- throw std::out_of_range(
- "Malformed encoding, overflow in reading size");
- }
- part_size = detail::read_u32_network_order(source);
- // the part size is allowed to be less than 0xFF
- source += sizeof(uint32_t);
- }
- if (static_cast<size_t>(limit - source) < part_size) {
- throw std::out_of_range("Malformed encoding, overflow in reading part");
- }
- *out = message_t(source, part_size);
- ++out;
- source += part_size;
- }
- assert(source == limit);
- return out;
- }
- #endif
- #ifdef ZMQ_HAS_RVALUE_REFS
- /*
- This class handles multipart messaging. It is the C++ equivalent of zmsg.h,
- which is part of CZMQ (the high-level C binding). Furthermore, it is a major
- improvement compared to zmsg.hpp, which is part of the examples in the ØMQ
- Guide. Unnecessary copying is avoided by using move semantics to efficiently
- add/remove parts.
- */
- class multipart_t
- {
- private:
- std::deque<message_t> m_parts;
- public:
- typedef std::deque<message_t>::value_type value_type;
- typedef std::deque<message_t>::iterator iterator;
- typedef std::deque<message_t>::const_iterator const_iterator;
- typedef std::deque<message_t>::reverse_iterator reverse_iterator;
- typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
- // Default constructor
- multipart_t() {}
- // Construct from socket receive
- multipart_t(socket_t &socket) { recv(socket); }
- // Construct from memory block
- multipart_t(const void *src, size_t size) { addmem(src, size); }
- // Construct from string
- multipart_t(const std::string &string) { addstr(string); }
- // Construct from message part
- multipart_t(message_t &&message) { add(std::move(message)); }
- // Move constructor
- multipart_t(multipart_t &&other) { m_parts = std::move(other.m_parts); }
- // Move assignment operator
- multipart_t &operator=(multipart_t &&other)
- {
- m_parts = std::move(other.m_parts);
- return *this;
- }
- // Destructor
- virtual ~multipart_t() { clear(); }
- message_t &operator[](size_t n) { return m_parts[n]; }
- const message_t &operator[](size_t n) const { return m_parts[n]; }
- message_t &at(size_t n) { return m_parts.at(n); }
- const message_t &at(size_t n) const { return m_parts.at(n); }
- iterator begin() { return m_parts.begin(); }
- const_iterator begin() const { return m_parts.begin(); }
- const_iterator cbegin() const { return m_parts.cbegin(); }
- reverse_iterator rbegin() { return m_parts.rbegin(); }
- const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
- iterator end() { return m_parts.end(); }
- const_iterator end() const { return m_parts.end(); }
- const_iterator cend() const { return m_parts.cend(); }
- reverse_iterator rend() { return m_parts.rend(); }
- const_reverse_iterator rend() const { return m_parts.rend(); }
- // Delete all parts
- void clear() { m_parts.clear(); }
- // Get number of parts
- size_t size() const { return m_parts.size(); }
- // Check if number of parts is zero
- bool empty() const { return m_parts.empty(); }
- // Receive multipart message from socket
- bool recv(socket_t &socket, int flags = 0)
- {
- clear();
- bool more = true;
- while (more) {
- message_t message;
- #ifdef ZMQ_CPP11
- if (!socket.recv(message, static_cast<recv_flags>(flags)))
- return false;
- #else
- if (!socket.recv(&message, flags))
- return false;
- #endif
- more = message.more();
- add(std::move(message));
- }
- return true;
- }
- // Send multipart message to socket
- bool send(socket_t &socket, int flags = 0)
- {
- flags &= ~(ZMQ_SNDMORE);
- bool more = size() > 0;
- while (more) {
- message_t message = pop();
- more = size() > 0;
- #ifdef ZMQ_CPP11
- if (!socket.send(message, static_cast<send_flags>(
- (more ? ZMQ_SNDMORE : 0) | flags)))
- return false;
- #else
- if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
- return false;
- #endif
- }
- clear();
- return true;
- }
- // Concatenate other multipart to front
- void prepend(multipart_t &&other)
- {
- while (!other.empty())
- push(other.remove());
- }
- // Concatenate other multipart to back
- void append(multipart_t &&other)
- {
- while (!other.empty())
- add(other.pop());
- }
- // Push memory block to front
- void pushmem(const void *src, size_t size)
- {
- m_parts.push_front(message_t(src, size));
- }
- // Push memory block to back
- void addmem(const void *src, size_t size)
- {
- m_parts.push_back(message_t(src, size));
- }
- // Push string to front
- void pushstr(const std::string &string)
- {
- m_parts.push_front(message_t(string.data(), string.size()));
- }
- // Push string to back
- void addstr(const std::string &string)
- {
- m_parts.push_back(message_t(string.data(), string.size()));
- }
- // Push type (fixed-size) to front
- template<typename T> void pushtyp(const T &type)
- {
- static_assert(!std::is_same<T, std::string>::value,
- "Use pushstr() instead of pushtyp<std::string>()");
- m_parts.push_front(message_t(&type, sizeof(type)));
- }
- // Push type (fixed-size) to back
- template<typename T> void addtyp(const T &type)
- {
- static_assert(!std::is_same<T, std::string>::value,
- "Use addstr() instead of addtyp<std::string>()");
- m_parts.push_back(message_t(&type, sizeof(type)));
- }
- // Push message part to front
- void push(message_t &&message) { m_parts.push_front(std::move(message)); }
- // Push message part to back
- void add(message_t &&message) { m_parts.push_back(std::move(message)); }
- // Alias to allow std::back_inserter()
- void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
- // Pop string from front
- std::string popstr()
- {
- std::string string(m_parts.front().data<char>(), m_parts.front().size());
- m_parts.pop_front();
- return string;
- }
- // Pop type (fixed-size) from front
- template<typename T> T poptyp()
- {
- static_assert(!std::is_same<T, std::string>::value,
- "Use popstr() instead of poptyp<std::string>()");
- if (sizeof(T) != m_parts.front().size())
- throw std::runtime_error(
- "Invalid type, size does not match the message size");
- T type = *m_parts.front().data<T>();
- m_parts.pop_front();
- return type;
- }
- // Pop message part from front
- message_t pop()
- {
- message_t message = std::move(m_parts.front());
- m_parts.pop_front();
- return message;
- }
- // Pop message part from back
- message_t remove()
- {
- message_t message = std::move(m_parts.back());
- m_parts.pop_back();
- return message;
- }
- // get message part from front
- const message_t &front() { return m_parts.front(); }
- // get message part from back
- const message_t &back() { return m_parts.back(); }
- // Get pointer to a specific message part
- const message_t *peek(size_t index) const { return &m_parts[index]; }
- // Get a string copy of a specific message part
- std::string peekstr(size_t index) const
- {
- std::string string(m_parts[index].data<char>(), m_parts[index].size());
- return string;
- }
- // Peek type (fixed-size) from front
- template<typename T> T peektyp(size_t index) const
- {
- static_assert(!std::is_same<T, std::string>::value,
- "Use peekstr() instead of peektyp<std::string>()");
- if (sizeof(T) != m_parts[index].size())
- throw std::runtime_error(
- "Invalid type, size does not match the message size");
- T type = *m_parts[index].data<T>();
- return type;
- }
- // Create multipart from type (fixed-size)
- template<typename T> static multipart_t create(const T &type)
- {
- multipart_t multipart;
- multipart.addtyp(type);
- return multipart;
- }
- // Copy multipart
- multipart_t clone() const
- {
- multipart_t multipart;
- for (size_t i = 0; i < size(); i++)
- multipart.addmem(m_parts[i].data(), m_parts[i].size());
- return multipart;
- }
- // Dump content to string
- std::string str() const
- {
- std::stringstream ss;
- for (size_t i = 0; i < m_parts.size(); i++) {
- const unsigned char *data = m_parts[i].data<unsigned char>();
- size_t size = m_parts[i].size();
- // Dump the message as text or binary
- bool isText = true;
- for (size_t j = 0; j < size; j++) {
- if (data[j] < 32 || data[j] > 127) {
- isText = false;
- break;
- }
- }
- ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
- << "] ";
- if (size >= 1000) {
- ss << "... (too big to print)";
- continue;
- }
- for (size_t j = 0; j < size; j++) {
- if (isText)
- ss << static_cast<char>(data[j]);
- else
- ss << std::hex << std::setw(2) << std::setfill('0')
- << static_cast<short>(data[j]);
- }
- }
- return ss.str();
- }
- // Check if equal to other multipart
- bool equal(const multipart_t *other) const
- {
- if (size() != other->size())
- return false;
- for (size_t i = 0; i < size(); i++)
- if (*peek(i) != *other->peek(i))
- return false;
- return true;
- }
- #ifdef ZMQ_CPP11
- // Return single part message_t encoded from this multipart_t.
- message_t encode() const { return zmq::encode(*this); }
- // Decode encoded message into multiple parts and append to self.
- void decode_append(const message_t &encoded)
- {
- zmq::decode(encoded, std::back_inserter(*this));
- }
- // Return a new multipart_t containing the decoded message_t.
- static multipart_t decode(const message_t &encoded)
- {
- multipart_t tmp;
- zmq::decode(encoded, std::back_inserter(tmp));
- return tmp;
- }
- #endif
- private:
- // Disable implicit copying (moving is more efficient)
- multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
- void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
- }; // class multipart_t
- inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
- {
- return os << msg.str();
- }
- #endif // ZMQ_HAS_RVALUE_REFS
- #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
- class active_poller_t
- {
- public:
- active_poller_t() = default;
- ~active_poller_t() = default;
- active_poller_t(const active_poller_t &) = delete;
- active_poller_t &operator=(const active_poller_t &) = delete;
- active_poller_t(active_poller_t &&src) = default;
- active_poller_t &operator=(active_poller_t &&src) = default;
- using handler_type = std::function<void(event_flags)>;
- void add(zmq::socket_ref socket, event_flags events, handler_type handler)
- {
- if (!handler)
- throw std::invalid_argument("null handler in active_poller_t::add");
- auto ret = handlers.emplace(
- socket, std::make_shared<handler_type>(std::move(handler)));
- if (!ret.second)
- throw error_t(EINVAL); // already added
- try {
- base_poller.add(socket, events, ret.first->second.get());
- need_rebuild = true;
- }
- catch (...) {
- // rollback
- handlers.erase(socket);
- throw;
- }
- }
- void remove(zmq::socket_ref socket)
- {
- base_poller.remove(socket);
- handlers.erase(socket);
- need_rebuild = true;
- }
- void modify(zmq::socket_ref socket, event_flags events)
- {
- base_poller.modify(socket, events);
- }
- size_t wait(std::chrono::milliseconds timeout)
- {
- if (need_rebuild) {
- poller_events.resize(handlers.size());
- poller_handlers.clear();
- poller_handlers.reserve(handlers.size());
- for (const auto &handler : handlers) {
- poller_handlers.push_back(handler.second);
- }
- need_rebuild = false;
- }
- const auto count = base_poller.wait_all(poller_events, timeout);
- std::for_each(poller_events.begin(),
- poller_events.begin() + static_cast<ptrdiff_t>(count),
- [](decltype(base_poller)::event_type &event) {
- assert(event.user_data != nullptr);
- (*event.user_data)(event.events);
- });
- return count;
- }
- ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
- size_t size() const noexcept { return handlers.size(); }
- private:
- bool need_rebuild{false};
- poller_t<handler_type> base_poller{};
- std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
- std::vector<decltype(base_poller)::event_type> poller_events{};
- std::vector<std::shared_ptr<handler_type>> poller_handlers{};
- }; // class active_poller_t
- #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
- } // namespace zmq
- #endif // __ZMQ_ADDON_HPP_INCLUDED__
|