diff --git a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp index 2034bca514931d95e7b206896b83d2026e5e4df9..e0a5843540c85eb6eef7c15e1adf45cfbb5ba664 100644 --- a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp +++ b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp @@ -401,9 +401,9 @@ int InfluxDB::findObject(const std::string& chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq) { int err = 0; - uint64_t seqid = last_record_found_seq.datapack_counter; - uint64_t runid = last_record_found_seq.run_id; - + uint64_t last_ts=0; + last_record_found_seq.run_id=0; + std::stringstream ss; ss<<"SELECT "; if(projection_keys.size()==0){ @@ -459,10 +459,13 @@ int InfluxDB::findObject(const std::string& if(data.hasKey("results")&& data.isVectorValue("results")){ chaos::common::data::CMultiTypeDataArrayWrapperSPtr results=data.getVectorValue("results"); - chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(0); + for(int cnt_res=0;cnt_res<results->size();cnt_res++){ + + chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(cnt_res); if(serie.get()&&serie->hasKey("series")&&serie->isVectorValue("series")){ chaos::common::data::CMultiTypeDataArrayWrapperSPtr cud=serie->getVectorValue("series"); - chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(0); + for(int cnt_series=0;cnt_series<cud->size();cnt_series++){ + chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(cnt_series); if(cu.get()&&cu->hasKey("name")){ std::string name=cu->getStringValue("name"); std::vector<std::string> cols; @@ -481,18 +484,23 @@ int InfluxDB::findObject(const std::string& int64_t ts=chaos::common::utility::TimingUtil::getTimestampFromString(val->getStringElementAtIndex(cntt),"%Y-%m-%dT%H:%M:%S%fZ"); dd->append(chaos::DataPackCommonKey::DPCK_DEVICE_ID,key); dd->append(chaos::DataPackCommonKey::DPCK_TIMESTAMP,ts); + if(ts>last_ts) last_ts=ts; } dd->append(cols[cntt],val->getBSONElementAtIndex(cntt)); } - // DBG<<cnt<<"] "<<dd->getCompliantJSONString(); + // DBG<<cnt<<"] "<<dd->getCompliantJSONString(); found_object_page.push_back(dd); + last_record_found_seq.datapack_counter++; + last_record_found_seq.ts=last_ts; } } } } + } + } } } @@ -567,7 +575,46 @@ int InfluxDB::countObject(const std::string& key, const uint64_t timestamp_from, const uint64_t timestamp_to, uint64_t& object_count) { - return 0; + + std::stringstream ss; + ss<<"SELECT COUNT(*) FROM \""<<key<<"\" WHERE time>="<<timestamp_from*1000000<<" AND time<"<<timestamp_to*1000000; + std::string resp; + int ret=influxdb_cpp::query(resp,ss.str(),si); + if(ret==0){ + // DBG<<"COUNT:"<<resp; + + chaos::common::data::CDataWrapper data; + data.setSerializedJsonData(resp.c_str()); + //DBG<<data.getJSONString(); + + if(data.hasKey("results")&& data.isVectorValue("results")){ + chaos::common::data::CMultiTypeDataArrayWrapperSPtr results=data.getVectorValue("results"); + for(int cnt_res=0;cnt_res<results->size();cnt_res++){ + + chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(cnt_res); + if(serie.get()&&serie->hasKey("series")&&serie->isVectorValue("series")){ + chaos::common::data::CMultiTypeDataArrayWrapperSPtr cud=serie->getVectorValue("series"); + for(int cnt_series=0;cnt_series<cud->size();cnt_series++){ + chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(cnt_series); + if(cu->hasKey("values")&&cu->isVectorValue("values")){ + chaos::common::data::CMultiTypeDataArrayWrapperSPtr vals=cu->getVectorValue("values"); + object_count=0; + + + + + if(vals->size()>0){ + chaos::common::data::CMultiTypeDataArrayWrapperSPtr val=vals->getVectorElementAtIndex(0); + if(val->size()>1){ + // DBG<<"size: "<<vals->size(); + object_count=val->getInt32ElementAtIndex(val->size()-1); + + } + // 0 is time + } + }}}}} + } + return ret; } } // namespace object_storage diff --git a/chaos_service_common/ChaosManager.cpp b/chaos_service_common/ChaosManager.cpp index 3dc239cda59b1a670150ee18b00c7f8dbfd1a591..5e7beb4f267ac4425f42ad4d065e433cd2f98c08 100644 --- a/chaos_service_common/ChaosManager.cpp +++ b/chaos_service_common/ChaosManager.cpp @@ -7,7 +7,6 @@ #include "ChaosManager.h" #include <chaos/common/batch_command/BatchCommandConstants.h> #include <ChaosMetadataService/object_storage/abstraction/ObjectStorageDataAccess.h> - #include <ChaosMetadataService/ChaosMetadataService.h> #include <ChaosMetadataService/api/node/ClearCommandQueue.h> #include <ChaosMetadataService/api/node/CommandTemplateSubmit.h> @@ -229,20 +228,19 @@ int ChaosManager::init(const chaos::common::data::CDataWrapper& best_available_d InizializableService::initImplementation(DriverPoolManager::getInstance(), NULL, "DriverPoolManager", __PRETTY_FUNCTION__); } catch (...) { - DBGETERR << "Error Initializing alla drivers"; + DBGETERR << "Error Initializing all drivers"; } cache_driver = DriverPoolManager::getInstance()->getCacheDrvPtr(); if (cache_driver == NULL) { DBGETERR << "Cannot use direct cache"; - return -1; } else { DBGET << "Using direct cache"; } persistence_driver = DriverPoolManager::getInstance()->getPersistenceDrvPtr(); if (persistence_driver == NULL) { DBGETERR << "Cannot use direct persistence"; - return -2; + //return -2; } else { DBGET << "Using direct persistence"; @@ -310,7 +308,6 @@ int ChaosManager::queryTS(const std::string& key, return -4; } chaos::common::direct_io::channel::opcode_headers::SearchSequence last_sequence; - chaos::common::direct_io::channel::opcode_headers::QueryResultPage found_element_page; return obj_storage_da->findObject(key, meta_tags, @@ -318,11 +315,33 @@ int ChaosManager::queryTS(const std::string& key, start_ts, end_ts, page, - found_element_page, + elements, last_sequence); } return -2; } +int ChaosManager::queryTSCount(const std::string& key,const uint64_t start_ts,const uint64_t end_ts,const ChaosStringSet& tags,const ChaosStringSet&vars){ +if (log_driver) { + chaos::metadata_service::object_storage::abstraction::ObjectStorageDataAccess* obj_storage_da = DriverPoolManager::getInstance()->getLogDrv().getDataAccess<chaos::metadata_service::object_storage::abstraction::ObjectStorageDataAccess>(); + CHAOS_ASSERT(obj_storage_da); + if (obj_storage_da == NULL) { + DBGETERR << "Cannot retrieve log driver"; + return -4; + } + DBGET << "query Count of "<<key <<" from "<<start_ts<<" to "<<end_ts; + + + uint64_t count_obj=0; + obj_storage_da->countObject(key, + start_ts, + end_ts,count_obj); + return count_obj; + } + DBGETERR << "log driver not available "<<key <<" from "<<start_ts<<" to "<<end_ts; + + return -2; +} + int ChaosManager::queryDataCloud(const std::string& key, const ChaosStringSet& meta_tags, const ChaosStringSet& projection_keys, @@ -741,6 +760,7 @@ chaos::common::data::CDWUniquePtr ChaosManager::restoreSnapshot(const std::strin chaos::common::data::CDWUniquePtr ChaosManager::commandTemplateSubmit(const std::string& uid, const std::string& command_alias, const chaos::common::data::CDWUniquePtr& slow_command_data, const SubmissionRuleType::SubmissionRule submission_rule, const uint32_t priority, const uint64_t scheduler_steps_delay, const uint32_t submission_checker_steps_delay) { CDWUniquePtr res; + CALC_EXEC_START; CDWUniquePtr message(new CDataWrapper()); // this key need only to inform mds to redirect to node the slowcomand without porcess it @@ -757,12 +777,14 @@ chaos::common::data::CDWUniquePtr ChaosManager::commandTemplateSubmit(const std: message->appendAllElement(*slow_command_data); } if (persistence_driver) { - CALC_EXEC_START; CommandTemplateSubmit node; res = node.execute(MOVE(message)); - CALC_EXEC_END - } + } + + + CALC_EXEC_END + return res; } diff --git a/chaos_service_common/ChaosManager.h b/chaos_service_common/ChaosManager.h index 6c80473cc6ff920ff5bec6e19023e865ab007424..f8f7bb9cf88ee3818010345e1bdc0660c146f8f2 100644 --- a/chaos_service_common/ChaosManager.h +++ b/chaos_service_common/ChaosManager.h @@ -16,7 +16,7 @@ #include <chaos/common/io/IODataDriver.h> #include <chaos/common/batch_command/BatchCommandTypes.h> #include <chaos/common/property/property.h> - +#define MANAGER_NO_DIRECT_ACCESS -1000 namespace chaos { namespace metadata_service{ class ChaosMetadataService; @@ -156,6 +156,8 @@ int queryTS(const std::string& key, const uint64_t end_ts, const uint32_t page_dimension, chaos::common::data::VectorCDWShrdPtr& found_element_page); +int queryTSCount(const std::string& key,const uint64_t start_ts,const uint64_t end_ts,const ChaosStringSet& tags=ChaosStringSet(),const ChaosStringSet&vars=ChaosStringSet()); + int deleteDataCloud(const std::string& key, const uint64_t start_ts, const uint64_t end_ts,int32_t millisec_to_wait=10000); diff --git a/chaos_service_common/DriverPoolManager.cpp b/chaos_service_common/DriverPoolManager.cpp index d428932a8fa671d7252887c6e93df40acf658931..eb7712e6f0ea7f3c766f67ab979192048b3ccdb5 100644 --- a/chaos_service_common/DriverPoolManager.cpp +++ b/chaos_service_common/DriverPoolManager.cpp @@ -49,6 +49,7 @@ DriverPoolManager::~DriverPoolManager() {} void DriverPoolManager::init(void* init_data) { //init cache pool + int err=0; //InizializableService::initImplementation(cache_pool, NULL, "CacheDriverPool", __PRETTY_FUNCTION__); const std::string cache_impl_name = cacheSetting.cache_driver_impl + "CacheDriver"; if (cacheSetting.cache_driver_impl.size()) { @@ -64,11 +65,16 @@ void DriverPoolManager::init(void* init_data) { cache_driver.init((void*)&cacheSetting, __PRETTY_FUNCTION__); } catch (CException& e) { cache_driver.reset(NULL,cache_impl_name); - throw e; + err++; + DECODE_CHAOS_EXCEPTION(e); + + //throw e; } catch (...) { DP_LOG_ERR << " Undefined exception catchd during initialization of cache driver"; cache_driver.reset(NULL,cache_impl_name); + err++; + } } //init dirver instace @@ -81,11 +87,15 @@ void DriverPoolManager::init(void* init_data) { persistence_driver.init((void*)&persistentSetting, __PRETTY_FUNCTION__); } catch (CException& e) { persistence_driver.reset(NULL,persistence_impl_name); - throw e; + err++; + DECODE_CHAOS_EXCEPTION(e); + + // throw e; } catch (...) { DP_LOG_ERR << " Undefined exception catchd during initialization of persistent driver"; persistence_driver.reset(NULL,persistence_impl_name); + err++; } } const std::string storage_impl_name = objectSetting.persistence_implementation + "ObjectStorageDriver"; @@ -97,10 +107,12 @@ void DriverPoolManager::init(void* init_data) { storage_driver.init((void*)&objectSetting, __PRETTY_FUNCTION__); } catch (CException& e) { storage_driver.reset(NULL,storage_impl_name); - throw e; + //throw e; + err++; + DECODE_CHAOS_EXCEPTION(e); } catch (...) { DP_LOG_ERR << " Undefined exception catchd during initialization of storage driver"; - + err++; storage_driver.reset(NULL,storage_impl_name); } } @@ -109,20 +121,22 @@ void DriverPoolManager::init(void* init_data) { log_driver.reset(ObjectFactoryRegister<chaos::service_common::persistence::data_access::AbstractPersistenceDriver>::getInstance()->getNewInstanceByName(log_impl_name), log_impl_name); if (log_driver.get() == NULL) { - DP_LOG_INFO << " Log driver not defined"; + DP_LOG_ERR << " Log driver not defined "; } else { try { log_driver.init((void*)&logSetting, __PRETTY_FUNCTION__); } catch (CException& ex) { log_driver.reset(NULL,log_impl_name); DECODE_CHAOS_EXCEPTION(ex) - } catch (...) { DP_LOG_ERR << " Undefined exception catchd during initialization of LOG driver"; log_driver.reset(NULL,log_impl_name); } } } + if(err){ + throw chaos::CException(err,"Drivers cannot be initialized",__PRETTY_FUNCTION__); + } } void DriverPoolManager::deinit() {