Program Listing for File EntityWatchdog.h

Return to documentation for file (axr.sdk/utils/EntityWatchdog.h)

// Copyright (C) AcceleratXR, Inc. All rights reserved.
//
// Author: Jean-Philippe Steinmetz <info@acceleratxr.com>
#pragma once

#include <cpprest/details/basic_types.h>

#include <chrono>
#include <functional>
#include <memory>
#include <map>
#include <mutex>
#include <vector>
#include <unordered_set>
#include <thread>
#include <typeinfo>

#include "../Configuration.h"
#include "../Object.h"
#include "../ServiceBase.h"
#include "../net/IApiClient.h"
#include "../net/IWebSocket.h"
#include "../net/HttpContent.h"

#define DEFINE_ENTITYWATCHDOG_SPEC(Type) \
namespace axr { \
namespace sdk { \
namespace utils { \
    typedef class EntityWatchdog<models::Type> EntityWatchdog##Type; \
} \
} \
} \

namespace axr {
    namespace sdk {
        namespace utils {

            class IEntityWatchdog
            {
            public:
                virtual bool NotifyEntityUpdate(Object& msg) = 0;

                virtual void SetPushSocket(std::shared_ptr<net::IWebSocket> inSocket) = 0;

                virtual void Shutdown() = 0;
            };

            // Forward declare so that we can make it inherit from std::enable_shared_from_this.
            template<class T>
            class EntityWatchdog;

            template<class T>
            class EntityWatchdog : public IEntityWatchdog, public std::enable_shared_from_this<EntityWatchdog<T>>
            {
            public:
                typedef std::function<void(std::shared_ptr<T> oldObj, std::shared_ptr<T> newObj)> EntityCallback;

                EntityWatchdog(std::shared_ptr<Configuration> inConfig, std::shared_ptr<net::IApiClient> inApiClient)
                    : config(inConfig)
                    , apiClient(inApiClient)
                {
                }

                virtual ~EntityWatchdog() {}

                virtual void SetPushSocket(std::shared_ptr<net::IWebSocket> inSocket) override
                {
                    this->socket = inSocket;
                }

                virtual bool NotifyEntityUpdate(Object& message) override
                {
                    std::lock_guard<std::mutex> guard(mutex);

                    utility::string_t action = std::get<utility::string_t>(message.GetProperty(_XPLATSTR("action")));
                    std::shared_ptr<Object> data = message.HasProperty(_XPLATSTR("data")) ? std::get<std::shared_ptr<Object>>(message.GetProperty(_XPLATSTR("data"))) : nullptr;
                    if (data == nullptr)
                    {
                        return false;
                    }

                    // Are we tracking the entity in question?
                    utility::string_t uid = std::get<utility::string_t>(data->GetProperty(_XPLATSTR("uid")));
                    auto entityIter = entities.find(uid);
                    if (entityIter == entities.cend())
                    {
                        return false;
                    }

                    // What kind of operation is this?
                    std::shared_ptr<T> entity = entityIter->second;
                    std::shared_ptr<T> newEntity = nullptr;
                    bool isDelete = false;
                    bool updated = false;
                    if (action.compare(_XPLATSTR("delete")) == 0)
                    {
                        isDelete = true;
                        updated = true;
                    }
                    else if (action.compare(_XPLATSTR("update")) == 0)
                    {
                        // Retrieve the entity version and compare to what we currently have
                        uint64_t version = (uint64_t)std::get<int64_t>(data->GetProperty(_XPLATSTR("version")));
                        if (version > entity->GetVersion())
                        {
                            // Update the entity's internal data
                            newEntity = std::make_shared<T>();
                            newEntity->MoveFrom(data);
                            updated = true;
                        }
                    }

                    if (updated)
                    {
                        // Notify all the callbacks
                        auto iter = callbacks.find(uid);
                        if (iter != callbacks.cend())
                        {
                            for (auto func : iter->second)
                            {
                                if (func != nullptr)
                                {
                                    func(entity, newEntity);
                                }
                            }
                        }

                        // Clean up if this is a delete operation
                        if (isDelete)
                        {
                            entities.erase(entityIter);
                            callbacks.erase(iter);
                        }
                    }

                    return true;
                }

                virtual void Shutdown() override
                {
                    std::lock_guard<std::mutex> guard(mutex);
                    pendingShutdown = true;
                    callbacks.clear();
                    entities.clear();
                    remoteUrls.clear();
                }

                void Start()
                {
                    pplx::cancellation_token cancelToken = config->GetCancellationToken();
                    uint32_t pollingFrequency = config->GetPollingFrequency();
                    std::shared_ptr<EntityWatchdog> thisPtr = this->shared_from_this();
                    pplx::create_task([cancelToken, pollingFrequency, thisPtr]()
                    {
                        while (thisPtr != nullptr && !thisPtr->pendingShutdown && !cancelToken.is_canceled())
                        {
                            thisPtr->Run();

                            // Sleep
                            std::this_thread::sleep_for(std::chrono::milliseconds(pollingFrequency));
                        }
                    }, cancelToken).then([](pplx::task<void> task)
                    {
                        try
                        {
                            task.get();
                        }
                        catch (...)
                        {
                            // There's nothing to do here
                        }
                    });
                }

                size_t Watch(const std::map<utility::string_t, utility::string_t> searchParams, const uint16_t startIndex, const uint16_t maxResults, const utility::string_t& sortBy, std::function<void(std::shared_ptr<T> obj)> callback)
                {
                    // TODO
                    return 0;
                }

                size_t Watch(std::shared_ptr<T> entity, EntityCallback callback)
                {
                    size_t result = 0;
                    if (entity == nullptr)
                    {
                        throw Exception(400, _XPLATSTR("Provided entity is not valid. Was it deleted?"));
                    }

                    utility::string_t entityUid = entity->GetUid();

                    if (entity->GetRemoteUrl().empty())
                    {
                        throw Exception(400, _XPLATSTR("Provided entity does not have a remote url set."));
                    }

                    std::lock_guard<std::mutex> guard(mutex);

                    // Add the provided callback to the list for this entity
                    auto iter = callbacks.find(entityUid);
                    if (iter != callbacks.cend())
                    {
                        // Add the callback to the existing list
                        iter->second.push_back(callback);
                        result = iter->second.size() - 1;
                    }
                    else
                    {
                        // Create a new list of callbacks
                        std::vector<EntityCallback> cbs;
                        cbs.push_back(callback);
                        callbacks.insert_or_assign(entityUid, cbs);
                        result = cbs.size() - 1;
                    }

                    // Add the entity to the list being monitored
                    auto iter2 = entities.find(entityUid);
                    if (iter2 == entities.cend())
                    {
                        entities.insert_or_assign(entityUid, entity);
                    }

                    // Store the remote URL
                    auto iter3 = remoteUrls.find(entityUid);
                    if (iter3 == remoteUrls.cend())
                    {
                        remoteUrls.insert_or_assign(entityUid, entity->GetRemoteUrl());
                    }

                    return result;
                }

                void Unwatch(std::shared_ptr<T> entity, size_t id)
                {
                    if (entity != nullptr)
                    {
                        std::lock_guard<std::mutex> guard(mutex);
                        utility::string_t entityUid = entity->GetUid();

                        if (!entityUid.empty())
                        {
                            auto iter = callbacks.find(entityUid);
                            if (iter != callbacks.cend())
                            {
                                // Remove the call back from our list
                                if (id >= 0 && id < iter->second.size())
                                {
                                    auto idx = iter->second.begin() + id;
                                    iter->second.erase(idx);
                                }

                                // If there are no more callbacks, stop monitoring this entity
                                if (iter->second.size() == 0)
                                {
                                    entities.erase(entityUid);
                                    remoteUrls.erase(entityUid);
                                }
                            }
                        }
                    }
                }

            private:
                void RetrieveEntity(utility::string_t entityUid, utility::string_t url)
                {
                    std::shared_ptr<EntityWatchdog> thisPtr = this->shared_from_this();
                    if (thisPtr != nullptr && !entityUid.empty() && !url.empty())
                    {
                        std::map<utility::string_t, utility::string_t> queryParams;
                        std::map<utility::string_t, utility::string_t> headerParams;
                        std::shared_ptr<T> oldEntity = thisPtr->entities.find(entityUid)->second;

                        apiClient->Get(url, queryParams, headerParams).then([entityUid, oldEntity, thisPtr, url](Variant result)
                        {
                            if (3 == result.index())
                            {
                                std::shared_ptr<Object> obj = std::get<std::shared_ptr<Object>>(result);
                                std::shared_ptr<T> newEntity = std::make_shared<T>();
                                newEntity->MoveFrom(obj);
                                newEntity->SetRemoteUrl(url);

                                // Did we receive a new version?
                                if (oldEntity == nullptr || newEntity->GetVersion() > oldEntity->GetVersion())
                                {
                                    std::lock_guard<std::mutex> guard(thisPtr->mutex);

                                    // Notify all the callbacks
                                    auto iter = thisPtr->callbacks.find(entityUid);
                                    if (iter != thisPtr->callbacks.cend())
                                    {
                                        for (auto func : iter->second)
                                        {
                                            if (func != nullptr)
                                            {
                                                func(oldEntity, newEntity);
                                            }
                                        }
                                    }

                                    // Update the internal cache
                                    thisPtr->entities.insert_or_assign(entityUid, newEntity);
                                }
                            }
                        })
                        .then([entityUid, oldEntity, thisPtr](pplx::task<void> task)
                        {
                            try
                            {
                                task.get();
                            }
                            catch (axr::sdk::Exception err)
                            {
                                // Check if the entity was no longer found
                                if (err.error_code().value() == 404)
                                {
                                    std::lock_guard<std::mutex> guard(thisPtr->mutex);

                                    // Notify all the callbacks that the object is null
                                    auto iter = thisPtr->callbacks.find(entityUid);
                                    if (iter != thisPtr->callbacks.cend())
                                    {
                                        for (auto func : iter->second)
                                        {
                                            if (func != nullptr)
                                            {
                                                func(oldEntity, nullptr);
                                            }
                                        }
                                    }

                                    // The entity is no longer available so we need to clean up
                                    thisPtr->callbacks.erase(entityUid);
                                    thisPtr->entities.erase(entityUid);
                                    thisPtr->remoteUrls.erase(entityUid);
                                }
                            }
                            catch (...)
                            {
                                // Nothing to do here
                            }
                        }, config->GetCancellationToken());
                    }
                }

                void Run()
                {
                    // When an active WebSocket connection is available it is used to monitor for changes.
                    // Otherwise we fall back to polling.
                    if (socket == nullptr)
                    {
                        // Go through each monitored entity and attempt to update it
                        for (auto iter : remoteUrls)
                        {
                            RetrieveEntity(iter.first, iter.second);
                        }
                    }
                }

                std::shared_ptr<Configuration> config = nullptr;
                std::shared_ptr<net::IApiClient> apiClient = nullptr;
                std::map<utility::string_t, std::vector<EntityCallback>> callbacks;
                std::map<utility::string_t, std::shared_ptr<T>> entities;
                std::map<utility::string_t, utility::string_t> remoteUrls;
                std::shared_ptr<net::IWebSocket> socket = nullptr;
                std::mutex mutex;
                bool pendingShutdown = false;
            };

        } // namespace utils
    } // namespace sdk
} // namespace axr