From 929b16ac7ff811d6ce771883ac5157c77b569226 Mon Sep 17 00:00:00 2001
From: amichelo <andrea.michelotti@lnf.infn.it>
Date: Thu, 11 Nov 2021 00:23:02 +0100
Subject: [PATCH] snapshots custom removed things

---
 ChaosAgent/AgentRegister.cpp                  | 12 ++++----
 ChaosMetadataService/QueryDataConsumer.cpp    | 28 ++++++++++++-------
 .../QueryDataMsgPSConsumer.cpp                |  1 -
 .../worker/DeviceSharedDataWorker.cpp         |  1 -
 chaos/common/alarm/MultiSeverityAlarm.cpp     |  6 ++--
 chaos/common/message/MDSMessageChannel.cpp    |  2 +-
 chaos/common/rpc/psm/PSMServer.cpp            |  2 +-
 .../control_manager/AbstractControlUnit.cpp   | 17 ++++++-----
 .../data_manager/KeyDataStorage.cpp           |  3 ++
 9 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/ChaosAgent/AgentRegister.cpp b/ChaosAgent/AgentRegister.cpp
index 47e6b30fc..333cc1950 100644
--- a/ChaosAgent/AgentRegister.cpp
+++ b/ChaosAgent/AgentRegister.cpp
@@ -203,7 +203,7 @@ void AgentRegister::timeout() {
         case AgentRegisterStateUnregistered:
             HealtManager::getInstance()->addNodeMetricValue(ChaosAgent::getInstance()->settings.agent_uid,
                                                             NodeHealtDefinitionKey::NODE_HEALT_STATUS,
-                                                            NodeHealtDefinitionValue::NODE_HEALT_STATUS_UNLOAD);
+                                                            NodeHealtDefinitionValue::NODE_HEALT_STATUS_UNLOAD,true);
             TimerHandler::stopMe();
             HealtManager::getInstance()->removeNode(ChaosAgent::getInstance()->settings.agent_uid);
             break;
@@ -220,17 +220,18 @@ void AgentRegister::timeout() {
                 HealtManager::getInstance()->addNewNode(agent_uid);
                 HealtManager::getInstance()->addNodeMetricValue(agent_uid,
                                                                 NodeHealtDefinitionKey::NODE_HEALT_STATUS,
-                                                                NodeHealtDefinitionValue::NODE_HEALT_STATUS_LOADING);
+                                                                NodeHealtDefinitionValue::NODE_HEALT_STATUS_LOADING,true);
             }
             break;
         }
         case AgentRegisterStateRegistered: {
             HealtManager::getInstance()->addNodeMetricValue(agent_uid,
                                                             NodeHealtDefinitionKey::NODE_HEALT_STATUS,
-                                                            NodeHealtDefinitionValue::NODE_HEALT_STATUS_START);
+                                                            NodeHealtDefinitionValue::NODE_HEALT_STATUS_START,true);
             //stop timer
             TimerHandler::stopMe();
-            
+            HealtManager::getInstance()->publishNodeHealt(agent_uid);
+
             //register all action
             try{
                 for(MapWorkerIterator iter = map_worker.begin();
@@ -246,6 +247,8 @@ void AgentRegister::timeout() {
                 //perform autstart
                 WorkerSharedPtr pw_ptr = map_worker["ProcessWorker"];
                 WorkerSharedPtr lw_ptr = map_worker["LogWorker"];
+                   //stop timer
+         
                 for(VectorAgentAssociationIterator it = agent_instance_sd_wrapper().node_associated.begin(),
                     end = agent_instance_sd_wrapper().node_associated.end();
                     it != end;
@@ -298,5 +301,4 @@ void AgentRegister::timeout() {
                                                             NodeHealtDefinitionValue::NODE_HEALT_STATUS_FERROR);
             break;
     }
-    HealtManager::getInstance()->publishNodeHealt(agent_uid);
 }
diff --git a/ChaosMetadataService/QueryDataConsumer.cpp b/ChaosMetadataService/QueryDataConsumer.cpp
index f83f6bb99..9844e1651 100644
--- a/ChaosMetadataService/QueryDataConsumer.cpp
+++ b/ChaosMetadataService/QueryDataConsumer.cpp
@@ -127,23 +127,24 @@ int QueryDataConsumer::consumePutEvent(const std::string&                 key,
                                        chaos::common::data::CDataWrapper& data_pack) {
   int err = 0;
 
-  //data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, now);
-  chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer();
-  if (ptr.get() == NULL) {
-    ERR << key << " empty packet";
-    return -1;
-  }
-  BufferSPtr channel_data_injected(ptr.release());
-
+  
   DataServiceNodeDefinitionType::DSStorageType storage_type = static_cast<DataServiceNodeDefinitionType::DSStorageType>(hst_tag);
   //! if tag is == 1 the datapack is in liveonly
 
-  if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive && channel_data_injected.get()) {
+  if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive) {
+    chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer();
+    BufferSPtr channel_data_injected(ptr.release());
+    if(channel_data_injected.get()==NULL){
+      ERR<<"Invalid packet";
+      return -1;
+    }
     //protected access to cached driver
     CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv();
     err                     = cache_slot.putData(key,
                              channel_data_injected);
   }
+  data_pack.removeKey(DataPackCommonKey::DPCK_DATASET_TYPE);
+
   if (storage_type & DataServiceNodeDefinitionType::DSStorageLogHisto) {
     //protected access to cached driver
     ObjectStorageDataAccess* log_slot = DriverPoolManager::getInstance()->getLogDrv().getDataAccess<ObjectStorageDataAccess>();
@@ -160,7 +161,14 @@ int QueryDataConsumer::consumePutEvent(const std::string&                 key,
     //compute the index to use for the data worker
     uint32_t index_to_use;
     index_to_use = device_data_worker_index++ % archive_workers;
-    CHAOS_ASSERT(device_data_worker[index_to_use].get())
+    CHAOS_ASSERT(device_data_worker[index_to_use].get());
+    chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer();
+    BufferSPtr channel_data_injected(ptr.release());
+    if(channel_data_injected.get()==NULL){
+      ERR<<"Invalid packet";
+      return -2;
+    }
+
     //create storage job information
     auto job       = ChaosMakeSharedPtr<DeviceSharedWorkerJob>();
     job->key       = key;
diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
index d025d200b..5229dd400 100644
--- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
+++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
@@ -115,7 +115,6 @@ void QueryDataMsgPSConsumer::messageHandler(chaos::common::message::ele_t& data)
               return;
             }
             cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE);
-            cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE);
           }
         }
         cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat);
diff --git a/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp b/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp
index 35f764c66..186998bde 100644
--- a/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp
+++ b/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp
@@ -83,7 +83,6 @@ int DeviceSharedDataWorker::executeJob(WorkerJobPtr job_info, void* cookie) {
            if(job.data_pack->data()){
             CDataWrapper data_pack((char *)job.data_pack->data());
             //push received datapack into object storage
-            
             if((err = obj_storage_da->pushObject(job.key,
                                                  MOVE(job.meta_tag),
                                                  data_pack))) {
diff --git a/chaos/common/alarm/MultiSeverityAlarm.cpp b/chaos/common/alarm/MultiSeverityAlarm.cpp
index 16373df46..028896339 100644
--- a/chaos/common/alarm/MultiSeverityAlarm.cpp
+++ b/chaos/common/alarm/MultiSeverityAlarm.cpp
@@ -44,15 +44,15 @@ AlarmDescription(alarm_tag,
                  alarm_description){
     addState(MultiSeverityAlarmLevelClear,
              "clear",
-             "Alarm is in a regular state",
+             "Alarm Clear",
              StateFlagServerityRegular);
     addState(MultiSeverityAlarmLevelWarning,
              "Warning",
-             "Low probability that something will fails, attention is needed",
+             "Alarm Warning",
              StateFlagServerityWarning);
     addState(MultiSeverityAlarmLevelHigh,
              "High",
-             "High probability that something is going to fails",
+             "Alarm High",
              StateFlagServerityHigh);
     
 }
diff --git a/chaos/common/message/MDSMessageChannel.cpp b/chaos/common/message/MDSMessageChannel.cpp
index 526010b6b..6766f2733 100644
--- a/chaos/common/message/MDSMessageChannel.cpp
+++ b/chaos/common/message/MDSMessageChannel.cpp
@@ -993,7 +993,7 @@ int MDSMessageChannel::queryDataCloud(const std::string&
         if (res->hasKey("ts")) {
           last_sequence.ts = res->getInt64Value("ts");
         }
-        MSG_DBG << "DATA:" << res->getJSONString();
+        //MSG_DBG << "DATA:" << res->getJSONString();
         if (res->hasKey("data") && res->isVectorValue("data")) {
           CMultiTypeDataArrayWrapperSPtr d = res->getVectorValue("data");
           for (int idx = 0;
diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp
index 8775aadac..b6329c794 100644
--- a/chaos/common/rpc/psm/PSMServer.cpp
+++ b/chaos/common/rpc/psm/PSMServer.cpp
@@ -106,7 +106,7 @@ void PSMServer::messageHandler( chaos::common::message::ele_t& data) {
     if(data.cd->hasKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP)){
         src=data.cd->getStringValue(RPC_SRC_UID);
     }
-    //PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id << " desc:"<<data.cd->getJSONString();
+    PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id;// << " desc:"<<data.cd->getJSONString();
     CDWShrdPtr result_data_pack;
 
     if(data.cd->hasKey(RPC_SYNC_KEY) &&
diff --git a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp
index ba39fce54..6b42a0026 100644
--- a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp
+++ b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp
@@ -732,7 +732,7 @@ void AbstractControlUnit::unitDefineCustomAttribute() {
     cnt++;
   }
 
-  if (cnt) {
+ /* if (cnt) {
     //        drv.finalizeArrayForKey(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DRIVER_INFO);
     ACULDBG_ << " Adding driver properties to custom dataset " << chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DRIVER_INFO << ":" << drv.getJSONString();
 
@@ -742,7 +742,7 @@ void AbstractControlUnit::unitDefineCustomAttribute() {
     ACULDBG_ << " Adding driver info to custom dataset " << chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO << ":" << drv_info->getJSONString();
     getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO, *drv_info.get());
     getAttributeCache()->setCustomAttributeValue(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO, *drv_info.get());
-  }
+  }*/
 }
 
 void AbstractControlUnit::_undefineActionAndDataset() {
@@ -809,17 +809,20 @@ void         AbstractControlUnit::doInitRpCheckList() {
     CHAOS_CHECK_LIST_DONE(check_list_sub_service, "_init", INIT_RPC_PHASE_CALL_UNIT_DEFINE_ATTRIBUTE) {
       std::string cu_load_param = getCUParam();
 
-      if (isCUParamInJson()) {
+    /* if (isCUParamInJson()) {
         getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, cu_load_param.size() + 1, chaos::DataType::TYPE_CLUSTER);
       } else {
         getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, cu_load_param.size() + 1, chaos::DataType::TYPE_STRING);
       }
       getAttributeCache()->setCustomAttributeValue(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, (void*)cu_load_param.c_str(), cu_load_param.size() + 1);
-
+*/
       //define the implementations custom variable
       AbstractControlUnit::unitDefineCustomAttribute();
       unitDefineCustomAttribute();
-
+      getAttributeCache()->setCustomDomainAsChanged();
+      fillCachedValueVector(attribute_value_shared_cache->getSharedDomain(DOMAIN_CUSTOM),
+                                  cache_custom_attribute_vector);
+      pushCustomDataset();
       break;
     }
     CHAOS_CHECK_LIST_DONE(check_list_sub_service, "_init", INIT_RPC_PHASE_CREATE_FAST_ACCESS_CASCHE_VECTOR) {
@@ -930,7 +933,7 @@ void         AbstractControlUnit::doInitRpCheckList() {
             }
           }
 
-          if (cu_ds_init.get()) {
+         /* if (cu_ds_init.get()) {
             ACULDBG_ << "INIT ATTRIBUTES:" << cu_ds_init->getJSONString();
 
             getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_INITIALIZATION, *(cu_ds_init.get()));
@@ -938,7 +941,7 @@ void         AbstractControlUnit::doInitRpCheckList() {
 
             fillCachedValueVector(attribute_value_shared_cache->getSharedDomain(DOMAIN_CUSTOM),
                                   cache_custom_attribute_vector);
-          }
+          }*/
           CDWUniquePtr res = setDatasetAttribute(MOVE(cdw_unique_ptr));
         }
       }
diff --git a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp
index 48ab1897e..4ae124026 100644
--- a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp
+++ b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp
@@ -290,6 +290,8 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) {
         return err;
     }
     if(dataset.get()){
+       // KeyDataStorageLDBG<< "RESTORE PACK:" << restore_point_tag << " :" << dataset->getJSONString();
+
         if(dataset->hasKey(DataPackID::INPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::INPUT_DATASET_ID)){
             CDWShrdPtr p(dataset->getCSDataValue(DataPackID::INPUT_DATASET_ID).release());
             restore_point_map[restore_point_tag].insert(make_pair(input_key, p));
@@ -297,6 +299,7 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) {
         if(dataset->hasKey(DataPackID::OUTPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::OUTPUT_DATASET_ID)){
             CDWShrdPtr p(dataset->getCSDataValue(DataPackID::OUTPUT_DATASET_ID).release());
             restore_point_map[restore_point_tag].insert(make_pair(output_key,p));
+
         }
         if(dataset->hasKey(DataPackID::SYSTEM_DATASETID)&&dataset->isCDataWrapperValue(DataPackID::SYSTEM_DATASETID)){
             CDWShrdPtr p(dataset->getCSDataValue(DataPackID::SYSTEM_DATASETID).release());
-- 
GitLab