From 917ea4a25f0f09e5a79dc208f9051a344395ac5b Mon Sep 17 00:00:00 2001
From: amichelo <andrea.michelotti@lnf.infn.it>
Date: Tue, 18 Apr 2023 18:38:13 +0200
Subject: [PATCH] inserted TDB in root, Chaosanager continue scanning direct
 drivers

---
 .../object_storage/influxDB/InfluxDB.cpp      | 61 ++++++++++++++++---
 chaos_service_common/ChaosManager.cpp         | 40 +++++++++---
 chaos_service_common/ChaosManager.h           |  4 +-
 chaos_service_common/DriverPoolManager.cpp    | 26 ++++++--
 4 files changed, 108 insertions(+), 23 deletions(-)

diff --git a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp
index 2034bca51..e0a584354 100644
--- a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp
+++ b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp
@@ -401,9 +401,9 @@ int InfluxDB::findObject(const std::string&
                          chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq) {
   int err = 0;
 
-  uint64_t seqid = last_record_found_seq.datapack_counter;
-  uint64_t runid = last_record_found_seq.run_id;
-
+  uint64_t last_ts=0;
+  last_record_found_seq.run_id=0;
+  
   std::stringstream ss;
   ss<<"SELECT ";
   if(projection_keys.size()==0){
@@ -459,10 +459,13 @@ int InfluxDB::findObject(const std::string&
 
     if(data.hasKey("results")&& data.isVectorValue("results")){
      chaos::common::data::CMultiTypeDataArrayWrapperSPtr results=data.getVectorValue("results");
-     chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(0);
+    for(int cnt_res=0;cnt_res<results->size();cnt_res++){
+
+     chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(cnt_res);
      if(serie.get()&&serie->hasKey("series")&&serie->isVectorValue("series")){
            chaos::common::data::CMultiTypeDataArrayWrapperSPtr cud=serie->getVectorValue("series");
-           chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(0);
+           for(int cnt_series=0;cnt_series<cud->size();cnt_series++){
+           chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(cnt_series);
             if(cu.get()&&cu->hasKey("name")){
               std::string name=cu->getStringValue("name");
               std::vector<std::string> cols;
@@ -481,18 +484,23 @@ int InfluxDB::findObject(const std::string&
                             int64_t ts=chaos::common::utility::TimingUtil::getTimestampFromString(val->getStringElementAtIndex(cntt),"%Y-%m-%dT%H:%M:%S%fZ");
                             dd->append(chaos::DataPackCommonKey::DPCK_DEVICE_ID,key);
                             dd->append(chaos::DataPackCommonKey::DPCK_TIMESTAMP,ts);
+                            if(ts>last_ts) last_ts=ts;
                           }
                           dd->append(cols[cntt],val->getBSONElementAtIndex(cntt));
                         }
-                 //   DBG<<cnt<<"] "<<dd->getCompliantJSONString();
+                   // DBG<<cnt<<"] "<<dd->getCompliantJSONString();
    
                     found_object_page.push_back(dd);
+                    last_record_found_seq.datapack_counter++;
+                    last_record_found_seq.ts=last_ts;
                     }
                   
                 }
               }
 
             }
+           }
+     }
 
      }
     }
@@ -567,7 +575,46 @@ int InfluxDB::countObject(const std::string& key,
                           const uint64_t     timestamp_from,
                           const uint64_t     timestamp_to,
                           uint64_t&          object_count) {
-  return 0;
+
+  std::stringstream ss;
+  ss<<"SELECT COUNT(*) FROM \""<<key<<"\" WHERE time>="<<timestamp_from*1000000<<" AND time<"<<timestamp_to*1000000;
+  std::string resp;
+  int ret=influxdb_cpp::query(resp,ss.str(),si);                        
+  if(ret==0){
+   // DBG<<"COUNT:"<<resp;
+
+    chaos::common::data::CDataWrapper data;
+    data.setSerializedJsonData(resp.c_str());
+    //DBG<<data.getJSONString();
+
+    if(data.hasKey("results")&& data.isVectorValue("results")){
+     chaos::common::data::CMultiTypeDataArrayWrapperSPtr results=data.getVectorValue("results");
+    for(int cnt_res=0;cnt_res<results->size();cnt_res++){
+
+     chaos::common::data::CDWUniquePtr serie=results->getCDataWrapperElementAtIndex(cnt_res);
+     if(serie.get()&&serie->hasKey("series")&&serie->isVectorValue("series")){
+           chaos::common::data::CMultiTypeDataArrayWrapperSPtr cud=serie->getVectorValue("series");
+           for(int cnt_series=0;cnt_series<cud->size();cnt_series++){
+           chaos::common::data::CDWUniquePtr cu=cud->getCDataWrapperElementAtIndex(cnt_series);
+              if(cu->hasKey("values")&&cu->isVectorValue("values")){
+                chaos::common::data::CMultiTypeDataArrayWrapperSPtr vals=cu->getVectorValue("values");
+                object_count=0;
+
+                 
+                
+
+                if(vals->size()>0){
+                    chaos::common::data::CMultiTypeDataArrayWrapperSPtr val=vals->getVectorElementAtIndex(0);
+                    if(val->size()>1){
+                     // DBG<<"size: "<<vals->size();
+                      object_count=val->getInt32ElementAtIndex(val->size()-1);
+
+                    }
+                  // 0 is time
+                }
+              }}}}}
+  }
+  return ret;
 }
 
 }  // namespace object_storage
diff --git a/chaos_service_common/ChaosManager.cpp b/chaos_service_common/ChaosManager.cpp
index 3dc239cda..5e7beb4f2 100644
--- a/chaos_service_common/ChaosManager.cpp
+++ b/chaos_service_common/ChaosManager.cpp
@@ -7,7 +7,6 @@
 #include "ChaosManager.h"
 #include <chaos/common/batch_command/BatchCommandConstants.h>
 #include <ChaosMetadataService/object_storage/abstraction/ObjectStorageDataAccess.h>
-
 #include <ChaosMetadataService/ChaosMetadataService.h>
 #include <ChaosMetadataService/api/node/ClearCommandQueue.h>
 #include <ChaosMetadataService/api/node/CommandTemplateSubmit.h>
@@ -229,20 +228,19 @@ int ChaosManager::init(const chaos::common::data::CDataWrapper& best_available_d
       InizializableService::initImplementation(DriverPoolManager::getInstance(), NULL, "DriverPoolManager", __PRETTY_FUNCTION__);
 
     } catch (...) {
-      DBGETERR << "Error Initializing alla drivers";
+      DBGETERR << "Error Initializing all drivers";
     }
 
     cache_driver = DriverPoolManager::getInstance()->getCacheDrvPtr();
     if (cache_driver == NULL) {
       DBGETERR << "Cannot use direct cache";
-      return -1;
     } else {
       DBGET << "Using direct cache";
     }
     persistence_driver = DriverPoolManager::getInstance()->getPersistenceDrvPtr();
     if (persistence_driver == NULL) {
       DBGETERR << "Cannot use direct persistence";
-      return -2;
+      //return -2;
 
     } else {
       DBGET << "Using direct persistence";
@@ -310,7 +308,6 @@ int ChaosManager::queryTS(const std::string&                     key,
       return -4;
     }
     chaos::common::direct_io::channel::opcode_headers::SearchSequence last_sequence;
-    chaos::common::direct_io::channel::opcode_headers::QueryResultPage found_element_page;
 
         return obj_storage_da->findObject(key,
                                           meta_tags,
@@ -318,11 +315,33 @@ int ChaosManager::queryTS(const std::string&                     key,
                                           start_ts,
                                           end_ts,
                                           page,
-                                          found_element_page,
+                                          elements,
                                           last_sequence);
   }
   return -2;
 }
+int ChaosManager::queryTSCount(const std::string& key,const uint64_t start_ts,const uint64_t end_ts,const ChaosStringSet& tags,const ChaosStringSet&vars){
+if (log_driver) {
+    chaos::metadata_service::object_storage::abstraction::ObjectStorageDataAccess* obj_storage_da = DriverPoolManager::getInstance()->getLogDrv().getDataAccess<chaos::metadata_service::object_storage::abstraction::ObjectStorageDataAccess>();
+    CHAOS_ASSERT(obj_storage_da);
+    if (obj_storage_da == NULL) {
+      DBGETERR << "Cannot retrieve log driver";
+      return -4;
+    }
+  DBGET << "query Count of "<<key <<" from "<<start_ts<<" to "<<end_ts;
+
+
+  uint64_t count_obj=0;
+    obj_storage_da->countObject(key,
+                                          start_ts,
+                                          end_ts,count_obj);
+   return    count_obj;                                    
+  }
+  DBGETERR << "log driver not available "<<key <<" from "<<start_ts<<" to "<<end_ts;
+
+  return -2;
+}
+
 int ChaosManager::queryDataCloud(const std::string&                                                 key,
                                  const ChaosStringSet&                                              meta_tags,
                                  const ChaosStringSet&                                              projection_keys,
@@ -741,6 +760,7 @@ chaos::common::data::CDWUniquePtr ChaosManager::restoreSnapshot(const std::strin
 
 chaos::common::data::CDWUniquePtr ChaosManager::commandTemplateSubmit(const std::string& uid, const std::string& command_alias, const chaos::common::data::CDWUniquePtr& slow_command_data, const SubmissionRuleType::SubmissionRule submission_rule, const uint32_t priority, const uint64_t scheduler_steps_delay, const uint32_t submission_checker_steps_delay) {
   CDWUniquePtr res;
+  CALC_EXEC_START;
 
   CDWUniquePtr message(new CDataWrapper());
   // this key need only to inform mds to redirect to node the slowcomand without porcess it
@@ -757,12 +777,14 @@ chaos::common::data::CDWUniquePtr ChaosManager::commandTemplateSubmit(const std:
     message->appendAllElement(*slow_command_data);
   }
   if (persistence_driver) {
-    CALC_EXEC_START;
 
     CommandTemplateSubmit node;
     res = node.execute(MOVE(message));
-    CALC_EXEC_END
-  }
+  } 
+  
+
+  CALC_EXEC_END
+
   return res;
 }
 
diff --git a/chaos_service_common/ChaosManager.h b/chaos_service_common/ChaosManager.h
index 6c80473cc..f8f7bb9cf 100644
--- a/chaos_service_common/ChaosManager.h
+++ b/chaos_service_common/ChaosManager.h
@@ -16,7 +16,7 @@
 #include <chaos/common/io/IODataDriver.h>
 #include <chaos/common/batch_command/BatchCommandTypes.h>
 #include <chaos/common/property/property.h>
-
+#define MANAGER_NO_DIRECT_ACCESS -1000
 namespace chaos {
   namespace metadata_service{
     class ChaosMetadataService;
@@ -156,6 +156,8 @@ int queryTS(const std::string& key,
                                        const uint64_t end_ts,
                                        const uint32_t page_dimension,
                                        chaos::common::data::VectorCDWShrdPtr& found_element_page);
+int queryTSCount(const std::string& key,const uint64_t start_ts,const uint64_t end_ts,const ChaosStringSet& tags=ChaosStringSet(),const ChaosStringSet&vars=ChaosStringSet());
+                                      
 int deleteDataCloud(const std::string& key,
                                        const uint64_t start_ts,
                                        const uint64_t end_ts,int32_t millisec_to_wait=10000);
diff --git a/chaos_service_common/DriverPoolManager.cpp b/chaos_service_common/DriverPoolManager.cpp
index d428932a8..eb7712e6f 100644
--- a/chaos_service_common/DriverPoolManager.cpp
+++ b/chaos_service_common/DriverPoolManager.cpp
@@ -49,6 +49,7 @@ DriverPoolManager::~DriverPoolManager() {}
 
 void DriverPoolManager::init(void* init_data) {
   //init cache pool
+  int err=0;
   //InizializableService::initImplementation(cache_pool, NULL, "CacheDriverPool", __PRETTY_FUNCTION__);
   const std::string cache_impl_name = cacheSetting.cache_driver_impl + "CacheDriver";
   if (cacheSetting.cache_driver_impl.size()) {
@@ -64,11 +65,16 @@ void DriverPoolManager::init(void* init_data) {
       cache_driver.init((void*)&cacheSetting, __PRETTY_FUNCTION__);
     } catch (CException& e) {
       cache_driver.reset(NULL,cache_impl_name);
-      throw e;
+      err++;
+      DECODE_CHAOS_EXCEPTION(e);
+
+      //throw e;
     } catch (...) {
       DP_LOG_ERR << " Undefined exception catchd during initialization of cache driver";
 
       cache_driver.reset(NULL,cache_impl_name);
+      err++;
+
     }
   }
   //init dirver instace
@@ -81,11 +87,15 @@ void DriverPoolManager::init(void* init_data) {
       persistence_driver.init((void*)&persistentSetting, __PRETTY_FUNCTION__);
     } catch (CException& e) {
       persistence_driver.reset(NULL,persistence_impl_name);
-      throw e;
+      err++;
+      DECODE_CHAOS_EXCEPTION(e);
+
+     // throw e;
     } catch (...) {
       DP_LOG_ERR << " Undefined exception catchd during initialization of persistent driver";
 
       persistence_driver.reset(NULL,persistence_impl_name);
+      err++;
     }
   }
   const std::string storage_impl_name = objectSetting.persistence_implementation + "ObjectStorageDriver";
@@ -97,10 +107,12 @@ void DriverPoolManager::init(void* init_data) {
       storage_driver.init((void*)&objectSetting, __PRETTY_FUNCTION__);
     } catch (CException& e) {
       storage_driver.reset(NULL,storage_impl_name);
-      throw e;
+      //throw e;
+      err++;
+      DECODE_CHAOS_EXCEPTION(e);
     } catch (...) {
       DP_LOG_ERR << " Undefined exception catchd during initialization of storage driver";
-
+      err++;
       storage_driver.reset(NULL,storage_impl_name);
     }
   }
@@ -109,20 +121,22 @@ void DriverPoolManager::init(void* init_data) {
     log_driver.reset(ObjectFactoryRegister<chaos::service_common::persistence::data_access::AbstractPersistenceDriver>::getInstance()->getNewInstanceByName(log_impl_name),
                      log_impl_name);
     if (log_driver.get() == NULL) {
-      DP_LOG_INFO << " Log driver not defined";
+      DP_LOG_ERR << " Log driver not defined ";
     } else {
       try {
         log_driver.init((void*)&logSetting, __PRETTY_FUNCTION__);
       } catch (CException& ex) {
         log_driver.reset(NULL,log_impl_name);
         DECODE_CHAOS_EXCEPTION(ex)
-
       } catch (...) {
         DP_LOG_ERR << " Undefined exception catchd during initialization of LOG driver";
         log_driver.reset(NULL,log_impl_name);
       }
     }
   }
+  if(err){
+    throw chaos::CException(err,"Drivers cannot be initialized",__PRETTY_FUNCTION__);
+  }
 }
 
 void DriverPoolManager::deinit() {
-- 
GitLab