/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#include "clientpaho.h"

#include "errorcode.h"
#include "mqttutil.h"
#include "lockguard.h"
#include "metaprogrammingdefs.h"
#include "mqttstream.h"
#include "scopeguard.h"
#include "uriparser.h"

// std::chrono
#include "compat-chrono.h"

// std
#include <algorithm>
#include <iterator>

using namespace osdev::components::mqtt;

// File-local helpers that produce an initialized MQTTAsync_disconnectOptions value
// regardless of whether the installed paho-c headers declare the onSuccess5 member
// and/or the MQTTAsync_disconnectOptions_initializer5 macro.
namespace {

// clang warns about the unused template overload below; silence that for this section only.
#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-template"
#endif

// NOTE(review): presumably generates the has_onSuccess5<T> trait used by the overloads
// below (macro comes from metaprogrammingdefs.h) - confirm.
OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)

// Overload selected when TRet has NO onSuccess5 member: the plain initializer macro suffices.
// The pointer argument is never read; it only exists to carry the type.
template <typename TRet>
inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
    return MQTTAsync_disconnectOptions_initializer;
}

// Overload selected when TRet HAS an onSuccess5 member.
template <typename TRet>
inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
// For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
// This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
// method should not be chosen when the struct does not contain member onSuccess5!
// On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined,
// while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
// We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize the
// additional fields ourselves (which unfortunately results in a pesky compiler warning about missing field initializers).
#ifndef MQTTAsync_disconnectOptions_initializer5
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
    TRet ret = MQTTAsync_disconnectOptions_initializer;
    // Fields the old initializer macro does not cover are set by hand.
    ret.struct_version = 1;
    ret.onSuccess5 = nullptr;
    ret.onFailure5 = nullptr;
    return ret;
#pragma GCC diagnostic pop
#else
    return MQTTAsync_disconnectOptions_initializer5;
#endif
}

// Convenience wrapper: Init<T>::initialize() dispatches to the matching overload above
// by passing a typed null pointer.
template <typename TRet>
struct Init
{
    static TRet initialize()
    {
        return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
    }
};
#if defined(__clang__)
#pragma GCC diagnostic pop
#endif

} // namespace

// Count of live ClientPaho instances. The constructor increments it (installing the paho
// trace callback for the first instance) and the destructor decrements it (removing the
// trace callback when the last instance goes away).
std::atomic_int ClientPaho::s_numberOfInstances(0);

/// @brief Create a paho async client for the given endpoint.
///
/// The endpoint is run through parseEndpoint(), which splits off the "user" and
/// "password" parts before the remaining uri is handed to MQTTAsync_create. When the
/// paho client is created successfully the paho callbacks are registered and the worker
/// thread running callbackEventHandler() is started.
///
/// @param _endpoint                 Endpoint uri of the broker (may carry user/password).
/// @param _id                       Client id handed to paho.
/// @param connectionStatusCallback  Invoked by setConnectionStatus() on a status change.
/// @param deliveryCompleteCallback  Stored in m_deliveryCompleteCallback.
ClientPaho::ClientPaho(const std::string& _endpoint,
    const std::string& _id,
    const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback,
    const std::function<void(const std::string& clientId, std::int32_t pubMsgToken)>& deliveryCompleteCallback)
    : m_mutex()
    , m_endpoint()
    , m_username()
    , m_password()
    , m_clientId(_id)
    , m_pendingOperations()
    , m_operationResult()
    , m_operationsCompleteCV()
    , m_subscriptions()
    , m_pendingSubscriptions()
    , m_subscribeTokenToTopic()
    , m_unsubscribeTokenToTopic()
    , m_pendingPublishes()
    , m_processPendingPublishes(false)
    , m_pendingPublishesReadyCV()
    , m_client()
    , m_connectionStatus(ConnectionStatus::Disconnected)
    , m_connectionStatusCallback(connectionStatusCallback)
    , m_deliveryCompleteCallback(deliveryCompleteCallback)
    , m_lastUnsubscribe(-1)
    , m_connectPromise()
    , m_disconnectPromise()
    , m_callbackEventQueue(m_clientId)
    , m_workerThread()
{
    // The first instance installs the (process wide) paho trace callback.
    const bool isFirstInstance = (0 == s_numberOfInstances++);
    if (isFirstInstance)
    {
        MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho);
    }

    // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this);
    parseEndpoint(_endpoint);

    const auto createResult = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (MQTTASYNC_SUCCESS != createResult)
    {
        // Do something sensible here.
        return;
    }

    // `this` is the context pointer paho passes back into the static callbacks.
    MQTTAsync_setCallbacks(m_client, static_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);
    m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this);
}

/// @brief Tear the client down.
///
/// A connected client first unsubscribes from everything, waits (at most 2 seconds) for
/// the outstanding operations and then disconnects (waiting, 5 second timeout). After
/// that the paho handle is destroyed, the callback event queue is stopped and the worker
/// thread is joined.
ClientPaho::~ClientPaho()
{
    if (!MQTTAsync_isConnected(m_client))
    {
        // If the status was already disconnected this call does nothing
        setConnectionStatus(ConnectionStatus::Disconnected);
    }
    else
    {
        this->unsubscribeAll();

        this->waitForCompletion(std::chrono::milliseconds(2000), std::set<int32_t>{});
        this->disconnect(true, 5000);
    }

    const bool wasLastInstance = (0 == --s_numberOfInstances);
    if (wasLastInstance)
    {
        // encountered a case where termination of the logging system within paho led to a segfault.
        // This was a paho thread that was cleaned while at the same time the logging system was terminated.
        // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
        // frequently.
        MQTTAsync_setTraceCallback(nullptr);
    }

    MQTTAsync_destroy(&m_client);

    // Stop the queue so that the worker thread leaves its loop and becomes joinable.
    m_callbackEventQueue.stop();
    if (m_workerThread.joinable())
    {
        m_workerThread.join();
    }
}

/// @brief Return a copy of the client id this instance was constructed with.
std::string ClientPaho::clientId() const
{
    return m_clientId;
}

/// @brief Return the connection status as currently stored in this instance.
/// @note The value is read without taking m_mutex.
ConnectionStatus ClientPaho::connectionStatus() const
{
    const ConnectionStatus current = m_connectionStatus;
    return current;
}

std::int32_t ClientPaho::connect(bool wait)
{
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        if (ConnectionStatus::Disconnected != m_connectionStatus)
        {
            return -1;
        }
        setConnectionStatus(ConnectionStatus::ConnectInProgress);
    }

    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = &ClientPaho::onConnectSuccess;
    conn_opts.onFailure = &ClientPaho::onConnectFailure;
    conn_opts.context = this;
    conn_opts.automaticReconnect = 1;
    if (!m_username.empty())
    {
        conn_opts.username = m_username.c_str();
    }

    if (!m_password.empty())
    {
        conn_opts.password = m_password.c_str();
    }

    std::promise<void> waitForConnectPromise{};
    auto waitForConnect = waitForConnectPromise.get_future();
    m_connectPromise.reset();
    if (wait)
    {
        m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise));
    }

    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        if (!m_pendingOperations.insert(-100).second)
        {
            // Write something
        }
        m_operationResult.erase(-100);
    }

    int rc = MQTTAsync_connect(m_client, &conn_opts);
    if (MQTTASYNC_SUCCESS != rc)
    {
        setConnectionStatus(ConnectionStatus::Disconnected);
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        m_operationResult[-100] = false;
        m_pendingOperations.erase(-100);
    }

    if (wait)
    {
        waitForConnect.get();
        m_connectPromise.reset();
    }
    return -100;
}

std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
{
    ConnectionStatus currentStatus = m_connectionStatus;

    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) {
            return -1;
        }

        currentStatus = m_connectionStatus;
        setConnectionStatus(ConnectionStatus::DisconnectInProgress);
    }

    MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
    disconn_opts.timeout = timeoutMs;
    disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
    disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
    disconn_opts.context = this;

    std::promise<void> waitForDisconnectPromise{};
    auto waitForDisconnect = waitForDisconnectPromise.get_future();
    m_disconnectPromise.reset();
    if (wait) {
        m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
    }

    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        if (!m_pendingOperations.insert(-200).second)
        {
            // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200)
        }
        m_operationResult.erase(-200);
    }

    int rc = MQTTAsync_disconnect(m_client, &disconn_opts);
    if (MQTTASYNC_SUCCESS != rc) {
        if (MQTTASYNC_DISCONNECTED == rc) {
            currentStatus = ConnectionStatus::Disconnected;
        }

        setConnectionStatus(currentStatus);
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        m_operationResult[-200] = false;
        m_pendingOperations.erase(-200);

        if (MQTTASYNC_DISCONNECTED == rc) {
            return -1;
        }
        // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
    }

    if (wait) {
        if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
        {
            // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);

        }
        waitForDisconnect.get();
        m_disconnectPromise.reset();
    }
    return -200;
}

/// @brief Publish a message, or queue it while a reconnect is in progress.
///
/// The qos is limited to the range [0, 2]. While the pending publishes are being
/// processed the call blocks until that processing has finished.
///
/// @param message Message (topic and payload) to publish.
/// @param qos     Requested quality of service.
/// @return -1 when a disconnect is in progress or when the message was added to the
///         pending publishes, otherwise the result of publishInternal().
std::int32_t ClientPaho::publish(const MqttMessage& message, int qos)
{
    if (ConnectionStatus::DisconnectInProgress == m_connectionStatus)
    {
        // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic());
        return -1;
    }

    if (ConnectionStatus::Disconnected == m_connectionStatus)
    {
        // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
    }

    const bool topicIsValid = isValidTopic(message.topic());
    if (!topicIsValid)
    {
        // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic());
    }

    // Limit the qos to the range [0, 2].
    qos = std::max(0, std::min(qos, 2));

    std::unique_lock<std::mutex> guard(m_mutex);
    const bool publishesOnHold = m_processPendingPublishes || ConnectionStatus::ReconnectInProgress == m_connectionStatus;
    if (publishesOnHold)
    {
        // Block for as long as the pending publishes are being processed.
        while (m_processPendingPublishes)
        {
            m_pendingPublishesReadyCV.wait(guard);
        }

        if (ConnectionStatus::ReconnectInProgress == m_connectionStatus)
        {
            // ("ClientPaho", "Adding publish to pending queue.");
            m_pendingPublishes.push_front(Publish{ qos, message });
            return -1;
        }
    }

    // The mutex is still held here; publishInternal does not take it itself.
    return publishInternal(message, qos);
}

/// @brief Publish all messages that were queued while publishing was on hold.
///
/// Nothing happens unless m_processPendingPublishes is set. The queue is drained from the
/// back (publish() adds at the front, so the oldest message goes first). Afterwards the
/// flag is cleared and the threads blocked in publish() are woken up.
///
/// The mutex is held for the whole drain: publish() modifies m_pendingPublishes under
/// m_mutex, and publishInternal() updates the pending operations without taking the lock
/// itself, so draining without the lock would race with publish() and with the callback
/// handlers.
void ClientPaho::publishPending()
{
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        if (!m_processPendingPublishes) {
            return;
        }

        if (ConnectionStatus::Connected != m_connectionStatus)
        {
            // MqttException, "Not connected");
        }

        while (!m_pendingPublishes.empty())
        {
            const auto& pub = m_pendingPublishes.back();
            publishInternal(pub.data, pub.qos);
            // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what());

            m_pendingPublishes.pop_back();
        }

        m_processPendingPublishes = false;
    }
    // Notify after the lock has been released so the woken threads can proceed at once.
    m_pendingPublishesReadyCV.notify_all();
}

/// @brief Subscribe to a topic.
///
/// The qos is limited to the range [0, 2]. The subscription is first stored as pending
/// subscription and then requested with subscribeInternal().
///
/// @param topic Topic (filter) to subscribe to.
/// @param qos   Requested quality of service.
/// @param cb    Callback stored with the subscription.
/// @return -1 when a subscription with the same topic and qos is already registered, or
///         is pending without a known token; the token of the pending subscribe when one
///         is known; otherwise the result of subscribeInternal().
std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb)
{
    if (ConnectionStatus::Connected != m_connectionStatus)
    {
        // MqttException, "Not connected"
    }

    const bool topicIsValid = isValidTopic(topic);
    if (!topicIsValid)
    {
        // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
    }

    // Limit the qos to the range [0, 2].
    qos = std::max(0, std::min(qos, 2));

    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);

        const auto registered = m_subscriptions.find(topic);
        if (m_subscriptions.end() != registered && registered->second.qos == qos)
        {
            return -1;
        }
        // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);

        const auto pending = m_pendingSubscriptions.find(topic);
        if (m_pendingSubscriptions.end() != pending && pending->second.qos == qos)
        {
            // Hand out the token of the subscribe request that is already under way.
            for (const auto& tokenAndTopic : m_subscribeTokenToTopic)
            {
                if (topic == tokenAndTopic.second)
                {
                    return tokenAndTopic.first;
                }
            }
            return -1;
        }
        // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);

        std::string overlappedTopic{};
        const bool overlaps = isOverlappingInternal(topic, overlappedTopic);
        if (overlaps)
        {
            // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
        }

        // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic);
        m_pendingSubscriptions.emplace(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb });
    }

    return subscribeInternal(topic, qos);
}

/// @brief Request a subscribe for every pending subscription.
///
/// A snapshot of the pending subscriptions is taken under the lock; the subscribe
/// requests are issued from that snapshot after the lock has been released
/// (subscribeInternal takes the lock itself).
void ClientPaho::resubscribe()
{
    decltype(m_pendingSubscriptions) snapshot{};
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        snapshot = m_pendingSubscriptions;
    }

    for (const auto& entry : snapshot)
    {
        subscribeInternal(entry.first, entry.second.qos);
    }
}

/// @brief Unsubscribe from a topic.
///
/// Only a registered subscription with exactly this topic and qos is unsubscribed. After
/// the request has been issued the call waits (at most one second) for its completion.
///
/// @param topic Topic of the subscription.
/// @param qos   Quality of service of the subscription.
/// @return -1 when no matching subscription is registered or when paho refuses the
///         unsubscribe request, otherwise the token of the unsubscribe operation.
std::int32_t ClientPaho::unsubscribe(const std::string& topic, int qos)
{
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        bool found = false;
        for (const auto& s : m_subscriptions)
        {
            if (topic == s.first && qos == s.second.qos)
            {
                found = true;
                break;
            }
        }
        if (!found)
        {
            return -1;
        }
    }

    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
    opts.onFailure = &ClientPaho::onUnsubscribeFailure;
    opts.context = this;

    {
        // Need to lock the mutex because it is possible that the callback is faster than
        // the insertion of the token into the pending operations.
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        const auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts);
        if (MQTTASYNC_SUCCESS != rc)
        {
            // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));
            // The request was refused: opts.token does not identify an operation and no
            // callback will ever remove it from the pending operations. Registering it
            // would leave a stale pending operation behind and make the wait below (and
            // every later wait for all operations) run into its timeout.
            return -1;
        }

        if (!m_pendingOperations.insert(opts.token).second)
        {
            // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token);
        }

        m_operationResult.erase(opts.token);
        if (m_unsubscribeTokenToTopic.count(opts.token) > 0)
        {
            // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic);
        }
        m_lastUnsubscribe = opts.token; // centos7 workaround
        m_unsubscribeTokenToTopic[opts.token] = topic;
    }

    // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
    this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });

    return opts.token;
}

void ClientPaho::unsubscribeAll()
{
    decltype(m_subscriptions) subscriptions{};
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        subscriptions = m_subscriptions;
    }

    for (const auto& s : subscriptions) {
        this->unsubscribe(s.first, s.second.qos);
    }
}

std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
{
    if (waitFor <= std::chrono::milliseconds(0)) {
        return std::chrono::milliseconds(0);
    }
    std::chrono::milliseconds timeElapsed{};
    {
        osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds)
        {
            timeElapsed = std::chrono::ceil<std::chrono::milliseconds>(sinceStart);
        });
        std::unique_lock<std::mutex> lck(m_mutex);
        // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
        m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
        {
            if (tokens.empty())
            { // wait for all operations to end
                return m_pendingOperations.empty();
            }
            else if (tokens.size() == 1)
            {
                return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end();
            }
            std::vector<std::int32_t> intersect{};
            std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
            return intersect.empty();
        } );
    }
    return timeElapsed;
}

/// @brief Tell whether the topic overlaps with a pending or registered subscription.
/// @param topic Topic to test.
/// @return Result of the two argument overload; the overlapping topic itself is dropped.
bool ClientPaho::isOverlapping(const std::string& topic) const
{
    std::string ignoredTopic;
    const bool overlaps = isOverlapping(topic, ignoredTopic);
    return overlaps;
}

/// @brief Locked variant of isOverlappingInternal().
/// @param topic         Topic to test.
/// @param existingTopic Receives the overlapping topic (cleared when there is none).
/// @return true when the topic overlaps with a pending or registered subscription.
bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const bool overlaps = isOverlappingInternal(topic, existingTopic);
    return overlaps;
}

/// @brief Return a snapshot (taken under the lock) of the tokens of all pending operations.
std::vector<std::int32_t> ClientPaho::pendingOperations() const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    std::vector<std::int32_t> tokens(m_pendingOperations.begin(), m_pendingOperations.end());
    return tokens;
}

/// @brief Tell (under the lock) whether at least one subscription is still pending.
bool ClientPaho::hasPendingSubscriptions() const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const bool anyPending = !m_pendingSubscriptions.empty();
    return anyPending;
}

/// @brief Look up the stored result of an operation.
/// @param token Token of the operation.
/// @return The stored result, or an empty optional when no result is stored for the token.
boost::optional<bool> ClientPaho::operationResult(std::int32_t token) const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const auto found = m_operationResult.find(token);
    if (m_operationResult.end() == found)
    {
        return boost::optional<bool>();
    }
    return boost::optional<bool>(found->second);
}

/// @brief Split the credentials off the endpoint.
///
/// The "user" and "password" parts of the parsed uri (when present) are stored in
/// m_username / m_password and cleared in the parsed uri. The uri that is assembled from
/// the remaining parts is stored in m_endpoint.
///
/// @param _endpoint Endpoint uri as handed to the constructor.
void ClientPaho::parseEndpoint(const std::string& _endpoint)
{
    auto parts = UriParser::parse(_endpoint);

    const auto userIt = parts.find("user");
    if (parts.end() != userIt)
    {
        m_username = userIt->second;
        userIt->second.clear();
    }

    const auto passwordIt = parts.find("password");
    if (parts.end() != passwordIt)
    {
        m_password = passwordIt->second;
        passwordIt->second.clear();
    }

    m_endpoint = UriParser::toString(parts);
}

/// @brief Hand a message to paho and register the publish as pending operation.
///
/// This method does not take m_mutex itself although it updates the pending operations
/// and the operation results; publish() calls it while holding the lock.
///
/// @param message Message (topic and payload) to publish.
/// @param qos     Quality of service to publish with.
/// @return -1 when paho refuses the message, otherwise the token of the publish operation.
std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = &ClientPaho::onPublishSuccess;
    opts.onFailure = &ClientPaho::onPublishFailure;
    opts.context = this;
    auto msg = message.toAsyncMessage();
    msg.qos = qos;

    // Need to lock the mutex because it is possible that the callback is faster than
    // the insertion of the token into the pending operations.

    // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts);
    if (MQTTASYNC_SUCCESS != rc)
    {
        // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc));
        // The message was refused: opts.token does not identify an operation and no
        // callback will ever remove it from the pending operations, so do not register it.
        return -1;
    }

    if (!m_pendingOperations.insert(opts.token).second)
    {
        // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token);
    }
    // A result that is still stored for a reused token belongs to an older operation.
    m_operationResult.erase(opts.token);
    return opts.token;
}

/// @brief Issue the subscribe request and register it as pending operation.
///
/// The mutex is taken for the whole call. When paho refuses the request the pending
/// subscription for the topic is removed again.
///
/// @param topic Topic (filter) to subscribe to.
/// @param qos   Requested quality of service.
/// @return -1 when paho refuses the request, otherwise the token of the subscribe operation.
std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = &ClientPaho::onSubscribeSuccess;
    opts.onFailure = &ClientPaho::onSubscribeFailure;
    opts.context = this;

    // Need to lock the mutex because it is possible that the callback is faster than
    // the insertion of the token into the pending operations.
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts);
    if (MQTTASYNC_SUCCESS != rc)
    {
        m_pendingSubscriptions.erase(topic);
        // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));
        // (MqttException, "Subscription failed");
        // The request was refused: opts.token does not identify an operation and no
        // callback will ever remove it from the pending operations, so do not register it.
        return -1;
    }

    if (!m_pendingOperations.insert(opts.token).second)
    {
        // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token);
    }
    // A result that is still stored for a reused token belongs to an older operation.
    m_operationResult.erase(opts.token);
    if (m_subscribeTokenToTopic.count(opts.token) > 0)
    {
        // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic);
    }
    m_subscribeTokenToTopic[opts.token] = topic;
    return opts.token;
}

/// @brief Store the new connection status and report a change.
///
/// The connection status callback (when one is set) is only invoked when the new status
/// differs from the previous one.
///
/// @param status New connection status.
void ClientPaho::setConnectionStatus(ConnectionStatus status)
{
    const ConnectionStatus previous = m_connectionStatus;
    m_connectionStatus = status;

    const bool changed = (previous != status);
    if (!changed || !m_connectionStatusCallback)
    {
        return;
    }
    m_connectionStatusCallback(m_clientId, status);
}

/// @brief Search the pending and the registered subscriptions for an overlapping topic.
///
/// The pending subscriptions are searched first. This method does not lock; the callers
/// in this file hold m_mutex.
///
/// @param topic         Topic to test.
/// @param existingTopic Receives the first overlapping topic; empty when there is none.
/// @return true when an overlapping topic was found.
bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const
{
    existingTopic.clear();

    for (const auto& pending : m_pendingSubscriptions)
    {
        if (!testForOverlap(pending.first, topic))
        {
            continue;
        }
        existingTopic = pending.first;
        return true;
    }

    for (const auto& registered : m_subscriptions)
    {
        if (!testForOverlap(registered.first, topic))
        {
            continue;
        }
        existingTopic = registered.first;
        return true;
    }

    return false;
}

/// @brief Add an event to the callback event queue.
///
/// The events in that queue are executed by callbackEventHandler().
///
/// @param ev Event to queue.
void ClientPaho::pushIncomingEvent(std::function<void()> ev)
{
    m_callbackEventQueue.push(ev);
}

/// @brief Body of the worker thread: execute the queued callback events.
///
/// Events are taken from the callback event queue in batches and executed in order. The
/// loop ends as soon as pop() on the queue returns false.
void ClientPaho::callbackEventHandler()
{
    // ("ClientPaho", "%1 - starting callback event handler", m_clientId);
    bool keepRunning = true;
    while (keepRunning)
    {
        // A fresh container for every batch.
        std::vector<std::function<void()>> batch;
        keepRunning = m_callbackEventQueue.pop(batch);
        if (keepRunning)
        {
            for (const auto& handler : batch)
            {
                handler();
                // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException);
            }
        }
    }
    // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
}

/// @brief Handle the (re)connected notification for this instance.
///
/// All registered subscriptions are turned into pending subscriptions again and the
/// publishes are put on hold until publishPending() is called. Finally the status is set
/// to Connected.
///
/// @param cause Cause of the connect (not used).
void ClientPaho::onConnectOnInstance(const std::string& cause)
{
    (void)cause;
    // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause);
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        // Entries that are already pending are kept as they are.
        m_pendingSubscriptions.insert(m_subscriptions.begin(), m_subscriptions.end());
        m_subscriptions.clear();
        // all publishes are on hold until publishPending is called.
        m_processPendingPublishes = true;
    }

    setConnectionStatus(ConnectionStatus::Connected);
}

/// @brief Handle a successful connect for this instance.
///
/// Registers the connected callback with paho, stores `true` as result of the connect
/// operation (token -100), removes it from the pending operations, sets the status to
/// Connected, completes the connect promise (when somebody is waiting) and wakes up the
/// threads waiting for operations to complete.
///
/// @param response Success response of the connect.
void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response)
{
    // Only used by the log statement that is commented out below.
    auto connectData = response.connectionData();
    // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)",
        // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent());
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        // Register the connect callback that is used in reconnect scenarios.
        auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);
        if (MQTTASYNC_SUCCESS != rc)
        {
            // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
        }
        // For MQTTV5
        //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
        //if (MQTTASYNC_SUCCESS != rc) {
        //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
        //}
        // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
        m_operationResult[-100] = true;
        m_pendingOperations.erase(-100);
    }
    setConnectionStatus(ConnectionStatus::Connected);
    // The promise is only present when connect() was called with wait == true.
    if (m_connectPromise)
    {
        m_connectPromise->set_value();
    }
    m_operationsCompleteCV.notify_all();
}

/// @brief Handle a failed connect for this instance.
///
/// Completes the connect promise (when somebody is waiting), stores `false` as result of
/// the connect operation (token -100) and removes it from the pending operations. The
/// status only goes back to Disconnected when it still is ConnectInProgress. Finally the
/// threads waiting for operations to complete are woken up.
///
/// @param response Failure response of the connect (not used).
void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response)
{
    (void)response;
    // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        // The promise is only present when connect() was called with wait == true.
        if (m_connectPromise) {
            m_connectPromise->set_value();
        }
        // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
        m_operationResult[-100] = false;
        m_pendingOperations.erase(-100);
    }
    if (ConnectionStatus::ConnectInProgress == m_connectionStatus)
    {
        setConnectionStatus(ConnectionStatus::Disconnected);
    }
    m_operationsCompleteCV.notify_all();
}

//void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
//{
//    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
//}

/// @brief Handle a successful disconnect for this instance.
///
/// Drops all registered and pending subscriptions together with the token
/// administration, stores `true` as result of the disconnect operation (token -200) and
/// clears ALL pending operations. Then the status is set to Disconnected, the disconnect
/// promise is completed (when somebody is waiting) and the threads waiting for
/// operations to complete are woken up.
void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&)
{
    // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint);
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        m_subscriptions.clear();
        m_pendingSubscriptions.clear();
        m_subscribeTokenToTopic.clear();
        m_unsubscribeTokenToTopic.clear();

        // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
        m_operationResult[-200] = true;
        m_pendingOperations.clear();
    }

    setConnectionStatus(ConnectionStatus::Disconnected);

    // The promise is only present when disconnect() was called with wait == true.
    if (m_disconnectPromise) {
        m_disconnectPromise->set_value();
    }
    m_operationsCompleteCV.notify_all();
}

void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response)
{
    (void)response;
    // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
        m_operationResult[-200] = false;
        m_pendingOperations.erase(-200);
    }

    if (MQTTAsync_isConnected(m_client))
    {
        setConnectionStatus(ConnectionStatus::Connected);
    }
    else
    {
        setConnectionStatus(ConnectionStatus::Disconnected);
    }

    if (m_disconnectPromise)
    {
        m_disconnectPromise->set_value();
    }
    m_operationsCompleteCV.notify_all();
}

/// @brief Handle a successful publish for this instance.
///
/// Stores `true` as result for the token of the response, removes the token from the
/// pending operations and wakes up the threads waiting for operations to complete.
///
/// @param response Success response of the publish.
void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response)
{
    // Only used by the log statement that is commented out below.
    auto pd = response.publishData();
    (void)pd;
    // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload());
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        const auto finishedToken = response.token();
        // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
        m_operationResult[finishedToken] = true;
        m_pendingOperations.erase(finishedToken);
    }
    m_operationsCompleteCV.notify_all();
}

/// @brief Handle a failed publish for this instance.
///
/// Stores `false` as result for the token of the response, removes the token from the
/// pending operations and wakes up the threads waiting for operations to complete.
///
/// @param response Failure response of the publish.
void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response)
{
    // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message());
    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        const auto failedToken = response.token();
        // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
        m_operationResult[failedToken] = false;
        m_pendingOperations.erase(failedToken);
    }
    m_operationsCompleteCV.notify_all();
}

/// @brief Handle a successful subscribe for this instance.
///
/// The pending subscription that belongs to the token of the response is moved to the
/// registered subscriptions. The stored result of the operation is only `true` when that
/// move took place; an unknown token or a missing pending subscription yields `false`.
///
/// @param response Success response of the subscribe.
void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response)
{
    // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token());
    // NOTE(review): the scope guards presumably run their lambda when the enclosing scope is left - confirm in scopeguard.h.
    // This guard notifies the waiting threads on every exit path of this function.
    OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    bool operationOk = false;
    // This guard stores the result and removes the pending operation on every exit path.
    OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]()
    {
        // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
        m_operationResult[response.token()] = operationOk;
        m_pendingOperations.erase(response.token());
    });
    auto it = m_subscribeTokenToTopic.find(response.token());
    if (m_subscribeTokenToTopic.end() == it) {
        // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token());
        return;
    }
    // Copy the topic before the entry is erased.
    auto topic = it->second;
    m_subscribeTokenToTopic.erase(it);

    auto pendingIt = m_pendingSubscriptions.find(topic);
    if (m_pendingSubscriptions.end() == pendingIt)
    {
        // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token());
        return;
    }
    if (response.qos() != pendingIt->second.qos)
    {
        // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos());
    }
    // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic);
    m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second)));
    m_pendingSubscriptions.erase(pendingIt);
    operationOk = true;
}

void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response)
{
    // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
    OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]()
    {
        // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
        m_operationResult[response.token()] = false;
        m_pendingOperations.erase(response.token());
    });

    auto it = m_subscribeTokenToTopic.find(response.token());
    if (m_subscribeTokenToTopic.end() == it)
    {
        // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token());
        return;
    }
    auto topic = it->second;
    m_subscribeTokenToTopic.erase(it);

    auto pendingIt = m_pendingSubscriptions.find(topic);
    if (m_pendingSubscriptions.end() == pendingIt)
    {
        // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token());
        return;
    }
    // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic);
    m_pendingSubscriptions.erase(pendingIt);
}

void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response)
{
    // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token());

    OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);

    // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
    // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
    // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
    // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
    // sequentially (see ClientPaho::unsubscribe)!
    auto token = response.token();
    if (-1 == token)
    {
        token = m_lastUnsubscribe;
        m_lastUnsubscribe = -1;
    }

    bool operationOk = false;
    OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]()
    {
        // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
        m_operationResult[token] = operationOk;
        m_pendingOperations.erase(token);
    });

    auto it = m_unsubscribeTokenToTopic.find(token);
    if (m_unsubscribeTokenToTopic.end() == it)
    {
        // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token);
        return;
    }
    auto topic = it->second;
    m_unsubscribeTokenToTopic.erase(it);

    auto registeredIt = m_subscriptions.find(topic);
    if (m_subscriptions.end() == registeredIt) {
        // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token());
        return;
    }
    // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic);
    m_subscriptions.erase(registeredIt);
    operationOk = true;
}

// Handles a failed unsubscribe response on the client instance.
//
// The operation belonging to the response token is always recorded as failed in
// m_operationResult and removed from m_pendingOperations (scope guard). When the
// token is known, its entry in m_unsubscribeTokenToTopic is removed. The registered
// subscription itself is left untouched because the unsubscribe did not succeed.
//
// response : the failure data of the unsubscribe.
void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response)
{
    // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
    // Declared before the lock guard so that it is destroyed (and thus notifies) last.
    OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]()
    {
        // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
        m_operationResult[response.token()] = false;
        m_pendingOperations.erase(response.token());
    });

    auto it = m_unsubscribeTokenToTopic.find(response.token());
    if (m_unsubscribeTokenToTopic.end() == it)
    {
        // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token());
        return;
    }
    // Only the token administration needs cleaning up; the topic itself is not needed here.
    m_unsubscribeTokenToTopic.erase(it);
}

int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message)
{
    // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate());

    std::function<void(MqttMessage)> cb;

    {
        OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
        for (const auto& s : m_subscriptions)
        {
            if (boost::regex_match(message.topic(), s.second.topicRegex))
            {
                cb = s.second.callback;
            }
        }
    }

    if (cb)
    {
        cb(message);
    }
    else
    {
        // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic());
    }
    return 1;
}

// Reports the delivery of a message to the registered delivery complete callback.
// Nothing happens when no callback is set.
//
// token : the token of the delivered message.
void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token)
{
    if (!m_deliveryCompleteCallback)
    {
        return;
    }

    const auto deliveredToken = static_cast<std::int32_t>(token);
    m_deliveryCompleteCallback(m_clientId, deliveredToken);
}

// Handles the loss of the connection on the client instance.
//
// The connection status is set to ReconnectInProgress. After that all tokens that
// belong to outstanding subscribe and unsubscribe requests are removed from the
// pending operations and the token-to-topic administration is cleared.
//
// cause : the reason of the connection loss (currently unused).
void ClientPaho::onConnectionLostOnInstance(const std::string& cause)
{
    (void)cause;
    setConnectionStatus(ConnectionStatus::ReconnectInProgress);

    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);

    // Outstanding subscribe requests are no longer active operations.
    for (const auto& subscribeEntry : m_subscribeTokenToTopic)
    {
        m_pendingOperations.erase(subscribeEntry.first);
    }

    // The same applies to outstanding unsubscribe requests.
    for (const auto& unsubscribeEntry : m_unsubscribeTokenToTopic)
    {
        m_pendingOperations.erase(unsubscribeEntry.first);
    }

    // Reset the administration used in the subscribe / unsubscribe process.
    m_subscribeTokenToTopic.clear();
    m_unsubscribeTokenToTopic.clear();
}

// static
void ClientPaho::onConnect(void* context, char* cause)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        std::string reason(nullptr == cause ? "unknown cause" : cause);
        cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); });
    }
}

// static
// Paho callback : the connect request succeeded.
// Converts the Paho success data into an MqttSuccess with ConnectionData and hands a
// closure that calls onConnectSuccessOnInstance over to pushIncomingEvent of the
// ClientPaho instance carried in context.
//
// context  : the ClientPaho instance (as void*). Ignored when it is a nullptr.
// response : the success data of the connect. When it is a nullptr no event can be
//            built, the callback is dropped instead of dereferencing the nullptr.
void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        if (!response) {
            // connect should always have a valid response struct.
            // ("ClientPaho", "onConnectSuccess - no response data");
            // Without response data there is no token and no connection data, so there is
            // nothing meaningful to forward. Bail out to avoid dereferencing a nullptr.
            return;
        }
        MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
        cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); });
    }
}

// static
void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttFailure resp(response);
        cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); });
    }
}

//// static
//void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
//{
//    apply_unused_parameters(properties);
//    try {
//        if (context) {
//            auto* cl = reinterpret_cast<ClientPaho*>(context);
//            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
//        }
//    }
//    catch (...) {
//    }
//    catch (const std::exception& e) {
//        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
//    }
//    catch (...) {
//        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
//    }
//}

// static
void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttSuccess resp(response ? response->token : 0);
        cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); });
    }
}

// static
void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttFailure resp(response);
        cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); });
    }
}

// static
// Paho callback : the publish request succeeded.
// Converts the Paho success data into an MqttSuccess with the published MqttMessage
// and hands a closure that calls onPublishSuccessOnInstance over to pushIncomingEvent
// of the ClientPaho instance carried in context.
//
// context  : the ClientPaho instance (as void*). Ignored when it is a nullptr.
// response : the success data of the publish. A missing destination name is replaced
//            by "null". When response itself is a nullptr no event can be built, the
//            callback is dropped instead of dereferencing the nullptr.
void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        if (!response)
        {
            // publish should always have a valid response struct.
            // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
            // Without response data the token and the message are unknown, so there is
            // nothing meaningful to forward. Bail out to avoid dereferencing a nullptr.
            return;
        }
        MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
        cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
    }
}

// static
void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response)
{
    (void)response;
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttFailure resp(response);
        cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); });
    }
}

// static
// Paho callback : the subscribe request succeeded.
// Converts the Paho success data into an MqttSuccess with the granted qos and hands
// a closure that calls onSubscribeSuccessOnInstance over to pushIncomingEvent of the
// ClientPaho instance carried in context.
//
// context  : the ClientPaho instance (as void*). Ignored when it is a nullptr.
// response : the success data of the subscribe. When it is a nullptr no event can be
//            built, the callback is dropped instead of dereferencing the nullptr.
void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        if (!response)
        {
            // subscribe should always have a valid response struct.
            // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
            // Without response data the token and the granted qos are unknown, so there is
            // nothing meaningful to forward. Bail out to avoid dereferencing a nullptr.
            return;
        }
        MqttSuccess resp(response->token, response->alt.qos);
        cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); });
    }
}

// static
void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttFailure resp(response);
        cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); });
    }
}

// static
void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttSuccess resp(response ? response->token : -1);
        cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); });
    }
}

// static
void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttFailure resp(response);
        cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); });
    }
}

// static
int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message)
{

    OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]()
    {
        MQTTAsync_freeMessage(&message);
        MQTTAsync_free(topicName);
    });

    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        MqttMessage msg(topicName, *message);
        cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); });
    }

    return 1; // always return true. Otherwise this callback is triggered again.
}

// static
void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token)
{
    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); });
    }
}

// static
void ClientPaho::onConnectionLost(void* context, char* cause)
{
    OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]()
    {
        if (cause)
        {
            MQTTAsync_free(cause);
        }
    });

    if (context)
    {
        auto* cl = reinterpret_cast<ClientPaho*>(context);
        std::string msg(nullptr == cause ? "cause unknown" : cause);
        cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); });
    }
}

// static
// Paho trace callback.
// The trace levels are grouped in three categories (trace, protocol and error). The
// logging itself is disabled at the moment, so no category produces any output.
//
// level   : the trace level of the message.
// message : the trace message (currently unused).
void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
    (void)message;
    switch (level)
    {
        // Trace category.
        case MQTTASYNC_TRACE_MAXIMUM:
        case MQTTASYNC_TRACE_MEDIUM:
        case MQTTASYNC_TRACE_MINIMUM:
        {
            // ("ClientPaho", "paho - %1", message)
            break;
        }
        // Protocol category.
        case MQTTASYNC_TRACE_PROTOCOL:
        {
            // ("ClientPaho", "paho - %1", message)
            break;
        }
        // Error category.
        case MQTTASYNC_TRACE_ERROR:
        case MQTTASYNC_TRACE_SEVERE:
        case MQTTASYNC_TRACE_FATAL:
        {
            // ("ClientPaho", "paho - %1", message)
            break;
        }
    }
}