Program Listing for File EntityWatchdog.h
↰ Return to documentation for file (axr.sdk/utils/EntityWatchdog.h)
// Copyright (C) AcceleratXR, Inc. All rights reserved.
//
// Author: Jean-Philippe Steinmetz <info@acceleratxr.com>
#pragma once
#include <cpprest/details/basic_types.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <typeinfo>
#include <unordered_set>
#include <variant>
#include <vector>
#include "../Configuration.h"
#include "../Object.h"
#include "../ServiceBase.h"
#include "../net/IApiClient.h"
#include "../net/IWebSocket.h"
#include "../net/HttpContent.h"
// Declares a named alias for the watchdog specialised on a model type.
// DEFINE_ENTITYWATCHDOG_SPEC(Foo) expands to
//     axr::sdk::utils::EntityWatchdogFoo  ==  EntityWatchdog<models::Foo>
// where `models::Foo` is looked up from inside axr::sdk::utils.
// The expansion opens the axr/sdk/utils namespaces itself, so the macro must be
// invoked at global namespace scope.
// The last line of the macro intentionally has no line continuation: a trailing
// continuation there would pull the next source line into the macro body.
#define DEFINE_ENTITYWATCHDOG_SPEC(Type) \
    namespace axr { \
    namespace sdk { \
    namespace utils { \
    typedef class EntityWatchdog<models::Type> EntityWatchdog##Type; \
    } \
    } \
    }
namespace axr {
namespace sdk {
namespace utils {
// Type-independent interface implemented by every EntityWatchdog<T>, so that
// watchdogs for different entity types can be driven through one pointer type.
class IEntityWatchdog
{
public:
    // Virtual so that a watchdog destroyed through an IEntityWatchdog pointer
    // runs the derived destructor.
    virtual ~IEntityWatchdog() = default;

    // Hands a push message to the watchdog.
    // Returns whether the implementation accepted the message.
    virtual bool NotifyEntityUpdate(Object& msg) = 0;

    // Supplies the WebSocket used for push notifications.
    virtual void SetPushSocket(std::shared_ptr<net::IWebSocket> inSocket) = 0;

    // Stops the watchdog.
    virtual void Shutdown() = 0;
};
// Forward declaration of the watchdog template. The definition follows directly below
// and derives from std::enable_shared_from_this<EntityWatchdog<T>>.
template<class T>
class EntityWatchdog;
template<class T>
class EntityWatchdog : public IEntityWatchdog, public std::enable_shared_from_this<EntityWatchdog<T>>
{
public:
// Listener signature: receives the previously cached entity and its replacement.
// The watchdog passes a null `newObj` when the entity was deleted or is no longer found.
using EntityCallback = std::function<void(std::shared_ptr<T> oldObj, std::shared_ptr<T> newObj)>;

// Stores the configuration and API client; nothing is started until Start() is called.
EntityWatchdog(std::shared_ptr<Configuration> inConfig, std::shared_ptr<net::IApiClient> inApiClient)
    : config{inConfig}
    , apiClient{inApiClient}
{
}

virtual ~EntityWatchdog() = default;
// Replaces the stored push socket. Run() only polls while the stored socket is null,
// so passing a non-null socket turns polling off and passing null turns it back on.
virtual void SetPushSocket(std::shared_ptr<net::IWebSocket> inSocket) override
{
    socket = inSocket;
}
// Applies a pushed change message to a watched entity.
//
// The message is read as: "action" (string; "delete" and "update" are handled),
// and "data" (object) carrying "uid" (string) and, for updates, "version" (int64).
//
// Returns false when the message has no "data" or its uid is not being watched;
// returns true otherwise, even when the action did not lead to a notification.
//
// Callbacks are invoked while the internal mutex is held. The mutex is a plain
// std::mutex, so a callback must not call back into Watch/Unwatch/Shutdown.
virtual bool NotifyEntityUpdate(Object& message) override
{
    std::lock_guard<std::mutex> guard(mutex);

    utility::string_t action = std::get<utility::string_t>(message.GetProperty(_XPLATSTR("action")));
    std::shared_ptr<Object> data = message.HasProperty(_XPLATSTR("data"))
        ? std::get<std::shared_ptr<Object>>(message.GetProperty(_XPLATSTR("data")))
        : nullptr;
    if (data == nullptr)
    {
        return false;
    }

    // Are we tracking the entity in question?
    utility::string_t uid = std::get<utility::string_t>(data->GetProperty(_XPLATSTR("uid")));
    auto entityIter = entities.find(uid);
    if (entityIter == entities.end())
    {
        return false;
    }

    // What kind of operation is this?
    std::shared_ptr<T> entity = entityIter->second;
    std::shared_ptr<T> newEntity = nullptr;
    bool isDelete = false;
    bool updated = false;
    if (action.compare(_XPLATSTR("delete")) == 0)
    {
        isDelete = true;
        updated = true;
    }
    else if (action.compare(_XPLATSTR("update")) == 0)
    {
        // Only versions newer than the cached one count as an update.
        const uint64_t version = static_cast<uint64_t>(std::get<int64_t>(data->GetProperty(_XPLATSTR("version"))));
        if (version > entity->GetVersion())
        {
            newEntity = std::make_shared<T>();
            newEntity->MoveFrom(data);
            // Keep the URL so the polling path can still refresh this entity.
            newEntity->SetRemoteUrl(entity->GetRemoteUrl());
            updated = true;
        }
    }

    if (updated)
    {
        // Notify all the callbacks (newEntity stays null for a delete).
        auto iter = callbacks.find(uid);
        if (iter != callbacks.end())
        {
            for (const auto& func : iter->second)
            {
                if (func != nullptr)
                {
                    func(entity, newEntity);
                }
            }
        }

        if (isDelete)
        {
            // Forget the entity everywhere, including the poll list, so that the
            // poller does not keep requesting something that is no longer cached.
            entities.erase(entityIter);
            if (iter != callbacks.end())
            {
                callbacks.erase(iter);
            }
            remoteUrls.erase(uid);
        }
        else
        {
            // Remember the new version so later messages and polls compare against
            // it, matching what the polling path does.
            entityIter->second = newEntity;
        }
    }
    return true;
}
// Stops the watchdog: raises the flag that ends the polling loop started by Start()
// and drops every registered callback, cached entity and remote URL.
// Listeners are not notified; they are simply discarded.
virtual void Shutdown() override
{
    std::lock_guard<std::mutex> lock(mutex);

    pendingShutdown = true;

    callbacks.clear();
    entities.clear();
    remoteUrls.clear();
}
void Start()
{
pplx::cancellation_token cancelToken = config->GetCancellationToken();
uint32_t pollingFrequency = config->GetPollingFrequency();
std::shared_ptr<EntityWatchdog> thisPtr = this->shared_from_this();
pplx::create_task([cancelToken, pollingFrequency, thisPtr]()
{
while (thisPtr != nullptr && !thisPtr->pendingShutdown && !cancelToken.is_canceled())
{
thisPtr->Run();
// Sleep
std::this_thread::sleep_for(std::chrono::milliseconds(pollingFrequency));
}
}, cancelToken).then([](pplx::task<void> task)
{
try
{
task.get();
}
catch (...)
{
// There's nothing to do here
}
});
}
// Search-based overload of Watch. Not implemented yet: every argument is ignored,
// nothing is registered and the return value is always 0.
size_t Watch(const std::map<utility::string_t, utility::string_t> searchParams, const uint16_t startIndex, const uint16_t maxResults, const utility::string_t& sortBy, std::function<void(std::shared_ptr<T> obj)> callback)
{
// TODO: implement; until then this overload is a no-op.
return 0;
}
size_t Watch(std::shared_ptr<T> entity, EntityCallback callback)
{
size_t result = 0;
if (entity == nullptr)
{
throw Exception(400, _XPLATSTR("Provided entity is not valid. Was it deleted?"));
}
utility::string_t entityUid = entity->GetUid();
if (entity->GetRemoteUrl().empty())
{
throw Exception(400, _XPLATSTR("Provided entity does not have a remote url set."));
}
std::lock_guard<std::mutex> guard(mutex);
// Add the provided callback to the list for this entity
auto iter = callbacks.find(entityUid);
if (iter != callbacks.cend())
{
// Add the callback to the existing list
iter->second.push_back(callback);
result = iter->second.size() - 1;
}
else
{
// Create a new list of callbacks
std::vector<EntityCallback> cbs;
cbs.push_back(callback);
callbacks.insert_or_assign(entityUid, cbs);
result = cbs.size() - 1;
}
// Add the entity to the list being monitored
auto iter2 = entities.find(entityUid);
if (iter2 == entities.cend())
{
entities.insert_or_assign(entityUid, entity);
}
// Store the remote URL
auto iter3 = remoteUrls.find(entityUid);
if (iter3 == remoteUrls.cend())
{
remoteUrls.insert_or_assign(entityUid, entity->GetRemoteUrl());
}
return result;
}
void Unwatch(std::shared_ptr<T> entity, size_t id)
{
if (entity != nullptr)
{
std::lock_guard<std::mutex> guard(mutex);
utility::string_t entityUid = entity->GetUid();
if (!entityUid.empty())
{
auto iter = callbacks.find(entityUid);
if (iter != callbacks.cend())
{
// Remove the call back from our list
if (id >= 0 && id < iter->second.size())
{
auto idx = iter->second.begin() + id;
iter->second.erase(idx);
}
// If there are no more callbacks, stop monitoring this entity
if (iter->second.size() == 0)
{
entities.erase(entityUid);
remoteUrls.erase(entityUid);
}
}
}
}
}
private:
// Fetches the entity at `url` and, when the response carries a newer version than
// the cached one, notifies the entity's callbacks and refreshes the cache.
//
// A failure with error code 404 is treated as "entity gone": callbacks receive
// (last known entity, nullptr) and the entity is dropped from all tracking maps.
// Any other failure is ignored; the next poll simply tries again.
//
// Does nothing when either argument is empty or the uid is not currently tracked.
// Callbacks run with the internal mutex held, so they must not call back into
// Watch/Unwatch/Shutdown.
void RetrieveEntity(utility::string_t entityUid, utility::string_t url)
{
    std::shared_ptr<EntityWatchdog> thisPtr = this->shared_from_this();
    if (thisPtr == nullptr || entityUid.empty() || url.empty())
    {
        return;
    }

    // Snapshot the cached entity under the lock and bail out if the uid is not
    // tracked; dereferencing the result of find() unchecked would be undefined.
    std::shared_ptr<T> oldEntity;
    {
        std::lock_guard<std::mutex> guard(mutex);
        auto cached = entities.find(entityUid);
        if (cached == entities.end())
        {
            return;
        }
        oldEntity = cached->second;
    }

    std::map<utility::string_t, utility::string_t> queryParams;
    std::map<utility::string_t, utility::string_t> headerParams;
    apiClient->Get(url, queryParams, headerParams).then([entityUid, thisPtr, url](Variant result)
    {
        // Only an object payload can be turned into an entity.
        if (!std::holds_alternative<std::shared_ptr<Object>>(result))
        {
            return;
        }

        std::shared_ptr<Object> obj = std::get<std::shared_ptr<Object>>(result);
        std::shared_ptr<T> newEntity = std::make_shared<T>();
        newEntity->MoveFrom(obj);
        newEntity->SetRemoteUrl(url);

        std::lock_guard<std::mutex> guard(thisPtr->mutex);

        // The entity may have been unwatched or deleted while the request was in
        // flight; in that case there is nobody to notify and nothing to refresh.
        auto tracked = thisPtr->entities.find(entityUid);
        if (tracked == thisPtr->entities.end())
        {
            return;
        }

        // Compare against what is cached now, not what was cached when the request
        // was issued, so overlapping requests do not report the same version twice.
        std::shared_ptr<T> current = tracked->second;
        if (current != nullptr && newEntity->GetVersion() <= current->GetVersion())
        {
            return;
        }

        // Notify all the callbacks
        auto iter = thisPtr->callbacks.find(entityUid);
        if (iter != thisPtr->callbacks.end())
        {
            for (const auto& func : iter->second)
            {
                if (func != nullptr)
                {
                    func(current, newEntity);
                }
            }
        }

        // Update the internal cache
        tracked->second = newEntity;
    })
    .then([entityUid, oldEntity, thisPtr](pplx::task<void> task)
    {
        try
        {
            task.get();
        }
        catch (const axr::sdk::Exception& err)
        {
            // Check if the entity was no longer found
            if (err.error_code().value() == 404)
            {
                std::lock_guard<std::mutex> guard(thisPtr->mutex);

                // Notify all the callbacks that the object is null
                auto iter = thisPtr->callbacks.find(entityUid);
                if (iter != thisPtr->callbacks.end())
                {
                    for (const auto& func : iter->second)
                    {
                        if (func != nullptr)
                        {
                            func(oldEntity, nullptr);
                        }
                    }
                }

                // The entity is no longer available so we need to clean up
                thisPtr->callbacks.erase(entityUid);
                thisPtr->entities.erase(entityUid);
                thisPtr->remoteUrls.erase(entityUid);
            }
        }
        catch (...)
        {
            // Nothing to do here
        }
    }, config->GetCancellationToken());
}
void Run()
{
// When an active WebSocket connection is available it is used to monitor for changes.
// Otherwise we fall back to polling.
if (socket == nullptr)
{
// Go through each monitored entity and attempt to update it
for (auto iter : remoteUrls)
{
RetrieveEntity(iter.first, iter.second);
}
}
}
// Source of the cancellation token and polling frequency.
std::shared_ptr<Configuration> config = nullptr;
// Client used to fetch entities while polling.
std::shared_ptr<net::IApiClient> apiClient = nullptr;
// Listener lists keyed by entity uid; a listener's id is its position in the list.
std::map<utility::string_t, std::vector<EntityCallback>> callbacks;
// Last known version of every watched entity, keyed by uid.
std::map<utility::string_t, std::shared_ptr<T>> entities;
// URL each watched entity is polled from, keyed by uid.
std::map<utility::string_t, utility::string_t> remoteUrls;
// Push socket; polling only happens while this is null.
std::shared_ptr<net::IWebSocket> socket = nullptr;
// Guards callbacks, entities and remoteUrls.
std::mutex mutex;
// Set by Shutdown() and read by the polling task without taking the mutex, hence atomic.
std::atomic<bool> pendingShutdown{false};
};
} // namespace utils
} // namespace sdk
} // namespace axr