From e7ffa7e3d5b3fd5311285f98c9ebf95e5fed1152 Mon Sep 17 00:00:00 2001 From: amichelo <andrea.michelotti@lnf.infn.it> Date: Thu, 4 Nov 2021 17:00:06 +0100 Subject: [PATCH] snapshot optimised and no with direct io --- .../TestDatasetIO/ChaosDatasetIO.cpp | 14 +- ChaosMetadataService/CMakeLists.txt | 1 + .../api/service/RetrieveMultipleData.cpp | 102 ++ .../api/service/RetrieveMultipleData.h | 48 + .../api/service/ServiceApiGroup.cpp | 2 + .../mongodb/MongoDBSnapshotDataAccess.cpp | 4 +- chaos/common/io/IODataDriver.cpp | 25 +- chaos/common/io/IODataDriver.h | 13 +- chaos/common/io/IODirectIODriver.cpp | 12 +- chaos/common/io/IODirectIODriver.h | 5 +- chaos/common/io/IODirectIOPSMsgDriver.cpp | 37 +- chaos/common/io/IODirectIOPSMsgDriver.h | 12 +- chaos/common/message/MDSMessageChannel.cpp | 1631 +++++++++-------- chaos/common/message/MDSMessageChannel.h | 8 + .../data_manager/KeyDataStorage.cpp | 35 +- .../monitor_system/QuantumSlotScheduler.cpp | 10 +- .../monitor_system/QuantumSlotScheduler.h | 2 +- .../node_controller/CUController.cpp | 8 +- chaos_service_common/ChaosManager.cpp | 23 +- 19 files changed, 1148 insertions(+), 844 deletions(-) create mode 100644 ChaosMetadataService/api/service/RetrieveMultipleData.cpp create mode 100644 ChaosMetadataService/api/service/RetrieveMultipleData.h diff --git a/ChaosDataExport/TestDatasetIO/ChaosDatasetIO.cpp b/ChaosDataExport/TestDatasetIO/ChaosDatasetIO.cpp index c0216aaca..8476d003a 100644 --- a/ChaosDataExport/TestDatasetIO/ChaosDatasetIO.cpp +++ b/ChaosDataExport/TestDatasetIO/ChaosDatasetIO.cpp @@ -173,10 +173,9 @@ namespace driver{ ChaosDataSet ChaosDatasetIO::getDataset(const std::string &dsname,int type){ size_t dim; ChaosDataSet tmp; - char*ptr=ioLiveDataDriver->retriveRawData(dsname+chaos::datasetTypeToPostfix(type),&dim); - if(ptr){ - tmp.reset(new chaos::common::data::CDataWrapper(ptr)); - delete[](ptr); + CDWUniquePtr ptr=ioLiveDataDriver->retrieveData(dsname+chaos::datasetTypeToPostfix(type)); + if(ptr.get()){ + tmp.reset(ptr.release()); } return tmp; @@ -185,10 +184,9 @@ namespace driver{ ChaosDataSet ChaosDatasetIO::getDataset(int type){ size_t dim; ChaosDataSet tmp; - char*ptr=ioLiveDataDriver->retriveRawData(uid+chaos::datasetTypeToPostfix(type),&dim); - if(ptr){ - tmp.reset(new chaos::common::data::CDataWrapper(ptr)); - delete[](ptr); + CDWUniquePtr ptr=ioLiveDataDriver->retrieveData(uid+chaos::datasetTypeToPostfix(type)); + if(ptr.get()){ + tmp.reset(ptr.release()); } return tmp; diff --git a/ChaosMetadataService/CMakeLists.txt b/ChaosMetadataService/CMakeLists.txt index e82375bed..e1152c912 100644 --- a/ChaosMetadataService/CMakeLists.txt +++ b/ChaosMetadataService/CMakeLists.txt @@ -271,6 +271,7 @@ SET(api_src ${api_src} api/service/ServiceApiGroup.cpp api/service/SetVariable.cpp api/service/RemoveVariable.cpp api/service/GetVariable.cpp + api/service/RetrieveMultipleData.cpp api/service/QueryDataCloud.cpp api/service/DeleteDataCloud.cpp) diff --git a/ChaosMetadataService/api/service/RetrieveMultipleData.cpp b/ChaosMetadataService/api/service/RetrieveMultipleData.cpp new file mode 100644 index 000000000..d8a7e6e5a --- /dev/null +++ b/ChaosMetadataService/api/service/RetrieveMultipleData.cpp @@ -0,0 +1,102 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#include "RetrieveMultipleData.h" + +#include <chaos/common/chaos_types.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> +#include <chaos_service_common/DriverPoolManager.h> +#include "../../object_storage/abstraction/ObjectStorageDataAccess.h" +#define INFO INFO_LOG(RetrieveMultipleData) +#define DBG DBG_LOG(RetrieveMultipleData) +#define ERR ERR_LOG(RetrieveMultipleData) + +using namespace chaos::common::data; +using namespace chaos::metadata_service::api::service; +using namespace chaos::metadata_service::persistence::data_access; +using namespace chaos::metadata_service::object_storage::abstraction; +using namespace chaos::service_common; +using namespace chaos::common::cache_system; + +CHAOS_MDS_DEFINE_API_CLASS_CD(RetrieveMultipleData, "retrieveMultipleData") + +CDWUniquePtr RetrieveMultipleData::execute(CDWUniquePtr api_data) { + int err=-1; + CHECK_CDW_THROW_AND_LOG(api_data, ERR, -1, "No parameter found"); + CDWUniquePtr result(new CDataWrapper()); + + ChaosStringVector nodes; + chaos::common::data::VectorCDWShrdPtr res; + if(api_data->hasKey("nodes")&&api_data->isVectorValue("nodes")){ + CMultiTypeDataArrayWrapperSPtr d = api_data->getVectorValue("tags"); + for(int idx = 0;idx < d->size();idx++) { + nodes.push_back(d->getStringElementAtIndex(idx)); + } + err=execute(nodes,res); + if(err==0){ + for(VectorObject::iterator i=res.begin();i!=res.end();i++){ + result->appendCDataWrapperToArray(*(i->get())); + } + result->finalizeArrayForKey("data"); + + } + } else { + ERR<<" NO 'nodes' list provided"; + + } + + result->addInt32Value("error",err); + + return result; + +} +int RetrieveMultipleData::execute(const ChaosStringVector& keys,chaos::common::data::VectorCDWShrdPtr& result){ + int err=0; +CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv(); + try { + //get data + MultiCacheData multi_cached_data; + err = cache_slot.getData(keys, + multi_cached_data); + for (ChaosStringVectorConstIterator it = keys.begin(), + end = keys.end(); + it != end; + it++) { + const CacheData& cached_element = multi_cached_data[*it]; + if (!cached_element || + cached_element->size() == 0) { + result.push_back(chaos::common::data::CDWShrdPtr()); + + } else { + result.push_back(chaos::common::data::CDWShrdPtr((chaos::common::data::CDataWrapper*)cached_element->data())); + + + } + } + + } catch (...) { + ERR<<" CATCH ERROR"; + err=-666; + } + return err; + + +} diff --git a/ChaosMetadataService/api/service/RetrieveMultipleData.h b/ChaosMetadataService/api/service/RetrieveMultipleData.h new file mode 100644 index 000000000..40002ddaf --- /dev/null +++ b/ChaosMetadataService/api/service/RetrieveMultipleData.h @@ -0,0 +1,48 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#ifndef __CHAOSFramework__D12C0AE_833D_4BC5_B5F6_76CF1D4950A0_RetrieveMultipleData_h +#define __CHAOSFramework__D12C0AE_833D_4BC5_B5F6_76CF1D4950A0_RetrieveMultipleData_h + +#include "../AbstractApi.h" +#include <chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h> + +namespace chaos { + namespace metadata_service { + namespace api { + namespace service { + class RetrieveMultipleData: + public AbstractApi { + protected: + public: + RetrieveMultipleData(); + ~RetrieveMultipleData(); + chaos::common::data::CDWUniquePtr execute(chaos::common::data::CDWUniquePtr api_data); + int execute(const ChaosStringVector& keys,chaos::common::data::VectorCDWShrdPtr& result); + +}; + + } + } + } +} + +#endif /* __CHAOSFramework__D12C0AE_833D_4BC5_B5F6_76CF1D4950A0_RetrieveMultipleData_h */ diff --git a/ChaosMetadataService/api/service/ServiceApiGroup.cpp b/ChaosMetadataService/api/service/ServiceApiGroup.cpp index f30665925..9f82c4e4e 100644 --- a/ChaosMetadataService/api/service/ServiceApiGroup.cpp +++ b/ChaosMetadataService/api/service/ServiceApiGroup.cpp @@ -35,6 +35,7 @@ #include "RemoveVariable.h" #include "QueryDataCloud.h" #include "DeleteDataCloud.h" +#include "RetrieveMultipleData.h" using namespace chaos::metadata_service::api::service; @@ -54,6 +55,7 @@ AbstractApiGroup("service"){ addApi<GetSnapshotDatasetsForNode>(); addApi<QueryDataCloud>(); addApi<DeleteDataCloud>(); + addApi<RetrieveMultipleData>(); addApi<SetVariable>(); addApi<GetVariable>(); diff --git a/ChaosMetadataService/persistence/mongodb/MongoDBSnapshotDataAccess.cpp b/ChaosMetadataService/persistence/mongodb/MongoDBSnapshotDataAccess.cpp index 4b37b0548..30ab08f43 100644 --- a/ChaosMetadataService/persistence/mongodb/MongoDBSnapshotDataAccess.cpp +++ b/ChaosMetadataService/persistence/mongodb/MongoDBSnapshotDataAccess.cpp @@ -211,7 +211,7 @@ int MongoDBSnapshotDataAccess::snapshotGetDatasetForProducerKey(const std::strin mongo::BSONObj q = search_snapshot.obj(); DEBUG_CODE(MDBDSDA_DBG<<log_message("snapshotGetDatasetForProducerKey", - "finedOne", + "findOne", DATA_ACCESS_LOG_1_ENTRY("Query", q.jsonString()));) @@ -384,7 +384,7 @@ int MongoDBSnapshotDataAccess::getSnapshotWorkingState(const std::string& snapsh MDBDSDA_ERR << "Error getting snapsho description with code:" << err; } else if(snapshot_info.isEmpty()) { err = - 10000; - MDBDSDA_ERR << "Snapshto description has not been found" << err; + MDBDSDA_ERR << "Snapshot description has not been found" << err; } else { //we have snapshot description if(snapshot_info.hasField("job_concurency")) { diff --git a/chaos/common/io/IODataDriver.cpp b/chaos/common/io/IODataDriver.cpp index ffcca520a..a795fce38 100644 --- a/chaos/common/io/IODataDriver.cpp +++ b/chaos/common/io/IODataDriver.cpp @@ -8,6 +8,8 @@ #include <chaos/common/global.h> #include <chaos/common/io/IODataDriver.h> +#include <chaos/common/network/NetworkBroker.h> +#include <chaos/common/message/MDSMessageChannel.h> #define IODataDriverLOG_HEAD "[IODataDriver] - " @@ -59,13 +61,12 @@ ArrayPointer<CDataWrapper>* IODataDriver::retriveData(const std::string& key) { ArrayPointer<CDataWrapper> *result = new ArrayPointer<CDataWrapper>(); - char *value = retriveRawData(key); - if (value) { + CDWUniquePtr value=retrieveData(key); + if (value.get()) { //some value has been received //allocate the data wrapper object with serialization got from memcached //CDataWrapper *dataWrapper = - result->add(new CDataWrapper(value)); - free(value); + result->add(value.release()); } return result; } @@ -91,3 +92,19 @@ int IODataDriver::addHandler(chaos::common::message::msgHandler cb){ return 0; } + +int IODataDriver::loadDatasetFromSnapshot(const std::string& restore_point_tag_name, + const std::string& key, + chaos_data::CDWShrdPtr& cdw_shrd_ptr){ + //return IODirectIODriver::loadDatasetTypeFromSnapshotTag(restore_point_tag_name,key,dataset_type,cdw_shrd_ptr); + chaos::common::data::CDataWrapper data_set; + int err = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel()->loadSnapshotNodeDataset(restore_point_tag_name,key,data_set); + // IODirectIOPSMsgDriver_DLDBG_<<"SNAPSHOT:"<<data_set.getJSONString(); + if((!err)){ + cdw_shrd_ptr.reset(data_set.clone().release()); + + } + + return err; + +} diff --git a/chaos/common/io/IODataDriver.h b/chaos/common/io/IODataDriver.h index ba0708a86..086f0f99c 100644 --- a/chaos/common/io/IODataDriver.h +++ b/chaos/common/io/IODataDriver.h @@ -103,8 +103,7 @@ namespace chaos{ * This method retrive the cached object by CSDawrapperUsed as query key and * return a pointer to the class ArrayPointer of CDataWrapper type */ - virtual char * retriveRawData(const std::string& key, - size_t* dataDim=NULL) = 0; + virtual chaos::common::data::CDWUniquePtr retrieveData(const std::string& key) = 0; virtual int retriveMultipleData(const ChaosStringVector& key, @@ -122,6 +121,16 @@ namespace chaos{ const std::string& key, uint32_t dataset_type, chaos_data::CDWShrdPtr& cdw_shrd_ptr) = 0; + //! restore from a tag a dataset associated to a key + /*! + try to load a dataset from snapshot identified by the tag + \param snapshot_tag_name the name of the tag that identify the snapshot + \param key is the unique key of the producer + \param cdatawrapper_handler handler for the found dataset(the deallocation need to be managed by caller) + */ + int loadDatasetFromSnapshot(const std::string& restore_point_tag_name, + const std::string& key, + chaos_data::CDWShrdPtr& cdw_shrd_ptr); /*! Update the driver configuration */ diff --git a/chaos/common/io/IODirectIODriver.cpp b/chaos/common/io/IODirectIODriver.cpp index 145951d32..bd4717b6e 100644 --- a/chaos/common/io/IODirectIODriver.cpp +++ b/chaos/common/io/IODirectIODriver.cpp @@ -205,20 +205,20 @@ int IODirectIODriver::storeHealthData(const std::string& key, } -char* IODirectIODriver::retriveRawData(const std::string& key, size_t *dim) { - char* result = NULL; - +CDWUniquePtr IODirectIODriver::retrieveData(const std::string& key) { + CDWUniquePtr result; + char*ptr; boost::shared_lock<boost::shared_mutex> rl(mutext_feeder); IODirectIODriverClientChannels *next_client = static_cast<IODirectIODriverClientChannels*>(connectionFeeder.getService()); if(!next_client) return NULL; uint32_t size =0; - int err = (int)next_client->device_client_channel->requestLastOutputData(key, &result, size); + int err = (int)next_client->device_client_channel->requestLastOutputData(key, &ptr, size); if(err) { IODirectIODriver_LERR_ << "Error retriving data from data service "<<next_client->connection->getServerDescription()<< " with code:" << err; } else { - *dim = (size_t)size; + result.reset((CDataWrapper*)ptr); } return result; } @@ -272,6 +272,8 @@ int IODirectIODriver::loadDatasetTypeFromSnapshotTag(const std::string& restore_ //we have the dataaset try { cdw_shrd_ptr = snapshot_result.channel_data; + IODirectIODriver_DLDBG_<<"SNAPSHOT type:"<<dataset_type<<" VAL:"<<cdw_shrd_ptr->getJSONString(); + IODirectIODriver_DLDBG_ << "Got dataset type:"<<dataset_type<< " for key:" << key << " from snapshot tag:" <<restore_point_tag_name; } catch (std::exception& ex) { IODirectIODriver_LERR_ << "Error deserializing the dataset type:"<<dataset_type<< " for key:" << key << " from snapshot tag:" <<restore_point_tag_name << " with error:" << ex.what(); diff --git a/chaos/common/io/IODirectIODriver.h b/chaos/common/io/IODirectIODriver.h index d6c0c4e69..db3eee805 100644 --- a/chaos/common/io/IODirectIODriver.h +++ b/chaos/common/io/IODirectIODriver.h @@ -152,10 +152,9 @@ namespace chaos{ chaos::common::data::VectorCDWShrdPtr& result); /* - * retriveRawData + * retrieveRawData */ - char * retriveRawData(const std::string& key, - size_t *dim=NULL); + chaos::common::data::CDWUniquePtr retrieveData(const std::string& key); //! restore from a tag a dataset associated to a key int loadDatasetTypeFromSnapshotTag(const std::string& restore_point_tag_name, diff --git a/chaos/common/io/IODirectIOPSMsgDriver.cpp b/chaos/common/io/IODirectIOPSMsgDriver.cpp index b92dc23d9..bb870eba4 100644 --- a/chaos/common/io/IODirectIOPSMsgDriver.cpp +++ b/chaos/common/io/IODirectIOPSMsgDriver.cpp @@ -229,13 +229,46 @@ int IODirectIOPSMsgDriver::removeData(const std::string& key, uint64_t end_ts) { return chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel()->deleteDataCloud(key,start_ts,end_ts); } + /** + * + */ +int IODirectIOPSMsgDriver::retriveMultipleData(const ChaosStringVector& key, + chaos::common::data::VectorCDWShrdPtr& result){ + return chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel()->retriveMultipleData(key,result); + } + + /* + * retrieveRawData + */ + CDWUniquePtr IODirectIOPSMsgDriver::retrieveData(const std::string& key){ + return chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel()->retrieveData(key); + + } + + int IODirectIOPSMsgDriver::loadDatasetTypeFromSnapshotTag(const std::string& restore_point_tag_name, const std::string& key, uint32_t dataset_type, chaos_data::CDWShrdPtr& cdw_shrd_ptr) { - int err = -1; - + //return IODirectIODriver::loadDatasetTypeFromSnapshotTag(restore_point_tag_name,key,dataset_type,cdw_shrd_ptr); + chaos::common::data::CDataWrapper data_set; + int err = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel()->loadSnapshotNodeDataset(restore_point_tag_name,key,data_set); + // IODirectIOPSMsgDriver_DLDBG_<<"SNAPSHOT:"<<data_set.getJSONString(); + if((dataset_type==chaos::DataType::DatasetType::DatasetTypeInput)&&data_set.hasKey(DataPackID::INPUT_DATASET_ID)&&data_set.isCDataWrapperValue(DataPackID::INPUT_DATASET_ID)){ + cdw_shrd_ptr.reset(data_set.getCSDataValue(DataPackID::INPUT_DATASET_ID).release()); + + } else if((dataset_type==chaos::DataType::DatasetType::DatasetTypeOutput)&&data_set.hasKey(DataPackID::OUTPUT_DATASET_ID)&&data_set.isCDataWrapperValue(DataPackID::OUTPUT_DATASET_ID)){ + cdw_shrd_ptr.reset(data_set.getCSDataValue(DataPackID::OUTPUT_DATASET_ID).release()); + + } else { + IODirectIOPSMsgDriver_LERR_<<" NOR INPUT OR OUTPUT snapshot selected "<<data_set.getJSONString(); + // cdw_shrd_ptr.reset(data_set.clone().release()); + + } + if(cdw_shrd_ptr.get()){ + IODirectIOPSMsgDriver_DLDBG_<<"SNAPSHOT type:"<<dataset_type<<" VAL:"<<cdw_shrd_ptr->getJSONString(); + } return err; } diff --git a/chaos/common/io/IODirectIOPSMsgDriver.h b/chaos/common/io/IODirectIOPSMsgDriver.h index 6e195cf58..b2058a0bd 100644 --- a/chaos/common/io/IODirectIOPSMsgDriver.h +++ b/chaos/common/io/IODirectIOPSMsgDriver.h @@ -90,8 +90,18 @@ namespace chaos{ int removeData(const std::string& key, uint64_t start_ts, uint64_t end_ts); + /** + * + */ + int retriveMultipleData(const ChaosStringVector& key, + chaos::common::data::VectorCDWShrdPtr& result); + + /* + * retrieveRawData + */ + chaos::common::data::CDWUniquePtr retrieveData(const std::string& key); - //! restore from a tag a dataset associated to a key + //! restore from a tag a dataset associated to a key int loadDatasetTypeFromSnapshotTag(const std::string& restore_point_tag_name, const std::string& key, uint32_t dataset_type, diff --git a/chaos/common/message/MDSMessageChannel.cpp b/chaos/common/message/MDSMessageChannel.cpp index 95b05c843..526010b6b 100644 --- a/chaos/common/message/MDSMessageChannel.cpp +++ b/chaos/common/message/MDSMessageChannel.cpp @@ -29,947 +29,992 @@ using namespace chaos::common::utility; using namespace chaos::common::message; using namespace chaos::common::network; -#define NORMAL_NUMBER_OF_ENDPOINT 3 +#define NORMAL_NUMBER_OF_ENDPOINT 3 #define MSG_INFO INFO_LOG(MDSMessageChannel) #define MSG_DBG DBG_LOG(MDSMessageChannel) #define MSG_ERR ERR_LOG(MDSMessageChannel) -#define DECODE_ERROR(x) \ -if((last_error_code = x->getError())){\ -last_error_message = x->getErrorMessage();\ -last_error_domain = x->getErrorDomain();\ -} else {\ -last_error_message = "No Error";\ -last_error_domain = "No Domain";\ -} +#define DECODE_ERROR(x) \ + if ((last_error_code = x->getError())) { \ + last_error_message = x->getErrorMessage(); \ + last_error_domain = x->getErrorDomain(); \ + } else { \ + last_error_message = "No Error"; \ + last_error_domain = "No Domain"; \ + } -MDSMessageChannel::MDSMessageChannel(NetworkBroker *network_broker, +MDSMessageChannel::MDSMessageChannel(NetworkBroker* network_broker, const VectorNetworkAddress& mds_node_address, - MessageRequestDomainSHRDPtr _new_message_request_domain): -MultiAddressMessageChannel(network_broker, - mds_node_address, - _new_message_request_domain), -auto_configure_endpoint(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_METADATASERVER_AUTO_CONF)) { - //register eviction feature on superclass - setEvitionHandler(ChaosBind(&MDSMessageChannel::evictionHandler, this, ChaosBindPlaceholder(_1))); - setAutoEvitionForDeadUrl(true); + MessageRequestDomainSHRDPtr _new_message_request_domain) + : MultiAddressMessageChannel(network_broker, + mds_node_address, + _new_message_request_domain) + , auto_configure_endpoint(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_METADATASERVER_AUTO_CONF)) { + // register eviction feature on superclass + setEvitionHandler(ChaosBind(&MDSMessageChannel::evictionHandler, this, ChaosBindPlaceholder(_1))); + setAutoEvitionForDeadUrl(true); } -MDSMessageChannel::~MDSMessageChannel(){} +MDSMessageChannel::~MDSMessageChannel() {} void MDSMessageChannel::init() { - MultiAddressMessageChannel::init(); + MultiAddressMessageChannel::init(); } -void MDSMessageChannel::deinit(){ - MultiAddressMessageChannel::deinit(); +void MDSMessageChannel::deinit() { + MultiAddressMessageChannel::deinit(); } void MDSMessageChannel::manageResource() { - int err = 0; - CDWUniquePtr bestEndpointConf; - - //run multimessage layer task - int nmnode=MultiAddressMessageChannel::getNumberOfManagedNodes() ; - if(nmnode== NORMAL_NUMBER_OF_ENDPOINT || - !auto_configure_endpoint) return; - - //try to get the number of remote url to maximum number - if((err = getDataDriverBestConfiguration(bestEndpointConf)) || (bestEndpointConf.get() == NULL)) { - MSG_ERR << "Error fetching best endpoint"; - return; - } - - if(!bestEndpointConf->hasKey(chaos::NodeDefinitionKey::NODE_RPC_ADDR) || - !bestEndpointConf->isVectorValue(chaos::NodeDefinitionKey::NODE_RPC_ADDR)) { - MSG_ERR << "Node key has not been found"; - return; - } - unsigned int server_to_add = (unsigned int)(NORMAL_NUMBER_OF_ENDPOINT - nmnode); - - CMultiTypeDataArrayWrapperSPtr vec = bestEndpointConf->getVectorValue(chaos::NodeDefinitionKey::NODE_RPC_ADDR); - for(int idx = 0; - idx < vec->size() && - server_to_add; - idx++) { - CNetworkAddress na(vec->getStringElementAtIndex(idx)); - if(!MultiAddressMessageChannel::hasNode(na)) { - MultiAddressMessageChannel::addNode(na); - server_to_add--; - MSG_INFO << CHAOS_FORMAT("Configured new server -> %1%", %na.ip_port); - } + int err = 0; + CDWUniquePtr bestEndpointConf; + + // run multimessage layer task + int nmnode = MultiAddressMessageChannel::getNumberOfManagedNodes(); + if (nmnode == NORMAL_NUMBER_OF_ENDPOINT || + !auto_configure_endpoint) return; + + // try to get the number of remote url to maximum number + if ((err = getDataDriverBestConfiguration(bestEndpointConf)) || (bestEndpointConf.get() == NULL)) { + MSG_ERR << "Error fetching best endpoint"; + return; + } + + if (!bestEndpointConf->hasKey(chaos::NodeDefinitionKey::NODE_RPC_ADDR) || + !bestEndpointConf->isVectorValue(chaos::NodeDefinitionKey::NODE_RPC_ADDR)) { + MSG_ERR << "Node key has not been found"; + return; + } + unsigned int server_to_add = (unsigned int)(NORMAL_NUMBER_OF_ENDPOINT - nmnode); + + CMultiTypeDataArrayWrapperSPtr vec = bestEndpointConf->getVectorValue(chaos::NodeDefinitionKey::NODE_RPC_ADDR); + for (int idx = 0; + idx < vec->size() && + server_to_add; + idx++) { + CNetworkAddress na(vec->getStringElementAtIndex(idx)); + if (!MultiAddressMessageChannel::hasNode(na)) { + MultiAddressMessageChannel::addNode(na); + server_to_add--; + MSG_INFO << CHAOS_FORMAT("Configured new server -> %1%", % na.ip_port); } + } } void MDSMessageChannel::evictionHandler(const chaos::common::network::ServiceRetryInformation& service_retry_information) { - MSG_INFO << CHAOS_FORMAT("Evicted server -> %1%", %service_retry_information.service_url); + MSG_INFO << CHAOS_FORMAT("Evicted server -> %1%", % service_retry_information.service_url); } void MDSMessageChannel::setEndpointAutoConfiguration(bool _auto_configure_endpoint) { - auto_configure_endpoint = _auto_configure_endpoint; + auto_configure_endpoint = _auto_configure_endpoint; } int32_t MDSMessageChannel::getLastErrorCode() { - return MultiAddressMessageChannel::getLastErrorCode(); + return MultiAddressMessageChannel::getLastErrorCode(); } const std::string& MDSMessageChannel::getLastErrorMessage() { - return MultiAddressMessageChannel::getLastErrorMessage(); + return MultiAddressMessageChannel::getLastErrorMessage(); } const std::string& MDSMessageChannel::getLastErrorDomain() { - return MultiAddressMessageChannel::getLastErrorDomain(); + return MultiAddressMessageChannel::getLastErrorDomain(); } void MDSMessageChannel::sendHeartBeatForDeviceID(const std::string& identification_id) { - ChaosUniquePtr<chaos::common::data::CDataWrapper> hb_message(new CDataWrapper()); - hb_message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); - sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, - ChaosSystemDomainAndActionLabel::MDS_CU_HEARTBEAT, - MOVE(hb_message)); + ChaosUniquePtr<chaos::common::data::CDataWrapper> hb_message(new CDataWrapper()); + hb_message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); + sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, + ChaosSystemDomainAndActionLabel::MDS_CU_HEARTBEAT, + MOVE(hb_message)); } int MDSMessageChannel::sendEchoMessage(CDWUniquePtr data, CDWUniquePtr& result) { - int err = 0; - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - NodeDomainAndActionRPC::ACTION_ECHO_TEST, - MOVE(data)); - if(request_future.get()==NULL){ - MSG_ERR<<"Invalid Future response"; - return -1; - } - if(request_future->wait()) { - result = MOVE(request_future->detachResult()); - } else { - result.reset(); - err = request_future->getError(); - } - return err; + int err = 0; + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + NodeDomainAndActionRPC::ACTION_ECHO_TEST, + MOVE(data)); + if (request_future.get() == NULL) { + MSG_ERR << "Invalid Future response"; + return -1; + } + if (request_future->wait()) { + result = MOVE(request_future->detachResult()); + } else { + result.reset(); + err = request_future->getError(); + } + return err; } int MDSMessageChannel::getBuildInfo(CDWUniquePtr& result) { - int err = 0; - CDWUniquePtr data; - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - NodeDomainAndActionRPC::ACTION_GET_BUILD_INFO, - MOVE(data)); - if(request_future->wait()) { - result = MOVE(request_future->detachResult()); - } else { - result.reset(); - err = request_future->getError(); - } - return err; + int err = 0; + CDWUniquePtr data; + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + NodeDomainAndActionRPC::ACTION_GET_BUILD_INFO, + MOVE(data)); + if (request_future->wait()) { + result = MOVE(request_future->detachResult()); + } else { + result.reset(); + err = request_future->getError(); + } + return err; } int MDSMessageChannel::getProcessInfo(CDWUniquePtr& result) { - int err = 0; - CDWUniquePtr data; - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - NodeDomainAndActionRPC::ACTION_GET_PROCESS_INFO, - MOVE(data)); - if(request_future->wait()) { - result = MOVE(request_future->detachResult()); - } else { - result.reset(); - err = request_future->getError(); - } - return err; + int err = 0; + CDWUniquePtr data; + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + NodeDomainAndActionRPC::ACTION_GET_PROCESS_INFO, + MOVE(data)); + if (request_future->wait()) { + result = MOVE(request_future->detachResult()); + } else { + result.reset(); + err = request_future->getError(); + } + return err; } int MDSMessageChannel::sendUnitServerCUStates(CDWUniquePtr device_dataset, - bool requestCheck, - uint32_t millisec_to_wait) { - string currentBrokerIpPort; - getRpcPublishedHostAndPort(currentBrokerIpPort); - device_dataset->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); - - if(requestCheck){ - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - ChaosSystemDomainAndActionLabel::UNIT_SERVER_STATES_ANSWER, - MOVE(device_dataset)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - } else { - last_error_code = -1; - } + bool requestCheck, + uint32_t millisec_to_wait) { + string currentBrokerIpPort; + getRpcPublishedHostAndPort(currentBrokerIpPort); + device_dataset->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); + + if (requestCheck) { + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + ChaosSystemDomainAndActionLabel::UNIT_SERVER_STATES_ANSWER, + MOVE(device_dataset)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) } else { - sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, - ChaosSystemDomainAndActionLabel::UNIT_SERVER_STATES_ANSWER, - MOVE(device_dataset)); + last_error_code = -1; } - return last_error_code; + } else { + sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, + ChaosSystemDomainAndActionLabel::UNIT_SERVER_STATES_ANSWER, + MOVE(device_dataset)); + } + return last_error_code; } int MDSMessageChannel::sendNodeRegistration(CDWUniquePtr node_description, - bool requestCheck, - uint32_t millisec_to_wait) { - std::string currentBrokerIpPort; - getRpcPublishedHostAndPort(currentBrokerIpPort); - node_description->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); - node_description->addStringValue(NodeDefinitionKey::NODE_HOST_NAME, InetUtility::getHostname()); - //set our timestamp - node_description->addInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP, - chaos::common::utility::TimingUtil::getTimeStamp()); - if(requestCheck){ - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_REGISTER_NODE, - MOVE(node_description)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - } else { - last_error_code = -1; - } + bool requestCheck, + uint32_t millisec_to_wait) { + std::string currentBrokerIpPort; + getRpcPublishedHostAndPort(currentBrokerIpPort); + node_description->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); + node_description->addStringValue(NodeDefinitionKey::NODE_HOST_NAME, InetUtility::getHostname()); + // set our timestamp + node_description->addInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP, + chaos::common::utility::TimingUtil::getTimeStamp()); + if (requestCheck) { + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_REGISTER_NODE, + MOVE(node_description)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) } else { - sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_REGISTER_NODE, - MOVE(node_description)); + last_error_code = -1; } - return last_error_code; + } else { + sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_REGISTER_NODE, + MOVE(node_description)); + } + return last_error_code; } int MDSMessageChannel::sentNodeHealthStatus(CDWUniquePtr node_health_data, - bool request_check, - uint32_t millisec_to_wait) { - std::string currentBrokerIpPort; - - if(request_check){ - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_HEALTH_STATUS, - MOVE(node_health_data)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - } else { - last_error_code = -1; - } + bool request_check, + uint32_t millisec_to_wait) { + std::string currentBrokerIpPort; + + if (request_check) { + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_HEALTH_STATUS, + MOVE(node_health_data)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) } else { - sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, - MOVE(node_health_data)); + last_error_code = -1; } - return last_error_code; + } else { + sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, + MOVE(node_health_data)); + } + return last_error_code; } int MDSMessageChannel::sendNodeLoadCompletion(CDWUniquePtr node_information, - bool requestCheck, - uint32_t millisec_to_wait) { - std::string currentBrokerIpPort; - - //get rpc receive port - getRpcPublishedHostAndPort(currentBrokerIpPort); - //ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper(node_information.getBSONRawData(size_bson))); - node_information->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); - - //set our timestamp - node_information->addInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP, - chaos::common::utility::TimingUtil::getTimeStamp()); - if(requestCheck){ - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, - MOVE(node_information)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - } else { - last_error_code = -1; - } + bool requestCheck, + uint32_t millisec_to_wait) { + std::string currentBrokerIpPort; + + // get rpc receive port + getRpcPublishedHostAndPort(currentBrokerIpPort); + // ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper(node_information.getBSONRawData(size_bson))); + node_information->addStringValue(NodeDefinitionKey::NODE_RPC_ADDR, currentBrokerIpPort); + + // set our timestamp + node_information->addInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP, + chaos::common::utility::TimingUtil::getTimeStamp()); + if (requestCheck) { + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, + MOVE(node_information)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) } else { - sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, - MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, - MOVE(node_information)); + last_error_code = -1; } - return last_error_code; + } else { + sendMessage(NodeDomainAndActionRPC::RPC_DOMAIN, + MetadataServerNodeDefinitionKeyRPC::ACTION_NODE_LOAD_COMPLETION, + MOVE(node_information)); + } + return last_error_code; } -int MDSMessageChannel::getNetworkAddressForDevice(const std::string& identification_id, +int MDSMessageChannel::getNetworkAddressForDevice(const std::string& identification_id, CDeviceNetworkAddress** deviceNetworkAddress, - uint32_t millisec_to_wait) { - if(!deviceNetworkAddress) return -1; - ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); - data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, - "getNodeDescription", - MOVE(data)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((last_error_code == ErrorCode::EC_NO_ERROR) && - request_future->getResult() && - request_future->getResult()->hasKey(NodeDefinitionKey::NODE_RPC_ADDR) && - request_future->getResult()->hasKey(NodeDefinitionKey::NODE_RPC_DOMAIN)) { - - *deviceNetworkAddress = new CDeviceNetworkAddress(); - (*deviceNetworkAddress)->ip_port = request_future->getResult()->getStringValue(NodeDefinitionKey::NODE_RPC_ADDR); - (*deviceNetworkAddress)->node_id = request_future->getResult()->getStringValue(NodeDefinitionKey::NODE_RPC_DOMAIN); - (*deviceNetworkAddress)->device_id = identification_id; - } + uint32_t millisec_to_wait) { + if (!deviceNetworkAddress) return -1; + ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); + data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture(NodeDomainAndActionRPC::RPC_DOMAIN, + "getNodeDescription", + MOVE(data)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((last_error_code == ErrorCode::EC_NO_ERROR) && + request_future->getResult() && + request_future->getResult()->hasKey(NodeDefinitionKey::NODE_RPC_ADDR) && + request_future->getResult()->hasKey(NodeDefinitionKey::NODE_RPC_DOMAIN)) { + *deviceNetworkAddress = new CDeviceNetworkAddress(); + (*deviceNetworkAddress)->ip_port = request_future->getResult()->getStringValue(NodeDefinitionKey::NODE_RPC_ADDR); + (*deviceNetworkAddress)->node_id = request_future->getResult()->getStringValue(NodeDefinitionKey::NODE_RPC_DOMAIN); + (*deviceNetworkAddress)->device_id = identification_id; } - return last_error_code; + } + return last_error_code; } int MDSMessageChannel::getLastDatasetForDevice(const std::string& identification_id, - CDWUniquePtr& device_definition, - uint32_t millisec_to_wait) { - ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); - data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("control_unit", - "getFullDescription", - MOVE(data)); + CDWUniquePtr& device_definition, + uint32_t millisec_to_wait) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); + data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("control_unit", + "getFullDescription", + MOVE(data)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((last_error_code == ErrorCode::EC_NO_ERROR) && + request_future->getResult()) { + device_definition = request_future->detachResult(); + } + } + return last_error_code; +} + +int MDSMessageChannel::loadSnapshotNodeDataset(const std::string& snapname, + const std::string& node_uid, + chaos::common::data::CDataWrapper& data_set, + uint32_t millisec_to_wait) { + int err = 0; + std::map<uint64_t, std::string> mapsnap_res; + if (searchSnapshot(snapname, mapsnap_res, millisec_to_wait) == 0) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, node_uid); + message->addStringValue("snapshot_name", snapname); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "getSnapshotDatasetForNode", + MOVE(message)); request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((last_error_code == ErrorCode::EC_NO_ERROR) && - request_future->getResult()) { - device_definition = request_future->detachResult(); + if (request_future->wait()) { + DECODE_ERROR(request_future) + err = request_future->getError(); + if (err == 0) { + CMultiTypeDataArrayWrapperSPtr snapshot_list = request_future->getResult()->getVectorValue("dataset_list"); + if (snapshot_list->size()) { + data_set.addStringValue("name", node_uid); + data_set.addInt64Value("timestamp", mapsnap_res.begin()->first); } - } - return last_error_code; -} - -int MDSMessageChannel::loadSnapshotNodeDataset(const std::string& snapname, - const std::string& node_uid, - chaos::common::data::CDataWrapper& data_set, - uint32_t millisec_to_wait){ - int err=0; - std::map<uint64_t,std::string> mapsnap_res; - if(searchSnapshot(snapname,mapsnap_res,millisec_to_wait)==0){ - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, node_uid); - message->addStringValue("snapshot_name", snapname); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "getSnapshotDatasetForNode", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - err = request_future->getError(); - if(err==0){ - - CMultiTypeDataArrayWrapperSPtr snapshot_list = request_future->getResult()->getVectorValue("dataset_list"); - if(snapshot_list->size()){ - data_set.addStringValue("name",node_uid); - data_set.addInt64Value("timestamp",mapsnap_res.begin()->first); - } - for(int idx = 0; - idx < snapshot_list->size(); - idx++) { - ChaosUniquePtr<chaos::common::data::CDataWrapper> snapshot_dataset_element(snapshot_list->getCDataWrapperElementAtIndex(idx)); - - const std::string dataset_name = snapshot_dataset_element->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_NAME); - ChaosUniquePtr<chaos::common::data::CDataWrapper> val(snapshot_dataset_element->getCSDataValue("dataset_value")); - if(val->hasKey(chaos::DataPackCommonKey::DPCK_DATASET_TYPE)){ - std::string ret=datasetTypeToHuman(val->getUInt32Value(chaos::DataPackCommonKey::DPCK_DATASET_TYPE)); - data_set.addCSDataValue(ret,*val); - } - } - } - } else { - err = ErrorCode::EC_TIMEOUT; + for (int idx = 0; + idx < snapshot_list->size(); + idx++) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> snapshot_dataset_element(snapshot_list->getCDataWrapperElementAtIndex(idx)); + + const std::string dataset_name = snapshot_dataset_element->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_NAME); + ChaosUniquePtr<chaos::common::data::CDataWrapper> val(snapshot_dataset_element->getCSDataValue("dataset_value")); + if (val->hasKey(chaos::DataPackCommonKey::DPCK_DATASET_TYPE)) { + std::string ret = datasetTypeToHuman(val->getUInt32Value(chaos::DataPackCommonKey::DPCK_DATASET_TYPE)); + data_set.addCSDataValue(ret, *val); + } } + } + } else { + err = ErrorCode::EC_TIMEOUT; } - return err; + } + return err; } int MDSMessageChannel::getFullNodeDescription(const std::string& identification_id, - CDWUniquePtr& device_definition, - uint32_t millisec_to_wait){ - ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); - data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); - data->addBoolValue("all",true); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("control_unit", - "getFullDescription", - MOVE(data)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((last_error_code == ErrorCode::EC_NO_ERROR) && - request_future->getResult()) { - device_definition = request_future->detachResult(); - } + CDWUniquePtr& device_definition, + uint32_t millisec_to_wait) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> data(new CDataWrapper()); + data->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, identification_id); + data->addBoolValue("all", true); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("control_unit", + "getFullDescription", + MOVE(data)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((last_error_code == ErrorCode::EC_NO_ERROR) && + request_future->getResult()) { + device_definition = request_future->detachResult(); } - return last_error_code; -} -int MDSMessageChannel::getScriptDesc(const std::string& scriptID,chaos::common::data::CDWUniquePtr&res,uint32_t millisec_to_wait){ -int err=0; -chaos::common::data::CDWUniquePtr data(new CDataWrapper()); - data->addStringValue("search_string", scriptID); - // first load a list of scripts - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("script", - "searchScript", - MOVE(data)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - if((request_future->getError() != ErrorCode::EC_NO_ERROR)){ - MSG_ERR<<" cannot retrieve script list:"<<request_future->getErrorMessage(); - return request_future->getError(); + } + return last_error_code; +} +int MDSMessageChannel::getScriptDesc(const std::string& scriptID, chaos::common::data::CDWUniquePtr& res, uint32_t millisec_to_wait) { + int err = 0; + chaos::common::data::CDWUniquePtr data(new CDataWrapper()); + data->addStringValue("search_string", scriptID); + // first load a list of scripts + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("script", + "searchScript", + MOVE(data)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + if ((request_future->getError() != ErrorCode::EC_NO_ERROR)) { + MSG_ERR << " cannot retrieve script list:" << request_future->getErrorMessage(); + return request_future->getError(); + } + + if (request_future->getResult() && + request_future->getResult()->hasKey(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST) && + request_future->getResult()->isVectorValue(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST)) { + CMultiTypeDataArrayWrapperSPtr scripts = request_future->getResult()->getVectorValue(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST); + int64_t lastid = -1; + chaos::common::data::CDWUniquePtr last; + for (int cnt = 0; cnt < scripts->size(); cnt++) { + chaos::common::data::CDWUniquePtr curr = scripts->getCDataWrapperElementAtIndex(cnt); + if (curr->getInt64Value("seq") >= lastid) { + lastid = curr->getInt64Value("seq"); + last.reset(curr.release()); } - - if(request_future->getResult() && - request_future->getResult()->hasKey(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST) && - request_future->getResult()->isVectorValue(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST)) { - CMultiTypeDataArrayWrapperSPtr scripts=request_future->getResult()->getVectorValue(chaos::MetadataServerApiKey::script::search_script::FOUND_SCRIPT_LIST); - int64_t lastid=-1; - chaos::common::data::CDWUniquePtr last; - for(int cnt=0;cnt<scripts->size();cnt++){ - chaos::common::data::CDWUniquePtr curr=scripts->getCDataWrapperElementAtIndex(cnt); - if(curr->getInt64Value("seq")>=lastid){ - lastid=curr->getInt64Value("seq"); - last.reset(curr.release()); - } - } - if(lastid>-1){ - - chaos::common::data::CDWUniquePtr data(new CDataWrapper()); - data->addStringValue(ExecutionUnitNodeDefinitionKey::CHAOS_SBD_NAME, scriptID); - data->addInt64Value("seq", lastid); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("script", - "loadFullScript", - MOVE(data)); - - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - if((request_future->getError() != ErrorCode::EC_NO_ERROR)){ - MSG_ERR<<" cannot retrieve load script:"<<request_future->getErrorMessage(); - return request_future->getError(); - } - if(request_future->getResult()){ - res= request_future->detachResult(); - return 0; - } - } - - } - } + } + if (lastid > -1) { + chaos::common::data::CDWUniquePtr data(new CDataWrapper()); + data->addStringValue(ExecutionUnitNodeDefinitionKey::CHAOS_SBD_NAME, scriptID); + data->addInt64Value("seq", lastid); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("script", + "loadFullScript", + MOVE(data)); + + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + if ((request_future->getError() != ErrorCode::EC_NO_ERROR)) { + MSG_ERR << " cannot retrieve load script:" << request_future->getErrorMessage(); + return request_future->getError(); + } + if (request_future->getResult()) { + res = request_future->detachResult(); + return 0; + } } - MSG_ERR<<" Timeout cannot retrieve load script: for "<<scriptID; + } + } + } + MSG_ERR << " Timeout cannot retrieve load script: for " << scriptID; - return -100; + return -100; } int MDSMessageChannel::getDataDriverBestConfiguration(CDWUniquePtr& device_definition, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - //send request and wait the response - ChaosUniquePtr<MultiAddressMessageRequestFuture> future = sendRequestWithFuture(DataServiceNodeDomainAndActionRPC::RPC_DOMAIN, - "getBestEndpoints"); - future->setTimeout(millisec_to_wait); - if(future->wait()) { - if((err = future->getError()) == ErrorCode::EC_NO_ERROR) { - device_definition = future->detachResult(); - if(device_definition.get() == NULL) {throw chaos::CException(-1, "Empty result", __PRETTY_FUNCTION__);} - } - } else { - err = ErrorCode::EC_TIMEOUT; + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + // send request and wait the response + ChaosUniquePtr<MultiAddressMessageRequestFuture> future = sendRequestWithFuture(DataServiceNodeDomainAndActionRPC::RPC_DOMAIN, + "getBestEndpoints"); + future->setTimeout(millisec_to_wait); + if (future->wait()) { + if ((err = future->getError()) == ErrorCode::EC_NO_ERROR) { + device_definition = future->detachResult(); + if (device_definition.get() == NULL) { + throw chaos::CException(-1, "Empty result", __PRETTY_FUNCTION__); + } } - return err; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } -int MDSMessageChannel::createNewSnapshot(const std::string& snapshot_name, +int MDSMessageChannel::createNewSnapshot(const std::string& snapshot_name, const ChaosStringVector& node_list, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("snapshot_name", snapshot_name); - - for(ChaosStringVectorConstIterator it = node_list.begin(), - end = node_list.end(); - it != end; - it++) { - message->appendStringToArray(*it); - } - message->finalizeArrayForKey("node_list"); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "createNewSnapshot", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - err = request_future->getError(); - } else { - err = ErrorCode::EC_TIMEOUT; - } - return err; + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("snapshot_name", snapshot_name); + + for (ChaosStringVectorConstIterator it = node_list.begin(), + end = node_list.end(); + it != end; + it++) { + message->appendStringToArray(*it); + } + message->finalizeArrayForKey("node_list"); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "createNewSnapshot", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + err = request_future->getError(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } int MDSMessageChannel::restoreSnapshot(const std::string& snapshot_name, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("snapshot_name", snapshot_name); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "restoreSnapshot", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - err = request_future->getError(); - } else { - err = ErrorCode::EC_TIMEOUT; - } - return err; + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("snapshot_name", snapshot_name); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "restoreSnapshot", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + err = request_future->getError(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } int MDSMessageChannel::deleteSnapshot(const std::string& snapshot_name, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("snapshot_name", snapshot_name); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "deleteSnapshot", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - err = request_future->getError(); - } else { - err = ErrorCode::EC_TIMEOUT; - } - return err; -} - -int MDSMessageChannel::searchSnapshot(const std::string& query_filter, - std::map<uint64_t,std::string>& snapshot_found, - uint32_t millisec_to_wait){ - int err = ErrorCode::EC_NO_ERROR; - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "getAllSnapshot"); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - - if(!request_future->getResult() || - !request_future->getResult()->hasKey("snapshot_list_result") || - !request_future->getResult()->isVectorValue("snapshot_list_result")) return err; - - CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("snapshot_list_result"); - for(int idx = 0; - idx < snapshot_desc_list->size(); - idx++) { - ChaosUniquePtr<chaos::common::data::CDataWrapper> tmp_desc(snapshot_desc_list->getCDataWrapperElementAtIndex(idx)); - - if(tmp_desc->hasKey("snap_name")) { - if(!query_filter.empty()){ - std::string cmp=tmp_desc->getStringValue("snap_name"); - // TODO: implement filter in DB query - if(strstr(cmp.c_str(),query_filter.c_str())){ - uint64_t tm=tmp_desc->getInt64Value("snap_ts"); - snapshot_found[tm]=cmp; - - } - } else { - uint64_t tm=tmp_desc->getInt64Value("snap_ts"); - snapshot_found[tm]=tmp_desc->getStringValue("snap_name"); - - } - } + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("snapshot_name", snapshot_name); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "deleteSnapshot", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + err = request_future->getError(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} + +int MDSMessageChannel::searchSnapshot(const std::string& query_filter, + std::map<uint64_t, std::string>& snapshot_found, + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "getAllSnapshot"); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (!request_future->getResult() || + !request_future->getResult()->hasKey("snapshot_list_result") || + !request_future->getResult()->isVectorValue("snapshot_list_result")) return err; + + CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("snapshot_list_result"); + for (int idx = 0; + idx < snapshot_desc_list->size(); + idx++) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> tmp_desc(snapshot_desc_list->getCDataWrapperElementAtIndex(idx)); + + if (tmp_desc->hasKey("snap_name")) { + if (!query_filter.empty()) { + std::string cmp = tmp_desc->getStringValue("snap_name"); + // TODO: implement filter in DB query + if (strstr(cmp.c_str(), query_filter.c_str())) { + uint64_t tm = tmp_desc->getInt64Value("snap_ts"); + snapshot_found[tm] = cmp; } + } else { + uint64_t tm = tmp_desc->getInt64Value("snap_ts"); + snapshot_found[tm] = tmp_desc->getStringValue("snap_name"); + } } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } int MDSMessageChannel::searchSnapshot(const std::string& query_filter, ChaosStringVector& snapshot_found, - uint32_t millisec_to_wait) { - std::map<uint64_t,std::string> found; - int ret=searchSnapshot(query_filter, - found,millisec_to_wait); - - if(ret==0){ - for(std::map<uint64_t,std::string>::iterator i=found.begin();i!=found.end();i++){ - snapshot_found.push_back(i->second); - } + uint32_t millisec_to_wait) { + std::map<uint64_t, std::string> found; + int ret = searchSnapshot(query_filter, + found, + millisec_to_wait); + + if (ret == 0) { + for (std::map<uint64_t, std::string>::iterator i = found.begin(); i != found.end(); i++) { + snapshot_found.push_back(i->second); } - return ret; + } + return ret; } int MDSMessageChannel::searchNodeForSnapshot(const std::string& snapshot_name, ChaosStringVector& node_found, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("snapshot_name", snapshot_name); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "getNodesForSnapshot", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - - if(request_future->getResult() && - request_future->getResult()->hasKey("node_in_snapshot") && - request_future->getResult()->isVectorValue("node_in_snapshot")) { - //we have result - CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("node_in_snapshot"); - for(int idx = 0; - idx < snapshot_desc_list->size(); - idx++) { - const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); - - node_found.push_back(node_uid); - } - } + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("snapshot_name", snapshot_name); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "getNodesForSnapshot", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult() && + request_future->getResult()->hasKey("node_in_snapshot") && + request_future->getResult()->isVectorValue("node_in_snapshot")) { + // we have result + CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("node_in_snapshot"); + for (int idx = 0; + idx < snapshot_desc_list->size(); + idx++) { + const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); + + node_found.push_back(node_uid); } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } int MDSMessageChannel::searchSnapshotForNode(const std::string& node_uid, ChaosStringVector& snapshot_found, - uint32_t millisec_to_wait){ - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, node_uid); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "getSnapshotForNode", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - - if(request_future->getResult() && - request_future->getResult()->hasKey("snapshot_for_node") && - request_future->getResult()->isVectorValue("snapshot_for_node")) { - //we have result - CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("snapshot_for_node"); - for(int idx = 0; - idx < snapshot_desc_list->size(); - idx++) { - const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); - snapshot_found.push_back(node_uid); - } - } + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, node_uid); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "getSnapshotForNode", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult() && + request_future->getResult()->hasKey("snapshot_for_node") && + request_future->getResult()->isVectorValue("snapshot_for_node")) { + // we have result + CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("snapshot_for_node"); + for (int idx = 0; + idx < snapshot_desc_list->size(); + idx++) { + const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); + snapshot_found.push_back(node_uid); } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } -int MDSMessageChannel::setVariable(const std::string& variable_name, +int MDSMessageChannel::setVariable(const std::string& variable_name, chaos::common::data::CDataWrapper& variable_value, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, - variable_name); - message->addCSDataValue(VariableDefinitionKey::VARIABLE_VALUE_KEY, - variable_value); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "setVariable", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - err = request_future->getError(); - } else { - err = ErrorCode::EC_TIMEOUT; - } - return err; -} - -int MDSMessageChannel::searchVariable(const std::string& variable_name,ChaosStringVector& variable_found, - uint32_t millisec_to_wait){ - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, - variable_name); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "searchVariable", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - - if(request_future->getResult() && - request_future->getResult()->hasKey("varlist") && - request_future->getResult()->isVectorValue("varlist")) { - //we have result - CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("varlist"); - for(int idx = 0; - idx < snapshot_desc_list->size(); - idx++) { - const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); - - variable_found.push_back(node_uid); - } - } + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, + variable_name); + message->addCSDataValue(VariableDefinitionKey::VARIABLE_VALUE_KEY, + variable_value); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "setVariable", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + err = request_future->getError(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} + +int MDSMessageChannel::searchVariable(const std::string& variable_name, ChaosStringVector& variable_found, uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, + variable_name); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "searchVariable", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult() && + request_future->getResult()->hasKey("varlist") && + request_future->getResult()->isVectorValue("varlist")) { + // we have result + CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue("varlist"); + for (int idx = 0; + idx < snapshot_desc_list->size(); + idx++) { + const std::string node_uid = snapshot_desc_list->getStringElementAtIndex(idx); + + variable_found.push_back(node_uid); } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } -int MDSMessageChannel::getVariable(const std::string& variable_name, +int MDSMessageChannel::getVariable(const std::string& variable_name, chaos::common::data::CDWUniquePtr& variable_value, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, - variable_name); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "getVariable", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - err = request_future->getError(); - variable_value = request_future->detachResult(); - } else { - err = ErrorCode::EC_TIMEOUT; - } - return err; + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, + variable_name); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "getVariable", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + err = request_future->getError(); + variable_value = request_future->detachResult(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } int MDSMessageChannel::removeVariable(const std::string& variable_name, - uint32_t millisec_to_wait) { - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, - variable_name); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "removeVariable", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - err = request_future->getError(); - } else { - err = ErrorCode::EC_TIMEOUT; + uint32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(VariableDefinitionKey::VARIABLE_NAME_KEY, + variable_name); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "removeVariable", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + err = request_future->getError(); + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} +int MDSMessageChannel::searchNode(const std::string& unique_id_filter, + chaos::NodeType::NodeSearchType node_type_filter, + bool alive_only, + unsigned int start_page, + unsigned int page_length, + unsigned int& num_of_page, + ChaosStringVector& node_found, + uint32_t millisec_to_wait, + const std::string& impl) { + uint64_t lastid = 0; + int ret; + num_of_page = 0; + ChaosStringVector tmp; + int size; + do { + size = tmp.size(); + ret = searchNodeInt(unique_id_filter, node_type_filter, alive_only, lastid, 100000 /*page_length*/, lastid, tmp, millisec_to_wait, impl); + MSG_DBG << "searchNode start page:" << start_page << " page len:" << page_length << " lastid:" << lastid << "size:" << tmp.size() << " sizebefore:" << size << " ret:" << ret; + + if (tmp.size() < page_length) { + break; } - return err; -} -int MDSMessageChannel::searchNode(const std::string& unique_id_filter, - chaos::NodeType::NodeSearchType node_type_filter, - bool alive_only, - unsigned int start_page, - unsigned int page_length, - unsigned int& num_of_page, - ChaosStringVector& node_found, - uint32_t millisec_to_wait, - const std::string& impl){ - uint64_t lastid=0; - int ret; - num_of_page=0; - ChaosStringVector tmp; - int size; - do{ - size=tmp.size(); - ret=searchNodeInt(unique_id_filter,node_type_filter,alive_only,lastid,100000/*page_length*/,lastid,tmp,millisec_to_wait,impl); - MSG_DBG<<"searchNode start page:"<<start_page<<" page len:"<<page_length<<" lastid:"<<lastid<<"size:"<<tmp.size()<<" sizebefore:"<<size<<" ret:"<<ret; - - if(tmp.size()<page_length){ - break; + } while ((size < tmp.size()) && (ret == ErrorCode::EC_NO_ERROR)); + num_of_page = (tmp.size()) ? (tmp.size() / page_length) + (((tmp.size() % page_length) == 0) ? 0 : 1) : 0; + for (int cnt = start_page * page_length; (cnt < tmp.size()) && (cnt < ((start_page + 1) * page_length)); cnt++) { + node_found.push_back(tmp[cnt]); + } + return ErrorCode::EC_NO_ERROR; +} + +int MDSMessageChannel::searchNodeInt(const std::string& unique_id_filter, + chaos::NodeType::NodeSearchType node_type_filter, + bool alive_only, + unsigned int last_node_sequence_id, + unsigned int page_length, + uint64_t& lastid, + + ChaosStringVector& node_found, + uint32_t millisec_to_wait, + const std::string& impl) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("unique_id_filter", unique_id_filter); + if (impl.size() > 0) + message->addStringValue("impl", impl); + + message->addInt32Value("node_type_filter", (int32_t)node_type_filter); + message->addInt32Value("last_node_sequence_id", last_node_sequence_id); + message->addBoolValue("alive_only", alive_only); + message->addInt32Value("result_page_length", page_length); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("system", + "nodeSearch", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future) + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult() && + request_future->getResult()->hasKey(chaos::NodeType::NODE_SEARCH_LIST_KEY) && + request_future->getResult()->isVectorValue(chaos::NodeType::NODE_SEARCH_LIST_KEY)) { + // we have result + CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue(chaos::NodeType::NODE_SEARCH_LIST_KEY); + for (int idx = 0; + idx < snapshot_desc_list->size(); + idx++) { + ChaosUniquePtr<chaos::common::data::CDataWrapper> element(snapshot_desc_list->getCDataWrapperElementAtIndex(idx)); + if (element.get() && + element->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) { + node_found.push_back(element->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID)); + if (element->hasKey("seq")) { + lastid = element->getInt64Value("seq"); } - } while((size<tmp.size())&&(ret==ErrorCode::EC_NO_ERROR)); - num_of_page=(tmp.size())?(tmp.size()/page_length)+(((tmp.size()%page_length)==0)?0:1):0; - for(int cnt=start_page*page_length;(cnt<tmp.size())&&(cnt<((start_page+1)*page_length));cnt++){ - node_found.push_back(tmp[cnt]); + } } - return ErrorCode::EC_NO_ERROR; -} - -int MDSMessageChannel::searchNodeInt(const std::string& unique_id_filter, - chaos::NodeType::NodeSearchType node_type_filter, - bool alive_only, - unsigned int last_node_sequence_id, - unsigned int page_length, - uint64_t& lastid, - - ChaosStringVector& node_found, - uint32_t millisec_to_wait,const std::string& impl){ - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("unique_id_filter", unique_id_filter); - if(impl.size()>0) - message->addStringValue("impl", impl); - - message->addInt32Value("node_type_filter", (int32_t)node_type_filter); - message->addInt32Value("last_node_sequence_id", last_node_sequence_id); - message->addBoolValue("alive_only", alive_only); - message->addInt32Value("result_page_length", page_length); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("system", - "nodeSearch", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future) - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - if(request_future->getResult() && - request_future->getResult()->hasKey(chaos::NodeType::NODE_SEARCH_LIST_KEY) && - request_future->getResult()->isVectorValue(chaos::NodeType::NODE_SEARCH_LIST_KEY)) { - //we have result - CMultiTypeDataArrayWrapperSPtr snapshot_desc_list = request_future->getResult()->getVectorValue(chaos::NodeType::NODE_SEARCH_LIST_KEY); - for(int idx = 0; - idx < snapshot_desc_list->size(); - idx++) { - ChaosUniquePtr<chaos::common::data::CDataWrapper> element(snapshot_desc_list->getCDataWrapperElementAtIndex(idx)); - if(element.get() && - element->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) { - node_found.push_back(element->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID)); - if(element->hasKey("seq")){ - lastid=element->getInt64Value("seq"); - } - } - } - } - } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; - } -int MDSMessageChannel::searchNode(const std::string& unique_id_filter, + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} +int MDSMessageChannel::searchNode(const std::string& unique_id_filter, chaos::NodeType::NodeSearchType node_type_filter, - bool alive_only, - unsigned int last_node_sequence_id, - unsigned int page_length, - ChaosStringVector& node_found, - uint32_t millisec_to_wait,const std::string&impl) { - - uint64_t lastid=0; + bool alive_only, + unsigned int last_node_sequence_id, + unsigned int page_length, + ChaosStringVector& node_found, + uint32_t millisec_to_wait, + const std::string& impl) { + uint64_t lastid = 0; -return searchNodeInt(unique_id_filter,node_type_filter,alive_only,last_node_sequence_id,page_length,lastid,node_found,millisec_to_wait,impl); - + return searchNodeInt(unique_id_filter, node_type_filter, alive_only, last_node_sequence_id, page_length, lastid, node_found, millisec_to_wait, impl); } -ChaosUniquePtr<MultiAddressMessageRequestFuture> MDSMessageChannel::sendRequestWithFuture(const std::string& action_domain, - const std::string& action_name, +ChaosUniquePtr<MultiAddressMessageRequestFuture> MDSMessageChannel::sendRequestWithFuture(const std::string& action_domain, + const std::string& action_name, chaos::common::data::CDWUniquePtr request_pack, - int32_t request_timeout) { - return MultiAddressMessageChannel::sendRequestWithFuture(action_domain, - action_name, - MOVE(request_pack), - request_timeout); + int32_t request_timeout) { + return MultiAddressMessageChannel::sendRequestWithFuture(action_domain, + action_name, + MOVE(request_pack), + request_timeout); } -void MDSMessageChannel::sendMessage(const std::string& action_domain, - const std::string& action_name, +void MDSMessageChannel::sendMessage(const std::string& action_domain, + const std::string& action_name, chaos::common::data::CDWUniquePtr request_pack) { - return MultiAddressMessageChannel::sendMessage(action_domain, - action_name, - MOVE(request_pack)); + return MultiAddressMessageChannel::sendMessage(action_domain, + action_name, + MOVE(request_pack)); } int MDSMessageChannel::deleteDataCloud(const std::string& key, - uint64_t start_ts, - uint64_t end_ts,int32_t millisec_to_wait){ - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue("key", key); - message->addInt64Value("start_ts", start_ts); - message->addInt64Value("end_ts", end_ts); - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "deleteDataCloud", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future); - err = request_future->getError(); - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - if(request_future->getResult()->hasKey("error")){ - err=request_future->getResult()->getInt32Value("error"); - } - } + uint64_t start_ts, + uint64_t end_ts, + int32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue("key", key); + message->addInt64Value("start_ts", start_ts); + message->addInt64Value("end_ts", end_ts); + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "deleteDataCloud", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future); + err = request_future->getError(); + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult()->hasKey("error")) { + err = request_future->getResult()->getInt32Value("error"); + } + } - } else { - err= ErrorCode::EC_TIMEOUT; + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} + +CDWUniquePtr MDSMessageChannel::retrieveData(const std::string& key, int32_t millisec_to_wait) { + ChaosStringVector v; + CDWUniquePtr ret; + chaos::common::data::VectorCDWShrdPtr result; + v.push_back(key); + int rett = retriveMultipleData(v, result); + if (rett == 0) { + if (result.size()) { + chaos::common::data::CDWShrdPtr p = result[0]; + if (p.get()) { + ret = p->clone(); + } } - return err; -} -int MDSMessageChannel::queryDataCloud(const std::string& key, - const ChaosStringSet& meta_tags, - const ChaosStringSet& projection_keys, - const uint64_t start_ts, - const uint64_t end_ts, - const uint32_t page_dimension, - chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_sequence, - chaos::common::data::VectorCDWShrdPtr& found_element_page, - bool only_index,int32_t millisec_to_wait){ - int err = ErrorCode::EC_NO_ERROR; - ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); - message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, key); - { - std::vector<std::string> v,pv; - for(ChaosStringSet::iterator i=meta_tags.begin();i!=meta_tags.end();i++){ - v.push_back(*i); + } + return ret; +} + +int MDSMessageChannel::retriveMultipleData(const ChaosStringVector& keys, chaos::common::data::VectorCDWShrdPtr& result, int32_t millisec_to_wait) { + int err = -1; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + for (ChaosStringVector::const_iterator i = keys.begin(); i != keys.end(); i++) { + message->appendStringToArray(*i); + } + message->finalizeArrayForKey("nodes"); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "retriveMultipleData", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future); + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult()) { + chaos::common::data::CDataWrapper* res = request_future->getResult(); + MSG_DBG << "DATA:" << res->getJSONString(); + if (res->hasKey("data") && res->isVectorValue("data")) { + CMultiTypeDataArrayWrapperSPtr d = res->getVectorValue("data"); + for (int idx = 0; + idx < d->size(); + idx++) { + chaos::common::data::CDWUniquePtr element(d->getCDataWrapperElementAtIndex(idx)); + if (element.get()) { + ChaosSharedPtr<chaos::common::data::CDataWrapper> sh(element.release()); + result.push_back(sh); + } + } } - for(ChaosStringSet::iterator i=projection_keys.begin();i!=projection_keys.end();i++){ - pv.push_back(*i); + } + } + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; +} +int MDSMessageChannel::queryDataCloud(const std::string& key, + const ChaosStringSet& meta_tags, + const ChaosStringSet& projection_keys, + const uint64_t start_ts, + const uint64_t end_ts, + const uint32_t page_dimension, + chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_sequence, + chaos::common::data::VectorCDWShrdPtr& found_element_page, + bool only_index, + int32_t millisec_to_wait) { + int err = ErrorCode::EC_NO_ERROR; + ChaosUniquePtr<chaos::common::data::CDataWrapper> message(new CDataWrapper()); + message->addStringValue(NodeDefinitionKey::NODE_UNIQUE_ID, key); + { + std::vector<std::string> v, pv; + for (ChaosStringSet::iterator i = meta_tags.begin(); i != meta_tags.end(); i++) { + v.push_back(*i); + } + for (ChaosStringSet::iterator i = projection_keys.begin(); i != projection_keys.end(); i++) { + pv.push_back(*i); + } + if (v.size()) { + message->append("tags", v); + } + if (pv.size()) { + message->append("prj", pv); + } + } + + message->addInt64Value("start_ts", start_ts); + message->addInt64Value("end_ts", end_ts); + message->addInt32Value("page", page_dimension); + message->addInt64Value("runid", last_sequence.run_id); + message->addInt64Value("seq", last_sequence.datapack_counter); + message->addInt64Value("ts", last_sequence.ts); + + ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", + "queryDataCloud", + MOVE(message)); + request_future->setTimeout(millisec_to_wait); + if (request_future->wait()) { + DECODE_ERROR(request_future); + if ((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { + if (request_future->getResult()) { + chaos::common::data::CDataWrapper* res = request_future->getResult(); + if (res->hasKey("runid")) { + last_sequence.run_id = res->getInt64Value("runid"); } - if(v.size()){ - message->append("tags", v); - + if (res->hasKey("seq")) { + last_sequence.datapack_counter = res->getInt64Value("seq"); } - if(pv.size()){ - message->append("prj", pv); - + if (res->hasKey("ts")) { + last_sequence.ts = res->getInt64Value("ts"); } - } - - message->addInt64Value("start_ts", start_ts); - message->addInt64Value("end_ts", end_ts); - message->addInt32Value("page", page_dimension); - message->addInt64Value("runid", last_sequence.run_id); - message->addInt64Value("seq", last_sequence.datapack_counter); - message->addInt64Value("ts", last_sequence.ts); - - ChaosUniquePtr<MultiAddressMessageRequestFuture> request_future = sendRequestWithFuture("service", - "queryDataCloud", - MOVE(message)); - request_future->setTimeout(millisec_to_wait); - if(request_future->wait()) { - DECODE_ERROR(request_future); - if((err = request_future->getError()) == ErrorCode::EC_NO_ERROR) { - if(request_future->getResult()){ - chaos::common::data::CDataWrapper *res=request_future->getResult(); - if(res->hasKey("runid")){ - last_sequence.run_id=res->getInt64Value("runid"); - } - if(res->hasKey("seq")){ - last_sequence.datapack_counter=res->getInt64Value("seq"); - } - if(res->hasKey("ts")){ - last_sequence.ts=res->getInt64Value("ts"); - } - MSG_DBG<<"DATA:"<<res->getJSONString(); - if(res->hasKey("data")&&res->isVectorValue("data")){ - CMultiTypeDataArrayWrapperSPtr d = res->getVectorValue("data"); - for(int idx = 0; - idx < d->size(); - idx++) { - chaos::common::data::CDWUniquePtr element(d->getCDataWrapperElementAtIndex(idx)); - if(element.get()){ - ChaosSharedPtr<chaos::common::data::CDataWrapper> sh(element.release()); - found_element_page.push_back(sh); - } - - } - } + MSG_DBG << "DATA:" << res->getJSONString(); + if (res->hasKey("data") && res->isVectorValue("data")) { + CMultiTypeDataArrayWrapperSPtr d = res->getVectorValue("data"); + for (int idx = 0; + idx < d->size(); + idx++) { + chaos::common::data::CDWUniquePtr element(d->getCDataWrapperElementAtIndex(idx)); + if (element.get()) { + ChaosSharedPtr<chaos::common::data::CDataWrapper> sh(element.release()); + found_element_page.push_back(sh); } + } } - } else { - err = ErrorCode::EC_TIMEOUT; + } } - return err; - + } else { + err = ErrorCode::EC_TIMEOUT; + } + return err; } void MDSMessageChannel::callMethod(const std::string& action_domain, const std::string& action_name) { - return MultiAddressMessageChannel::sendMessage(action_domain, - action_name); + return MultiAddressMessageChannel::sendMessage(action_domain, + action_name); } diff --git a/chaos/common/message/MDSMessageChannel.h b/chaos/common/message/MDSMessageChannel.h index 12f2491bb..e3b6277e8 100644 --- a/chaos/common/message/MDSMessageChannel.h +++ b/chaos/common/message/MDSMessageChannel.h @@ -378,6 +378,14 @@ namespace chaos { int deleteDataCloud(const std::string& key, uint64_t start_ts, uint64_t end_ts,int32_t millisec_to_wait=10000); + /*! + * This method retrive the cached object by CSDawrapperUsed as query key and + * return a pointer to the class ArrayPointer of CDataWrapper type + */ + chaos::common::data::CDWUniquePtr retrieveData(const std::string& key,int32_t millisec_to_wait=10000); + + int retriveMultipleData(const ChaosStringVector& key, + chaos::common::data::VectorCDWShrdPtr& result,int32_t millisec_to_wait=10000); }; } diff --git a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp index 2bdf5f047..0e395495c 100644 --- a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp +++ b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp @@ -282,7 +282,28 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) { //allocate map for the restore tag restore_point_map.insert(make_pair(restore_point_tag, std::map<std::string, ChaosSharedPtr<chaos_data::CDataWrapper> >())); } - + if((err = io_data_driver->loadDatasetFromSnapshot(restore_point_tag, + key, + dataset))) { + KeyDataStorageLERR << " Error loaading dataset of domain from restore point:" << restore_point_tag << " for the key:" << key; + clearRestorePoint(restore_point_tag); + return err; + } + if(dataset.get()){ + if(dataset->hasKey(DataPackID::INPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::INPUT_DATASET_ID)){ + restore_point_map[restore_point_tag].insert(make_pair(input_key, MOVE(dataset->getCSDataValue(DataPackID::INPUT_DATASET_ID)))); + } + if(dataset->hasKey(DataPackID::OUTPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::OUTPUT_DATASET_ID)){ + restore_point_map[restore_point_tag].insert(make_pair(output_key, MOVE(dataset->getCSDataValue(DataPackID::OUTPUT_DATASET_ID)))); + } + if(dataset->hasKey(DataPackID::SYSTEM_DATASETID)&&dataset->isCDataWrapperValue(DataPackID::SYSTEM_DATASETID)){ + restore_point_map[restore_point_tag].insert(make_pair(system_key, MOVE(dataset->getCSDataValue(DataPackID::SYSTEM_DATASETID)))); + } + if(dataset->hasKey(DataPackID::CUSTOM_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::CUSTOM_DATASET_ID)){ + restore_point_map[restore_point_tag].insert(make_pair(custom_key, MOVE(dataset->getCSDataValue(DataPackID::CUSTOM_DATASET_ID)))); + } + } + #if 0 if((err = io_data_driver->loadDatasetTypeFromSnapshotTag(restore_point_tag, key, KeyDataStorageDomainOutput, @@ -292,7 +313,8 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) { return err; } else { if(dataset){ - restore_point_map[restore_point_tag].insert(make_pair(output_key, MOVE(dataset)));dataset.reset(); + restore_point_map[restore_point_tag].insert(make_pair(output_key, MOVE(dataset))); + dataset.reset(); } } @@ -335,7 +357,7 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) { restore_point_map[restore_point_tag].insert(make_pair(system_key, MOVE(dataset)));dataset.reset(); } } - + #endif return err; } @@ -438,11 +460,10 @@ int KeyDataStorage::performLiveFetch(const KeyDataStorageDomain dataset_domain, int err = 0; size_t size; std::string node_dataset = getDomainString(dataset_domain); - char * raw_data = io_data_driver->retriveRawData(node_dataset, &size); - if(raw_data) { - found_dataset.reset(new CDataWrapper(raw_data)); + CDWUniquePtr raw_data = io_data_driver->retrieveData(node_dataset); + if(raw_data.get()) { + found_dataset.reset(raw_data.release()); } - delete[](raw_data); return err; } diff --git a/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.cpp b/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.cpp index 95636ef47..8ce2c84ac 100644 --- a/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.cpp +++ b/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.cpp @@ -263,7 +263,7 @@ void QuantumSlotScheduler::addNewfetcherThread() { void QuantumSlotScheduler::dispath_new_value_async(const boost::system::error_code& error, QuantumSlot *cur_slot, - char *data_found) { + CDWUniquePtr& data_found) { std::string quantum_slot_key = CHAOS_QSS_COMPOSE_QUANTUM_SLOT_KEY(cur_slot->key, cur_slot->quantum_multiplier); boost::unique_lock<boost::mutex> lock_on_condition(mutex_condition_scan); @@ -272,8 +272,8 @@ void QuantumSlotScheduler::dispath_new_value_async(const boost::system::error_co //slot has handler so we need to broadcast data to it in this case we unlock to permit other handler to be inserted lock_on_condition.unlock(); try{ - if(data_found) { - cur_slot->sendNewValueConsumer(KeyValue(new CDataWrapper(data_found))); + if(data_found.get()) { + cur_slot->sendNewValueConsumer(KeyValue(data_found.get())); } else { cur_slot->sendNoValueToConsumer(); } @@ -297,7 +297,6 @@ void QuantumSlotScheduler::dispath_new_value_async(const boost::system::error_co //delete the slot set_slots_index_key_slot.erase(it); } - delete[](data_found); } void QuantumSlotScheduler::fetchValue(ChaosSharedPtr<IODataDriver> data_driver) { @@ -308,8 +307,7 @@ void QuantumSlotScheduler::fetchValue(ChaosSharedPtr<IODataDriver> data_driver) while(work_on_fetch) { if(queue_active_slot.pop(cur_slot)) { //we have slot available - size_t data_found_size; - char * data_found = data_driver->retriveRawData(cur_slot->key, &data_found_size); + CDWUniquePtr data_found = data_driver->retrieveData(cur_slot->key); //dispatch data dispath_new_value_async(error, diff --git a/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.h b/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.h index 67b7736df..cd42388db 100644 --- a/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.h +++ b/chaos_metadata_service_client/monitor_system/QuantumSlotScheduler.h @@ -232,7 +232,7 @@ mutex_ ## queue_name.unlock(); void dispath_new_value_async(const boost::system::error_code& error, QuantumSlot *cur_slot, - char *data_found); + chaos::common::data::CDWUniquePtr& data_found); //! manage the registration in internal layer for new consumer void _addKeyConsumer(SlotConsumerInfo *ci); //!manage in the internal layer the request for remove the consumer diff --git a/chaos_metadata_service_client/node_controller/CUController.cpp b/chaos_metadata_service_client/node_controller/CUController.cpp index a9bde4d5f..d218fa415 100644 --- a/chaos_metadata_service_client/node_controller/CUController.cpp +++ b/chaos_metadata_service_client/node_controller/CUController.cpp @@ -979,11 +979,9 @@ int CUController::fetchAllDataset() { ChaosSharedPtr<chaos::common::data::CDataWrapper> CUController::fetchCurrentDatatasetFromDomain(DatasetDomain domain) { // ChaosReadLock lock(trackMutext); size_t value_len = 0; - char *value = ioLiveDataDriver->retriveRawData(channel_keys[domain],(size_t*)&value_len); - if(value){ - chaos::common::data::CDataWrapper *tmp = new CDataWrapper(value); - current_dataset[domain].reset(tmp); - delete [] value; + CDWUniquePtr value = ioLiveDataDriver->retrieveData(channel_keys[domain]); + if(value.get()){ + current_dataset[domain].reset(value.release()); return current_dataset[domain]; } return current_dataset[domain]; diff --git a/chaos_service_common/ChaosManager.cpp b/chaos_service_common/ChaosManager.cpp index 03f12260f..d58e4ca48 100644 --- a/chaos_service_common/ChaosManager.cpp +++ b/chaos_service_common/ChaosManager.cpp @@ -96,9 +96,14 @@ CDWShrdPtr ChaosManager::getLiveChannel(const std::string& key) { ChaosSharedPtr<chaos::common::data::CDataWrapper> ret; if (cache_driver) { ret=cache_driver->getData(key); - context->updateLiveCache(ret.get()); - return ret; + } else { + chaos::common::message::MDSMessageChannel* mdsChannel = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel(); + ret= mdsChannel->retrieveData(key); } + if(ret.get()){ + context->updateLiveCache(ret.get()); + } + return ret; } CDWShrdPtr ChaosManager::getLiveChannel(const std::string& key, int domain) { @@ -110,6 +115,10 @@ CDWShrdPtr ChaosManager::getLiveChannel(const std::string& key, int domain) { ret=cache_driver->getData(key); context->updateLiveCache(ret.get()); return ret; + } else { + chaos::common::message::MDSMessageChannel* mdsChannel = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel(); + return mdsChannel->retrieveData(key); + } return ret; } @@ -354,10 +363,14 @@ chaos::common::data::VectorCDWShrdPtr ChaosManager::getLiveChannel(const std::ve chaos::common::data::VectorCDWShrdPtr results; if (cache_driver) { results = cache_driver->getData(channels); - for(int cnt=0;cnt<results.size();cnt++){ + + } else { + chaos::common::message::MDSMessageChannel* mdsChannel = chaos::common::network::NetworkBroker::getInstance()->getMetadataserverMessageChannel(); + mdsChannel->retriveMultipleData(channels,results); + + } + for(int cnt=0;cnt<results.size();cnt++){ context->updateLiveCache(results[cnt].get()); - } - return results; } return results; } -- GitLab