diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp index a4bcbc115b60fd02ef089d93eaa18d15b0dfeaad..c803293aef8053bdbda69f6d28c0dcef9fa1b3a4 100644 --- a/ChaosMetadataService/ChaosMetadataService.cpp +++ b/ChaosMetadataService/ChaosMetadataService.cpp @@ -19,24 +19,24 @@ * permissions and limitations under the Licence. */ -#include "mds_constants.h" #include "ChaosMetadataService.h" #include <chaos_service_common/DriverPoolManager.h> #include <chaos_service_common/health_system/HealtManagerDirect.h> +#include "mds_constants.h" #include "QueryDataConsumer.h" #include "QueryDataMsgPSConsumer.h" #include "object_storage/object_storage.h" -#include <csignal> +#include <chaos/common/configuration/GlobalConfiguration.h> #include <chaos/common/exception/CException.h> -#include <boost/format.hpp> -#include <boost/algorithm/string.hpp> #include <chaos/common/io/SharedManagedDirecIoDataDriver.h> -#include <regex> #include <chaos/common/utility/ObjectFactoryRegister.h> -#include <chaos/common/configuration/GlobalConfiguration.h> +#include <boost/algorithm/string.hpp> +#include <boost/format.hpp> +#include <csignal> +#include <regex> using namespace std; using namespace chaos; using namespace chaos::common::io; @@ -56,494 +56,519 @@ using namespace chaos::common::cache_system; using namespace chaos::service_common::persistence::data_access; WaitSemaphore ChaosMetadataService::waitCloseSemaphore; -uint64_t ChaosMetadataService::timePrecisionMask=0xFFFFFFFFFFFFFFF0ULL; - -#define LCND_LAPP INFO_LOG(ChaosMetadataService) -#define LCND_LDBG DBG_LOG(ChaosMetadataService) -#define LCND_LERR ERR_LOG(ChaosMetadataService) -#define log(x) LCND_LDBG<<x -ChaosMetadataService::ChaosMetadataService(){ingore_unreg_po = true; -is_present=false; +uint64_t ChaosMetadataService::timePrecisionMask = 0xFFFFFFFFFFFFFFF0ULL; + +#define LCND_LAPP INFO_LOG(ChaosMetadataService) +#define LCND_LDBG DBG_LOG(ChaosMetadataService) +#define LCND_LERR ERR_LOG(ChaosMetadataService) +#define log(x) LCND_LDBG << x +ChaosMetadataService::ChaosMetadataService() { + ingore_unreg_po = true; + is_present = false; }; -ChaosMetadataService::~ChaosMetadataService(){} +ChaosMetadataService::~ChaosMetadataService() {} //! C and C++ attribute parser /*! Specialized option for startup c and cpp program main options parameter */ -void ChaosMetadataService::init(int argc, const char* argv[]) { - is_present=false; - ChaosCommon<ChaosMetadataService>::init(argc, argv); +void ChaosMetadataService::init(int argc, const char* argv[]) { + is_present = false; + ChaosCommon<ChaosMetadataService>::init(argc, argv); } -//!stringbuffer parser +//! stringbuffer parser /* specialized option for string stream buffer with boost semantics */ -void ChaosMetadataService::init(istringstream &initStringStream) { - is_present=false; - ChaosCommon<ChaosMetadataService>::init(initStringStream); +void ChaosMetadataService::init(istringstream& initStringStream) { + is_present = false; + ChaosCommon<ChaosMetadataService>::init(initStringStream); } /* * */ -void ChaosMetadataService::init(void *init_data) { - try { - ChaosCommon<ChaosMetadataService>::init(init_data); - - if (signal((int) SIGINT, ChaosMetadataService::signalHanlder) == SIG_ERR) { - throw CException(-1, "Error registering SIGINT signal", __PRETTY_FUNCTION__); - } - - if (signal((int) SIGQUIT, ChaosMetadataService::signalHanlder) == SIG_ERR) { - throw CException(-2, "Error registering SIG_ERR signal", __PRETTY_FUNCTION__); - } - - if (signal((int) SIGTERM, ChaosMetadataService::signalHanlder) == SIG_ERR) { - throw CException(-3, "Error registering SIGTERM signal", __PRETTY_FUNCTION__); - } - - //scan the setting - if(!setting.persistence_implementation.size()) { - //no cache server provided - throw chaos::CException(-3, "No persistence implementation provided", __PRETTY_FUNCTION__); - } - - if(!setting.persistence_server_list.size()) { - //no cache server provided - throw chaos::CException(-4, "No persistence's server list provided", __PRETTY_FUNCTION__); - } - - if(getGlobalConfigurationInstance()->hasOption(chaos::service_common::persistence::OPT_PERSITENCE_KV_PARAMTER)) { - fillKVParameter(setting.persistence_kv_param_map, - getGlobalConfigurationInstance()->getOption< std::vector< std::string> >(chaos::service_common::persistence::OPT_PERSITENCE_KV_PARAMTER)); - } - - //check for mandatory configuration - if(!getGlobalConfigurationInstance()->hasOption(OPT_CACHE_SERVER_LIST)) { - //no cache server provided - throw chaos::CException(-3, "No cache server provided", __PRETTY_FUNCTION__); - } - - if(getGlobalConfigurationInstance()->hasOption(OPT_SYNCTIME_ERROR)) { - timeError_opt= getGlobalConfigurationInstance()->getOption< uint32_t >(OPT_SYNCTIME_ERROR); - - } - - if(timeError_opt>2048){ - std::stringstream ss; - ss<<"Specified time synchronization error to high:"<<timeError_opt<<" ms"; - throw chaos::CException(-4,ss.str(), __PRETTY_FUNCTION__); - - } - uint64_t tmp=pow(2,(uint32_t)log2(timeError_opt)); - timePrecisionMask=~(tmp-1); - is_present=false; - if(getGlobalConfigurationInstance()->hasOption(OPT_CACHE_DRIVER_KVP)) { - GlobalConfiguration::fillKVParameter(setting.cache_driver_setting.key_value_custom_param, - getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_CACHE_DRIVER_KVP), ""); -// fillKVParameter(setting.cache_driver_setting.key_value_custom_param, -// getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_CACHE_DRIVER_KVP)); - } - - if(getGlobalConfigurationInstance()->hasOption(OPT_OBJ_STORAGE_DRIVER_KVP)) { - GlobalConfiguration::fillKVParameter(setting.object_storage_setting.key_value_custom_param, - getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP), ""); -// fillKVParameter(setting.object_storage_setting.key_value_custom_param, -// getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP)); - } - if(getGlobalConfigurationInstance()->hasOption(OPT_LOG_STORAGE_DRIVER_KVP)) { - GlobalConfiguration::fillKVParameter(setting.log_storage_setting.key_value_custom_param, - getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_LOG_STORAGE_DRIVER_KVP), ""); -// fillKVParameter(setting.object_storage_setting.key_value_custom_param, -// getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP)); - } - //initilize driver pool manager - DriverPoolManager::persistentSetting.persistence_implementation=setting.persistence_implementation; - DriverPoolManager::persistentSetting.persistence_kv_param_map=setting.persistence_kv_param_map; - DriverPoolManager::persistentSetting.persistence_server_list=setting.persistence_server_list; - - DriverPoolManager::cacheSetting.cache_driver_impl=setting.cache_driver_setting.cache_driver_impl; - DriverPoolManager::cacheSetting.startup_chache_servers=setting.cache_driver_setting.startup_chache_servers; - DriverPoolManager::cacheSetting.caching_pool_min_instances_number=setting.cache_driver_setting.caching_pool_min_instances_number; - DriverPoolManager::cacheSetting.log_metric=setting.cache_driver_setting.log_metric; - DriverPoolManager::cacheSetting.key_value_custom_param=setting.cache_driver_setting.key_value_custom_param; - - DriverPoolManager::logSetting.persistence_kv_param_map=setting.log_storage_setting.key_value_custom_param; - DriverPoolManager::logSetting.persistence_implementation=setting.log_storage_setting.driver_impl; - DriverPoolManager::logSetting.persistence_server_list=setting.log_storage_setting.url_list; - - DriverPoolManager::objectSetting.persistence_kv_param_map=setting.object_storage_setting.key_value_custom_param; - DriverPoolManager::objectSetting.persistence_implementation=setting.object_storage_setting.driver_impl; - DriverPoolManager::objectSetting.persistence_server_list=setting.object_storage_setting.url_list; - - InizializableService::initImplementation(DriverPoolManager::getInstance(), NULL, "DriverPoolManager", __PRETTY_FUNCTION__); - - //! batch system - StartableService::initImplementation(MDSBatchExecutor::getInstance(), NULL, "MDSBatchExecutor", __PRETTY_FUNCTION__); - - //api system - api_managment_service.reset(new ApiManagment(), "ApiManagment"); - api_managment_service.init(NULL, __PRETTY_FUNCTION__); +void ChaosMetadataService::init(void* init_data) { + try { + ChaosCommon<ChaosMetadataService>::init(init_data); + + if (signal((int)SIGINT, ChaosMetadataService::signalHanlder) == SIG_ERR) { + throw CException(-1, "Error registering SIGINT signal", __PRETTY_FUNCTION__); + } + + if (signal((int)SIGQUIT, ChaosMetadataService::signalHanlder) == SIG_ERR) { + throw CException(-2, "Error registering SIG_ERR signal", __PRETTY_FUNCTION__); + } + + if (signal((int)SIGTERM, ChaosMetadataService::signalHanlder) == SIG_ERR) { + throw CException(-3, "Error registering SIGTERM signal", __PRETTY_FUNCTION__); + } + + // scan the setting + if (!setting.persistence_implementation.size()) { + // no cache server provided + throw chaos::CException(-3, "No persistence implementation provided", __PRETTY_FUNCTION__); + } + + if (!setting.persistence_server_list.size()) { + // no cache server provided + throw chaos::CException(-4, "No persistence's server list provided", __PRETTY_FUNCTION__); + } + + if (getGlobalConfigurationInstance()->hasOption(chaos::service_common::persistence::OPT_PERSITENCE_KV_PARAMTER)) { + fillKVParameter(setting.persistence_kv_param_map, + getGlobalConfigurationInstance()->getOption<std::vector<std::string> >(chaos::service_common::persistence::OPT_PERSITENCE_KV_PARAMTER)); + } + + // check for mandatory configuration + if (!getGlobalConfigurationInstance()->hasOption(OPT_CACHE_SERVER_LIST)) { + // no cache server provided + throw chaos::CException(-3, "No cache server provided", __PRETTY_FUNCTION__); + } + + if (getGlobalConfigurationInstance()->hasOption(OPT_SYNCTIME_ERROR)) { + timeError_opt = getGlobalConfigurationInstance()->getOption<uint32_t>(OPT_SYNCTIME_ERROR); + } + + if (timeError_opt > 2048) { + std::stringstream ss; + ss << "Specified time synchronization error to high:" << timeError_opt << " ms"; + throw chaos::CException(-4, ss.str(), __PRETTY_FUNCTION__); + } + uint64_t tmp = pow(2, (uint32_t)log2(timeError_opt)); + timePrecisionMask = ~(tmp - 1); + is_present = false; + if (getGlobalConfigurationInstance()->hasOption(OPT_CACHE_DRIVER_KVP)) { + GlobalConfiguration::fillKVParameter(setting.cache_driver_setting.key_value_custom_param, + getGlobalConfigurationInstance()->getOption<std::vector<std::string> >(OPT_CACHE_DRIVER_KVP), + ""); + // fillKVParameter(setting.cache_driver_setting.key_value_custom_param, + // getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_CACHE_DRIVER_KVP)); + } + + if (getGlobalConfigurationInstance()->hasOption(OPT_OBJ_STORAGE_DRIVER_KVP)) { + GlobalConfiguration::fillKVParameter(setting.object_storage_setting.key_value_custom_param, + getGlobalConfigurationInstance()->getOption<std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP), + ""); + // fillKVParameter(setting.object_storage_setting.key_value_custom_param, + // getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP)); + } + if (getGlobalConfigurationInstance()->hasOption(OPT_LOG_STORAGE_DRIVER_KVP)) { + GlobalConfiguration::fillKVParameter(setting.log_storage_setting.key_value_custom_param, + getGlobalConfigurationInstance()->getOption<std::vector<std::string> >(OPT_LOG_STORAGE_DRIVER_KVP), + ""); + // fillKVParameter(setting.object_storage_setting.key_value_custom_param, + // getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_OBJ_STORAGE_DRIVER_KVP)); + } + // initilize driver pool manager + DriverPoolManager::persistentSetting.persistence_implementation = setting.persistence_implementation; + DriverPoolManager::persistentSetting.persistence_kv_param_map = setting.persistence_kv_param_map; + DriverPoolManager::persistentSetting.persistence_server_list = setting.persistence_server_list; + + DriverPoolManager::cacheSetting.cache_driver_impl = setting.cache_driver_setting.cache_driver_impl; + DriverPoolManager::cacheSetting.startup_chache_servers = setting.cache_driver_setting.startup_chache_servers; + DriverPoolManager::cacheSetting.caching_pool_min_instances_number = setting.cache_driver_setting.caching_pool_min_instances_number; + DriverPoolManager::cacheSetting.log_metric = setting.cache_driver_setting.log_metric; + DriverPoolManager::cacheSetting.key_value_custom_param = setting.cache_driver_setting.key_value_custom_param; + + DriverPoolManager::logSetting.persistence_kv_param_map = setting.log_storage_setting.key_value_custom_param; + DriverPoolManager::logSetting.persistence_implementation = setting.log_storage_setting.driver_impl; + DriverPoolManager::logSetting.persistence_server_list = setting.log_storage_setting.url_list; + + DriverPoolManager::objectSetting.persistence_kv_param_map = setting.object_storage_setting.key_value_custom_param; + DriverPoolManager::objectSetting.persistence_implementation = setting.object_storage_setting.driver_impl; + DriverPoolManager::objectSetting.persistence_server_list = setting.object_storage_setting.url_list; + + InizializableService::initImplementation(DriverPoolManager::getInstance(), NULL, "DriverPoolManager", __PRETTY_FUNCTION__); + + //! batch system + StartableService::initImplementation(MDSBatchExecutor::getInstance(), NULL, "MDSBatchExecutor", __PRETTY_FUNCTION__); + + // api system + api_managment_service.reset(new ApiManagment(), "ApiManagment"); + api_managment_service.init(NULL, __PRETTY_FUNCTION__); #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) #warning "CDS NEEDS KAFKA" - message_consumer.reset(new QueryDataMsgPSConsumer("cds"), "QueryDataMsgPSConsumer"); - if(!message_consumer.get()) throw chaos::CException(-7, "Error instantiating message data consumer", __PRETTY_FUNCTION__); - message_consumer.init(NULL, __PRETTY_FUNCTION__); -#endif - data_consumer.reset(new QueryDataConsumer(), "QueryDataConsumer"); - - if(!data_consumer.get()) throw chaos::CException(-7, "Error instantiating data consumer", __PRETTY_FUNCTION__); - data_consumer.init(NULL, __PRETTY_FUNCTION__); - - //initialize cron manager - InizializableService::initImplementation(cron_job::MDSCronusManager::getInstance(), - NULL, - "MDSConousManager", - __PRETTY_FUNCTION__); - - InizializableService::initImplementation(SharedManagedDirecIoDataDriver::getInstance(), NULL, "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); - - StartableService::initImplementation(HealtManagerDirect::getInstance(), NULL, "HealtManagerDirect", __PRETTY_FUNCTION__); - - - } catch (CException& ex) { - DECODE_CHAOS_EXCEPTION(ex) - exit(1); - } - //start data manager + message_consumer.reset(new QueryDataMsgPSConsumer("cds"), "QueryDataMsgPSConsumer"); + if (!message_consumer.get()) throw chaos::CException(-7, "Error instantiating message data consumer", __PRETTY_FUNCTION__); + message_consumer.init(NULL, __PRETTY_FUNCTION__); +#endif + data_consumer.reset(new QueryDataConsumer(), "QueryDataConsumer"); + + if (!data_consumer.get()) throw chaos::CException(-7, "Error instantiating data consumer", __PRETTY_FUNCTION__); + data_consumer.init(NULL, __PRETTY_FUNCTION__); + + // initialize cron manager + InizializableService::initImplementation(cron_job::MDSCronusManager::getInstance(), + NULL, + "MDSConousManager", + __PRETTY_FUNCTION__); + + InizializableService::initImplementation(SharedManagedDirecIoDataDriver::getInstance(), NULL, "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); + + StartableService::initImplementation(HealtManagerDirect::getInstance(), NULL, "HealtManagerDirect", __PRETTY_FUNCTION__); + + } catch (CException& ex) { + DECODE_CHAOS_EXCEPTION(ex) + exit(1); + } + // start data manager } -int ChaosMetadataService::notifyNewNode(const std::string& nodeuid){ - LCND_LDBG<<" NEW NODE:"<<nodeuid; - return message_consumer->consumeHealthDataEvent(nodeuid, 0, ChaosStringSetConstSPtr(), ChaosMakeSharedPtr<Buffer>()); - +int ChaosMetadataService::notifyNewNode(const std::string& nodeuid) { + LCND_LDBG << " NEW NODE:" << nodeuid; + return message_consumer->consumeHealthDataEvent(nodeuid, 0, ChaosStringSetConstSPtr(), ChaosMakeSharedPtr<Buffer>()); } /* * */ -void ChaosMetadataService::start() { - //lock o monitor for waith the end - try { - ChaosCommon<ChaosMetadataService>::start(); - StartableService::startImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); - - //start batch system - data_consumer.start( __PRETTY_FUNCTION__); - LAPP_ <<"\n----------------------------------------------------------------------"<< - "\n!CHAOS Metadata service started" << - "\nRPC Server address: " << NetworkBroker::getInstance()->getRPCUrl() << - "\nDirectIO Server address: " << NetworkBroker::getInstance()->getDirectIOUrl() << - CHAOS_FORMAT("\nData Service published with url: %1%|0", %NetworkBroker::getInstance()->getDirectIOUrl()) << - "\nTime precision mask: "<<std::hex<<timePrecisionMask<<std::dec<<"\n----------------------------------------------------------------------"; - - //register this process on persistence database - persistence::data_access::DataServiceDataAccess *ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); - std::string unique_uid=NetworkBroker::getInstance()->getRPCUrl(); - LCND_LDBG<<"-----REGISTERING ---"; - - ds_da->registerNode(setting.ha_zone_name, - unique_uid, - NetworkBroker::getInstance()->getDirectIOUrl(), - 0,getBuildInfo(chaos::common::data::CDWUniquePtr())); - - //at this point i must with for end signal - chaos::common::async_central::AsyncCentralManager::getInstance()->addTimer(this, - 0, - chaos::common::constants::HBTimersTimeoutinMSec); - - - - StartableService::startImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__); - HealtManagerDirect::getInstance()->addNewNode(unique_uid); - HealtManagerDirect::getInstance()->addNodeMetricValue(unique_uid, - NodeHealtDefinitionKey::NODE_HEALT_STATUS, - NodeHealtDefinitionValue::NODE_HEALT_STATUS_START); - #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) - - sleep(chaos::common::constants::HBTimersTimeoutinMSec/1000); - LCND_LDBG<<"-----SUBSCRIBING---"; - - message_consumer.start( __PRETTY_FUNCTION__); - - #endif - - waitCloseSemaphore.wait(); - } catch (CException& ex) { - DECODE_CHAOS_EXCEPTION(ex) - } - //execute the deinitialization of CU - try{ - stop(); - } catch (CException& ex) { - DECODE_CHAOS_EXCEPTION(ex) - } - - try{ - deinit(); - } catch (CException& ex) { - DECODE_CHAOS_EXCEPTION(ex) - } +void ChaosMetadataService::start() { + // lock o monitor for waith the end + try { + ChaosCommon<ChaosMetadataService>::start(); + StartableService::startImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); + + // start batch system + data_consumer.start(__PRETTY_FUNCTION__); + LAPP_ << "\n----------------------------------------------------------------------" + << "\n!CHAOS Metadata service started" + << "\nRPC Server address: " << NetworkBroker::getInstance()->getRPCUrl() << "\nDirectIO Server address: " << NetworkBroker::getInstance()->getDirectIOUrl() << CHAOS_FORMAT("\nData Service published with url: %1%|0", % NetworkBroker::getInstance()->getDirectIOUrl()) << "\nTime precision mask: " << std::hex << timePrecisionMask << std::dec << "\n----------------------------------------------------------------------"; + + // register this process on persistence database + persistence::data_access::DataServiceDataAccess* ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); + std::string unique_uid = NetworkBroker::getInstance()->getRPCUrl(); + LCND_LDBG << "-----REGISTERING ---"; + + ds_da->registerNode(setting.ha_zone_name, + unique_uid, + NetworkBroker::getInstance()->getDirectIOUrl(), + 0, + getBuildInfo(chaos::common::data::CDWUniquePtr())); + + // at this point i must with for end signal + chaos::common::async_central::AsyncCentralManager::getInstance()->addTimer(this, + 0, + chaos::common::constants::HBTimersTimeoutinMSec); + + StartableService::startImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__); + HealtManagerDirect::getInstance()->addNewNode(unique_uid); + HealtManagerDirect::getInstance()->addNodeMetricValue(unique_uid, + NodeHealtDefinitionKey::NODE_HEALT_STATUS, + NodeHealtDefinitionValue::NODE_HEALT_STATUS_START); +#if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) + + sleep(chaos::common::constants::HBTimersTimeoutinMSec / 1000); + LCND_LDBG << "-----SUBSCRIBING---"; + + message_consumer.start(__PRETTY_FUNCTION__); + +#endif + + waitCloseSemaphore.wait(); + } catch (CException& ex) { + DECODE_CHAOS_EXCEPTION(ex) + } + // execute the deinitialization of CU + try { + stop(); + } catch (CException& ex) { + DECODE_CHAOS_EXCEPTION(ex) + } + + try { + deinit(); + } catch (CException& ex) { + DECODE_CHAOS_EXCEPTION(ex) + } } void ChaosMetadataService::timeout() { - int err = 0; - HealthStat service_proc_stat; - const std::string ds_uid = NetworkBroker::getInstance()->getRPCUrl(); - persistence::data_access::DataServiceDataAccess *ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); - persistence::data_access::NodeDataAccess *n_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::NodeDataAccess>(); - service_proc_stat.mds_received_timestamp = TimingUtil::getTimeStamp(); - if(is_present==false){ - bool presence=false; - if(n_da->checkNodePresence(presence, ds_uid) != 0) { - LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", %NetworkBroker::getInstance()->getRPCUrl()); - //return; - } - - if(presence == false) { - //reinsert mds - ds_da->registerNode(setting.ha_zone_name, - NetworkBroker::getInstance()->getRPCUrl(), - NetworkBroker::getInstance()->getDirectIOUrl(), - 0, - getBuildInfo(chaos::common::data::CDWUniquePtr())); - } - is_present=presence; + int err = 0; + HealthStat service_proc_stat; + const std::string ds_uid = NetworkBroker::getInstance()->getRPCUrl(); + persistence::data_access::DataServiceDataAccess* ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); + persistence::data_access::NodeDataAccess* n_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::NodeDataAccess>(); + service_proc_stat.mds_received_timestamp = TimingUtil::getTimeStamp(); + if (is_present == false) { + bool presence = false; + if (n_da->checkNodePresence(presence, ds_uid) != 0) { + LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", % NetworkBroker::getInstance()->getRPCUrl()); + // return; } - //update proc stat - ProcStatCalculator::update(service_proc_stat); - if((err = n_da->setNodeHealthStatus(NetworkBroker::getInstance()->getRPCUrl(), - service_proc_stat))) { - LCND_LERR << CHAOS_FORMAT("error storing health data into database for this mds [%1%]", %NetworkBroker::getInstance()->getRPCUrl()); + if (presence == false) { + // reinsert mds + ds_da->registerNode(setting.ha_zone_name, + NetworkBroker::getInstance()->getRPCUrl(), + NetworkBroker::getInstance()->getDirectIOUrl(), + 0, + getBuildInfo(chaos::common::data::CDWUniquePtr())); } + is_present = presence; + } + + // update proc stat + ProcStatCalculator::update(service_proc_stat); + if ((err = n_da->setNodeHealthStatus(NetworkBroker::getInstance()->getRPCUrl(), + service_proc_stat))) { + LCND_LERR << CHAOS_FORMAT("error storing health data into database for this mds [%1%]", % NetworkBroker::getInstance()->getRPCUrl()); + } } -bool ChaosMetadataService::isNodeAlive(const std::string& uid){ - ChaosStringVector s; - s.push_back(uid); - bool alive=areNodeAlive(s)[0]; - //LCND_LDBG<<"NODE:"<<uid<<" "<<((alive)?"TRUE":"FALSE"); - return alive; - } +bool ChaosMetadataService::isNodeAlive(const std::string& uid) { + ChaosStringVector s; + uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp(); + if (alive_cache.count(uid)){ + if((alive_cache[uid] >= (now - (2 * chaos::common::constants::HBTimersTimeoutinMSec)))){ + // LCND_LDBG << uid<<" found in cache"; + + return true; + } + } + s.push_back(uid); + bool alive = areNodeAlive(s)[0]; + // LCND_LDBG<<"NODE:"<<uid<<" "<<((alive)?"TRUE":"FALSE"); + return alive; +} using namespace chaos::metadata_service::object_storage::abstraction; using namespace chaos::metadata_service::persistence::data_access; -int ChaosMetadataService::removeStorageData(const std::string& control_unit_found,uint64_t start,uint64_t remove_until_ts){ - int err; - auto *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<ObjectStorageDataAccess>(); +int ChaosMetadataService::removeStorageData(const std::string& control_unit_found, uint64_t start, uint64_t remove_until_ts) { + int err; + auto* obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<ObjectStorageDataAccess>(); + + if (obj_storage_da == NULL) { + LCND_LERR << " cannot access object storage resources"; + return -1; + } + LCND_LDBG << " deleting node storage " << control_unit_found << " from:" << start << " to:" << remove_until_ts; + const std::string output_key = control_unit_found + DataPackPrefixID::OUTPUT_DATASET_POSTFIX; + const std::string input_key = control_unit_found + DataPackPrefixID::INPUT_DATASET_POSTFIX; + const std::string system_key = control_unit_found + DataPackPrefixID::SYSTEM_DATASET_POSTFIX; + const std::string custom_key = control_unit_found + DataPackPrefixID::CUSTOM_DATASET_POSTFIX; + // const std::string health_key = control_unit_found + NodeHealtDefinitionKey::HEALT_KEY_POSTFIX; + const std::string dev_alarm_key = control_unit_found + DataPackPrefixID::DEV_ALARM_DATASET_POSTFIX; + const std::string cu_alarm_key = control_unit_found + DataPackPrefixID::CU_ALARM_DATASET_POSTFIX; + + try { + log(CHAOS_FORMAT("Remove data for key %1%", % output_key)); + + if ((err = obj_storage_da->deleteObject(output_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % output_key % control_unit_found % err)); + } + + log(CHAOS_FORMAT("Remove data for key %1%", % input_key)); + if ((err = obj_storage_da->deleteObject(input_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % input_key % control_unit_found % err)); + } + + log(CHAOS_FORMAT("Remove data for key %1%", % system_key)); + if ((err = obj_storage_da->deleteObject(system_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % system_key % control_unit_found % err)); + } + + log(CHAOS_FORMAT("Remove data for key %1%", % custom_key)); + if ((err = obj_storage_da->deleteObject(custom_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % custom_key % control_unit_found % err)); + } - if(obj_storage_da==NULL ){ - LCND_LERR<< " cannot access object storage resources"; - return -1; + /* log(CHAOS_FORMAT("Remove data for key %1%", %health_key)); + if((err = obj_storage_da->deleteObject(health_key, + start, + remove_until_ts))){ + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %health_key%control_unit_found%err)); + } + */ + log(CHAOS_FORMAT("Remove data for key %1%", % dev_alarm_key)); + if ((err = obj_storage_da->deleteObject(dev_alarm_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % dev_alarm_key % control_unit_found % err)); } - LCND_LDBG<<" deleting node storage "<<control_unit_found<<" from:"<<start<<" to:"<<remove_until_ts; - const std::string output_key = control_unit_found + DataPackPrefixID::OUTPUT_DATASET_POSTFIX; - const std::string input_key = control_unit_found + DataPackPrefixID::INPUT_DATASET_POSTFIX; - const std::string system_key = control_unit_found + DataPackPrefixID::SYSTEM_DATASET_POSTFIX; - const std::string custom_key = control_unit_found + DataPackPrefixID::CUSTOM_DATASET_POSTFIX; - // const std::string health_key = control_unit_found + NodeHealtDefinitionKey::HEALT_KEY_POSTFIX; - const std::string dev_alarm_key = control_unit_found + DataPackPrefixID::DEV_ALARM_DATASET_POSTFIX; - const std::string cu_alarm_key = control_unit_found + DataPackPrefixID::CU_ALARM_DATASET_POSTFIX; - - try { - log(CHAOS_FORMAT("Remove data for key %1%", %output_key)); - - if((err = obj_storage_da->deleteObject(output_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %output_key%control_unit_found%err)); - } - - log(CHAOS_FORMAT("Remove data for key %1%", %input_key)); - if((err = obj_storage_da->deleteObject(input_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %input_key%control_unit_found%err)); - } - - log(CHAOS_FORMAT("Remove data for key %1%", %system_key)); - if((err = obj_storage_da->deleteObject(system_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %system_key%control_unit_found%err)); - } - - log(CHAOS_FORMAT("Remove data for key %1%", %custom_key)); - if((err = obj_storage_da->deleteObject(custom_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %custom_key%control_unit_found%err)); - } - - /* log(CHAOS_FORMAT("Remove data for key %1%", %health_key)); - if((err = obj_storage_da->deleteObject(health_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %health_key%control_unit_found%err)); - } - */ - log(CHAOS_FORMAT("Remove data for key %1%", %dev_alarm_key)); - if((err = obj_storage_da->deleteObject(dev_alarm_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %dev_alarm_key%control_unit_found%err)); - } - - log(CHAOS_FORMAT("Remove data for key %1%", %cu_alarm_key)); - if((err = obj_storage_da->deleteObject(cu_alarm_key, - start, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", %cu_alarm_key%control_unit_found%err)); - } - - log(CHAOS_FORMAT("Remove log for cu %1%", %control_unit_found)); - if((err = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::LoggingDataAccess>()->eraseLogBeforTS(control_unit_found, - remove_until_ts))){ - log(CHAOS_FORMAT("Error erasing logging for control unit %1% with error %2%", %control_unit_found%err)); - } - }catch(CException& ex){ - log(ex.what()); - return -100; - }catch(...){ - log("Undeterminated error during ageing management"); - return -200; - } - return err; + log(CHAOS_FORMAT("Remove data for key %1%", % cu_alarm_key)); + if ((err = obj_storage_da->deleteObject(cu_alarm_key, + start, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing key %1% for control unit %2% with error %3%", % cu_alarm_key % control_unit_found % err)); + } + + log(CHAOS_FORMAT("Remove log for cu %1%", % control_unit_found)); + if ((err = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::LoggingDataAccess>()->eraseLogBeforTS(control_unit_found, + remove_until_ts))) { + log(CHAOS_FORMAT("Error erasing logging for control unit %1% with error %2%", % control_unit_found % err)); + } + } catch (CException& ex) { + log(ex.what()); + return -100; + } catch (...) { + log("Undeterminated error during ageing management"); + return -200; + } + return err; +} +static boost::mutex mutex_cache; +void ChaosMetadataService::updateLiveCache(const chaos::common::data::CDataWrapper* d) { + if (d && d->hasKey(DataPackCommonKey::DPCK_TIMESTAMP) && d->hasKey(chaos::NodeDefinitionKey::NODE_UNIQUE_ID)) { + std::string name = d->getStringValue(chaos::NodeDefinitionKey::NODE_UNIQUE_ID); + updateLiveCache(name,d->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP)); + } +} +void ChaosMetadataService::updateLiveCache(const std::string& name,int64_t t) { + boost::lock_guard<boost::mutex> l(mutex_cache); + if (alive_cache.count(name)) { + if (alive_cache[name] < t) { + // LCND_LDBG << name<<" updated cache"; + + alive_cache[name] = t; + } + } else { + alive_cache[name] = t; + } + } -std::vector<bool> ChaosMetadataService::areNodeAlive(const ChaosStringVector& uids){ - int err=0; - std::vector<bool> res; - CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv(); - DataBuffer data_buffer; - MultiCacheData multi_cached_data; - ChaosStringVector keys; - for(ChaosStringVector::const_iterator i=uids.begin();i!=uids.end();i++){ - keys.push_back((*i)+NodeHealtDefinitionKey::HEALT_KEY_POSTFIX); - } - if(keys.size()==0) return res; - err = cache_slot.getData(keys, - multi_cached_data); - uint64_t now=chaos::common::utility::TimingUtil::getTimeStamp(); - for(ChaosStringVectorConstIterator it = keys.begin(), - end = keys.end(); - it != end; - it++) { - const CacheData& cached_element = multi_cached_data[*it]; - if(!cached_element || - cached_element->size() == 0) { - res.push_back(false); - } else { - CDataWrapper ca(cached_element->data(),cached_element->size()); - uint64_t ts=0; - if(ca.hasKey(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP)){ - ts=ca.getInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP); - } else if(ca.hasKey(DataPackCommonKey::DPCK_TIMESTAMP)){ - ts=ca.getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP); - } - res.push_back((ts>(now-(2*chaos::common::constants::HBTimersTimeoutinMSec)))); - - } - } - return res; - +std::vector<bool> ChaosMetadataService::areNodeAlive(const ChaosStringVector& uids) { + int err = 0; + std::vector<bool> res; + CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv(); + DataBuffer data_buffer; + MultiCacheData multi_cached_data; + ChaosStringVector keys; + uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp(); + + for (ChaosStringVector::const_iterator i = uids.begin(); i != uids.end(); i++) { + if ((alive_cache.count(*i) == 0)) { + keys.push_back((*i) + NodeHealtDefinitionKey::HEALT_KEY_POSTFIX); + } else if (alive_cache[*i] < (now - (2 * chaos::common::constants::HBTimersTimeoutinMSec))) { + keys.push_back((*i) + NodeHealtDefinitionKey::HEALT_KEY_POSTFIX); + } else { + res.push_back(true); + } + } + if (keys.size() == 0) return res; + err = cache_slot.getData(keys, + multi_cached_data); + for (ChaosStringVectorConstIterator it = keys.begin(), + end = keys.end(); + it != end; + it++) { + const CacheData& cached_element = multi_cached_data[*it]; + if (!cached_element || + cached_element->size() == 0) { + res.push_back(false); + } else { + CDataWrapper ca(cached_element->data(), cached_element->size()); + uint64_t ts = 0; + if (ca.hasKey(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP)) { + ts = ca.getInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP); + } else if (ca.hasKey(DataPackCommonKey::DPCK_TIMESTAMP)) { + ts = ca.getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP); + } + updateLiveCache(*it,ts); + res.push_back((ts > (now - (2 * chaos::common::constants::HBTimersTimeoutinMSec)))); + } + } + return res; } /* Stop the toolkit execution */ void ChaosMetadataService::stop() { - CHAOS_NOT_THROW(StartableService::stopImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__);); - - chaos::common::async_central::AsyncCentralManager::getInstance()->removeTimer(this); - #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) - message_consumer.stop( __PRETTY_FUNCTION__); - - #endif - //stop data consumer - data_consumer.stop( __PRETTY_FUNCTION__); - - StartableService::stopImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); - - ChaosCommon<ChaosMetadataService>::stop(); - //endWaithCondition.notify_one(); - waitCloseSemaphore.unlock(); + CHAOS_NOT_THROW(StartableService::stopImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__);); + + chaos::common::async_central::AsyncCentralManager::getInstance()->removeTimer(this); +#if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) + message_consumer.stop(__PRETTY_FUNCTION__); + +#endif + // stop data consumer + data_consumer.stop(__PRETTY_FUNCTION__); + + StartableService::stopImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); + + ChaosCommon<ChaosMetadataService>::stop(); + // endWaithCondition.notify_one(); + waitCloseSemaphore.unlock(); } /* Deiniti all the manager */ void ChaosMetadataService::deinit() { - InizializableService::deinitImplementation(SharedManagedDirecIoDataDriver::getInstance(), "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); - - CHAOS_NOT_THROW(StartableService::deinitImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__);); - - InizializableService::deinitImplementation(cron_job::MDSCronusManager::getInstance(), - "MDSConousManager", - __PRETTY_FUNCTION__); - //deinit api system - CHAOS_NOT_THROW(api_managment_service.deinit(__PRETTY_FUNCTION__);) - if(message_consumer.get()) { - message_consumer.deinit(__PRETTY_FUNCTION__); - } - if(data_consumer.get()) { - data_consumer.deinit(__PRETTY_FUNCTION__); - } - - StartableService::deinitImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); - - //deinitilize driver pool manager - InizializableService::deinitImplementation(DriverPoolManager::getInstance(), "DriverPoolManager", __PRETTY_FUNCTION__); - - ChaosCommon<ChaosMetadataService>::stop(); - LAPP_ << "-----------------------------------------"; - LAPP_ << "Metadata service has been stopped"; - LAPP_ << "-----------------------------------------"; + InizializableService::deinitImplementation(SharedManagedDirecIoDataDriver::getInstance(), "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); + + CHAOS_NOT_THROW(StartableService::deinitImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__);); + + InizializableService::deinitImplementation(cron_job::MDSCronusManager::getInstance(), + "MDSConousManager", + __PRETTY_FUNCTION__); + // deinit api system + CHAOS_NOT_THROW(api_managment_service.deinit(__PRETTY_FUNCTION__);) + if (message_consumer.get()) { + message_consumer.deinit(__PRETTY_FUNCTION__); + } + if (data_consumer.get()) { + data_consumer.deinit(__PRETTY_FUNCTION__); + } + + StartableService::deinitImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); + + // deinitilize driver pool manager + InizializableService::deinitImplementation(DriverPoolManager::getInstance(), "DriverPoolManager", __PRETTY_FUNCTION__); + + ChaosCommon<ChaosMetadataService>::stop(); + LAPP_ << "-----------------------------------------"; + LAPP_ << "Metadata service has been stopped"; + LAPP_ << "-----------------------------------------"; } /* * */ void ChaosMetadataService::signalHanlder(int signalNumber) { - waitCloseSemaphore.unlock(); + waitCloseSemaphore.unlock(); } void ChaosMetadataService::fillKVParameter(std::map<std::string, std::string>& kvmap, - const std::vector<std::string>& multitoken_param) { - //! Regular expression for check server endpoint with the sintax hostname:[priority_port:service_port] - std::regex KVParamRegex("[a-zA-Z0-9/_-]+:[a-zA-Z0-9/_-]+"); - std::vector<std::string> kv_splitted; - std::vector<std::string> kvtokens; - for(std::vector<std::string>::const_iterator it = multitoken_param.begin(); - it != multitoken_param.end(); - it++) { - - const std::string& param_key = *it; - - - - if(!std::regex_match(param_key, KVParamRegex)) { - throw chaos::CException(-3, "Malformed kv parameter string", __PRETTY_FUNCTION__); - } - - - boost::algorithm::split(kvtokens, - param_key, - boost::algorithm::is_any_of("-"), - boost::algorithm::token_compress_on); - - //clear previosly pair - kv_splitted.clear(); - - //get new pair - boost::algorithm::split(kv_splitted, - param_key, - boost::algorithm::is_any_of(":"), - boost::algorithm::token_compress_on); - // add key/value pair - kvmap.insert(std::pair<std::string,std::string>(kv_splitted[0], kv_splitted[1])); + const std::vector<std::string>& multitoken_param) { + //! Regular expression for check server endpoint with the sintax hostname:[priority_port:service_port] + std::regex KVParamRegex("[a-zA-Z0-9/_-]+:[a-zA-Z0-9/_-]+"); + std::vector<std::string> kv_splitted; + std::vector<std::string> kvtokens; + for (std::vector<std::string>::const_iterator it = multitoken_param.begin(); + it != multitoken_param.end(); + it++) { + const std::string& param_key = *it; + + if (!std::regex_match(param_key, KVParamRegex)) { + throw chaos::CException(-3, "Malformed kv parameter string", __PRETTY_FUNCTION__); } + + boost::algorithm::split(kvtokens, + param_key, + boost::algorithm::is_any_of("-"), + boost::algorithm::token_compress_on); + + // clear previosly pair + kv_splitted.clear(); + + // get new pair + boost::algorithm::split(kv_splitted, + param_key, + boost::algorithm::is_any_of(":"), + boost::algorithm::token_compress_on); + // add key/value pair + kvmap.insert(std::pair<std::string, std::string>(kv_splitted[0], kv_splitted[1])); + } } diff --git a/ChaosMetadataService/ChaosMetadataService.h b/ChaosMetadataService/ChaosMetadataService.h index c2689320f563aa6e9453bc12df3024b507097ed6..112fc6700830f2064b0c98c72082586096ac5337 100644 --- a/ChaosMetadataService/ChaosMetadataService.h +++ b/ChaosMetadataService/ChaosMetadataService.h @@ -95,6 +95,18 @@ namespace chaos { */ std::vector<bool> areNodeAlive(const ChaosStringVector& uids); bool isNodeAlive(const std::string& uid); + std::map <std::string,int64_t> alive_cache; + /** + * @brief update alive_cache if dataset timestamp is newwer + * + */ + void updateLiveCache(const chaos::common::data::CDataWrapper*); + + /** + * @brief update alive_cache if ts is newer + * + */ + void updateLiveCache(const std::string& name,int64_t te); /** * @brief remove storage data to from diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp index 8af97756948be75a7c928cab3e4ec98c91c98e0f..af2342d41a86588a48dd8ac113c499db1f6013a5 100644 --- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp +++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp @@ -89,7 +89,7 @@ void QueryDataMsgPSConsumer::messageHandler(const chaos::common::message::ele_t& } else if(pktype==DataPackCommonKey::DPCK_DATASET_TYPE_HEALTH) { uint64_t ts=0; - + // ChaosMetadataService::getInstance()->alive_cache[data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID)]=TimingUtil::getTimeStamp(); if(data.cd->hasKey(DataPackCommonKey::DPCK_TIMESTAMP)){ ts=data.cd->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP); if((TimingUtil::getTimeStamp()-ts)>(chaos::common::constants::HBTimersTimeoutinMSec*2)){ diff --git a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp index dfe938adfce40b44e30389540c7ab4a435bc3d33..9a0aaebc4fb5893d852f486b99b0cc76ef0fbe73 100644 --- a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp +++ b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp @@ -82,6 +82,10 @@ inline bool skipDefault(const std::string& name){ if(name==chaos::DataPackCommonKey::DPCK_DEVICE_ID) return true; if(name==chaos::DataServiceNodeDefinitionKey::DS_STORAGE_TYPE) return true; if(name==chaos::DataPackCommonKey::NODE_MDS_TIMEDIFF) return true; + if(name==chaos::ControlUnitDatapackCommonKey::RUN_ID) return true; + if(name==chaos::DataPackCommonKey::DPCK_DEVICE_ID) return true; + if(name==chaos::NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP) return true; + return false; diff --git a/chaos/common/caching_system/CacheDriver.cpp b/chaos/common/caching_system/CacheDriver.cpp index f5da770bb77a997587340403855d59b72d4f33e3..6eea41936d5d9b7420fb11a903f8cdb98b9ec911 100644 --- a/chaos/common/caching_system/CacheDriver.cpp +++ b/chaos/common/caching_system/CacheDriver.cpp @@ -20,6 +20,8 @@ */ #include "CacheDriver.h" #include <chaos/common/data/CDataWrapper.h> +#include <chaos/common/utility/TimingUtil.h> +#include <chaos/common/global.h> using namespace chaos::common::cache_system; CacheDriver::CacheDriver(std::string alias): @@ -27,6 +29,8 @@ NamedService(alias){} CacheDriver::~CacheDriver() {} +std::map<std::string,std::pair<int64_t,chaos::common::data::CDWShrdPtr> > CacheDriver::first_level_cache; +std::map<std::string,int32_t> CacheDriver::enable_cache_for_ms; //! init /*! Need a point to a structure DBDriverSetting for the setting @@ -42,31 +46,88 @@ void CacheDriver::deinit() {} chaos::common::data::CDWShrdPtr CacheDriver::getData(const std::string& key){ CacheData d; chaos::common::data::CDWShrdPtr ret; + if(enable_cache_for_ms.count(key)&&first_level_cache.count(key)){ + uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp(); + if((now-first_level_cache[key].first)<enable_cache_for_ms[key]){ + // LDBG_ << "retrive from caching:"<<key; + + return first_level_cache[key].second; + } + } + if(getData(key,d)==0){ if(d.get()&&d->size()){ chaos::common::data::CDataWrapper* tmp = new chaos::common::data::CDataWrapper(d->data(),d->size()); ret.reset(tmp); + if(enable_cache_for_ms.count(key)){ + uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp(); + // LDBG_ << "mupdate caching:"<<key; + + first_level_cache[key]={now,ret}; + + } return ret; } } return ret; } +void CacheDriver::enableCache(const std::string&key,int32_t validity_ms){ + if(validity_ms>0){ + enable_cache_for_ms[key]=validity_ms; + LDBG_ << "enabling caching for key:"<<key<<" validity:"<<validity_ms<<" ms"; + + } else { + LDBG_ << "disabling caching for key:"<<key; + enable_cache_for_ms.erase(key); + } + +} + std::vector<chaos::common::data::CDWShrdPtr> CacheDriver::getData(const ChaosStringVector& keys){ std::vector<chaos::common::data::CDWShrdPtr> ret; MultiCacheData multi_cached_data; - if(getData(keys,multi_cached_data)==0){ + std::map<std::string,bool> is_cached; + uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp(); + + int res=0; + if(enable_cache_for_ms.size()==0){ + res=getData(keys,multi_cached_data); + } else{ + ChaosStringVector nocached; + for(ChaosStringVector::const_iterator i=keys.begin();i!=keys.end();i++){ + if(!(enable_cache_for_ms.count(*i)&&first_level_cache.count(*i)&&((now-first_level_cache[*i].first)<enable_cache_for_ms[*i]))){ + nocached.push_back(*i); + is_cached[*i]=false; + } else { + is_cached[*i]=true; + + } + } + res=getData(nocached,multi_cached_data); + + } + if(res==0){ for(ChaosStringVectorConstIterator it = keys.begin(), end = keys.end(); it != end; it++) { - const CacheData& cached_element = multi_cached_data[*it]; - if((cached_element.get()==NULL) || (cached_element->size() == 0)){ - ret.push_back(chaos::common::data::CDWShrdPtr()); - } else { - chaos::common::data::CDWShrdPtr r =chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper(cached_element->data(),cached_element->size())); - ret.push_back(r); + if((is_cached.size()==0) || (is_cached[*it]==false)){ + const CacheData& cached_element = multi_cached_data[*it]; + if((cached_element.get()==NULL) || (cached_element->size() == 0)){ + ret.push_back(chaos::common::data::CDWShrdPtr()); + } else { + chaos::common::data::CDWShrdPtr r =chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper(cached_element->data(),cached_element->size())); + ret.push_back(r); + if(enable_cache_for_ms[*it]>0){ + // LDBG_ << "mupdate caching:"<<*it; + first_level_cache[*it]={now,r}; + } + } + } else { + // LDBG_ << "mretrive from caching:"<<*it; + ret.push_back(first_level_cache[*it].second); } } diff --git a/chaos/common/caching_system/CacheDriver.h b/chaos/common/caching_system/CacheDriver.h index 1c34a92d425b82e55b0a6a491e508072560f5bde..bdd65d6189a66c746b09ee4f76c7981448eb251b 100644 --- a/chaos/common/caching_system/CacheDriver.h +++ b/chaos/common/caching_system/CacheDriver.h @@ -54,7 +54,15 @@ namespace chaos { CacheDriver(std::string alias); public: CacheDriverSetting cache_settings; - + static std::map<std::string,std::pair<int64_t,chaos::common::data::CDWShrdPtr> > first_level_cache; + static std::map<std::string,int32_t> enable_cache_for_ms; + /** + * @brief enable cache for key + * + * + * @param validity_ms =0 means disable + */ + void enableCache(const std::string&key,int32_t validity_ms); virtual ~CacheDriver(); virtual int putData(const std::string& key, diff --git a/chaos_service_common/ChaosManager.cpp b/chaos_service_common/ChaosManager.cpp index 2aee1e7a0f7dd6c35b21acaa7d126dd734f7d632..18065b55412228f5c7240b1c4a4dcc5be2f469e6 100644 --- a/chaos_service_common/ChaosManager.cpp +++ b/chaos_service_common/ChaosManager.cpp @@ -5,6 +5,7 @@ * Created on 21/04/2021 */ #include "ChaosManager.h" +#include <ChaosMetadataService/ChaosMetadataService.h> #include <chaos/common/message/MDSMessageChannel.h> #include <ChaosMetadataService/api/node/ClearCommandQueue.h> #include <ChaosMetadataService/api/node/CommandTemplateSubmit.h> @@ -93,7 +94,9 @@ using namespace chaos::metadata_service::api::service; CDWShrdPtr ChaosManager::getLiveChannel(const std::string& key) { ChaosSharedPtr<chaos::common::data::CDataWrapper> ret; if (cache_driver) { - return cache_driver->getData(key); + ret=cache_driver->getData(key); + context->updateLiveCache(ret.get()); + return ret; } return ret; } @@ -103,7 +106,9 @@ CDWShrdPtr ChaosManager::getLiveChannel(const std::string& key, int domain) { std::string lkey = key + chaos::datasetTypeToPostfix(domain); char* value; if (cache_driver) { - return cache_driver->getData(key); + ret=cache_driver->getData(key); + context->updateLiveCache(ret.get()); + return ret; } return ret; } @@ -112,10 +117,14 @@ ChaosManager::ChaosManager(const chaos::common::data::CDataWrapper& conf) if (init(conf) != 0) { throw chaos::CException(-1, "Cannot initialize ", __PRETTY_FUNCTION__); } + context= chaos::metadata_service::ChaosMetadataService::getInstance(); + } ChaosManager::ChaosManager() : cache_driver(NULL), persistence_driver(NULL),storage_driver(NULL) { chaos::common::message::MDSMessageChannel* mdsChannel = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel(); + context= chaos::metadata_service::ChaosMetadataService::getInstance(); + if (mdsChannel) { CDWUniquePtr best_available_da_ptr; if (!mdsChannel->getDataDriverBestConfiguration(best_available_da_ptr, 5000)) { @@ -298,6 +307,9 @@ chaos::common::data::VectorCDWShrdPtr ChaosManager::getLiveChannel(const std::ve chaos::common::data::VectorCDWShrdPtr results; if (cache_driver) { results = cache_driver->getData(channels); + for(int cnt=0;cnt<results.size();cnt++){ + context->updateLiveCache(results[cnt].get()); + } return results; } return results; @@ -1099,6 +1111,15 @@ CDWUniquePtr ChaosManager::cuGetFullDescription(const std::string& uid) { } return res; } +int ChaosManager::enableLiveCaching(const std::string key,int32_t duration_ms){ + if (cache_driver) { + cache_driver->enableCache(key,duration_ms); + + return 0; + } + return -1; + +} int ChaosManager::nodeSearch(const std::string& unique_id_filter, chaos::NodeType::NodeSearchType node_type_filter, bool alive_only, diff --git a/chaos_service_common/ChaosManager.h b/chaos_service_common/ChaosManager.h index 76d55ddbdc8c0800f1c0caea4d64b36f8ae58b84..93dda4483d72f88b23b1d8d067e456287670ae28 100644 --- a/chaos_service_common/ChaosManager.h +++ b/chaos_service_common/ChaosManager.h @@ -17,6 +17,9 @@ #include <chaos/common/batch_command/BatchCommandTypes.h> #define DEFAULT_TIMEOUT_FOR_CONTROLLER 10000000 namespace chaos { + namespace metadata_service{ + class ChaosMetadataService; + } namespace common { namespace cache_system { class CacheDriver; @@ -38,7 +41,7 @@ class ChaosManager : public chaos::common::utility::SingletonCW<ChaosManager>{ chaos::service_common::persistence::data_access::AbstractPersistenceDriver* storage_driver; chaos::service_common::persistence::data_access::AbstractPersistenceDriver* log_driver; - + chaos::metadata_service::ChaosMetadataService* context; // ::common::misc::data::DBbase* db; // NetworkBroker *broker; //chaos::common::message::MDSMessageChannel *mdsChannel; @@ -125,6 +128,7 @@ chaos::common::data::CDWUniquePtr checkAgentHostedProcess(const std::string&name chaos::common::data::CDWUniquePtr clearCommandQueue(const std::string&name); chaos::common::data::CDWUniquePtr killCurrentCommand(const std::string&name); +int enableLiveCaching(const std::string key,int32_t duration_ms); }; } // namespace service_common