diff --git a/ChaosAgent/AgentRegister.cpp b/ChaosAgent/AgentRegister.cpp index 47e6b30fcbf8d4b4b6cbca89cb9bc3eb51dcd4eb..333cc19504568c7f943b61a6a3a23521eb00d127 100644 --- a/ChaosAgent/AgentRegister.cpp +++ b/ChaosAgent/AgentRegister.cpp @@ -203,7 +203,7 @@ void AgentRegister::timeout() { case AgentRegisterStateUnregistered: HealtManager::getInstance()->addNodeMetricValue(ChaosAgent::getInstance()->settings.agent_uid, NodeHealtDefinitionKey::NODE_HEALT_STATUS, - NodeHealtDefinitionValue::NODE_HEALT_STATUS_UNLOAD); + NodeHealtDefinitionValue::NODE_HEALT_STATUS_UNLOAD,true); TimerHandler::stopMe(); HealtManager::getInstance()->removeNode(ChaosAgent::getInstance()->settings.agent_uid); break; @@ -220,17 +220,18 @@ void AgentRegister::timeout() { HealtManager::getInstance()->addNewNode(agent_uid); HealtManager::getInstance()->addNodeMetricValue(agent_uid, NodeHealtDefinitionKey::NODE_HEALT_STATUS, - NodeHealtDefinitionValue::NODE_HEALT_STATUS_LOADING); + NodeHealtDefinitionValue::NODE_HEALT_STATUS_LOADING,true); } break; } case AgentRegisterStateRegistered: { HealtManager::getInstance()->addNodeMetricValue(agent_uid, NodeHealtDefinitionKey::NODE_HEALT_STATUS, - NodeHealtDefinitionValue::NODE_HEALT_STATUS_START); + NodeHealtDefinitionValue::NODE_HEALT_STATUS_START,true); //stop timer TimerHandler::stopMe(); - + HealtManager::getInstance()->publishNodeHealt(agent_uid); + //register all action try{ for(MapWorkerIterator iter = map_worker.begin(); @@ -246,6 +247,8 @@ void AgentRegister::timeout() { //perform autstart WorkerSharedPtr pw_ptr = map_worker["ProcessWorker"]; WorkerSharedPtr lw_ptr = map_worker["LogWorker"]; + //stop timer + for(VectorAgentAssociationIterator it = agent_instance_sd_wrapper().node_associated.begin(), end = agent_instance_sd_wrapper().node_associated.end(); it != end; @@ -298,5 +301,4 @@ void AgentRegister::timeout() { NodeHealtDefinitionValue::NODE_HEALT_STATUS_FERROR); break; } - HealtManager::getInstance()->publishNodeHealt(agent_uid); } diff --git a/ChaosMetadataService/QueryDataConsumer.cpp b/ChaosMetadataService/QueryDataConsumer.cpp index f83f6bb99dc9b09a1e3d204e65a20dc7a619a756..9844e1651d60be24bb31eedf04ec567e83c4a093 100644 --- a/ChaosMetadataService/QueryDataConsumer.cpp +++ b/ChaosMetadataService/QueryDataConsumer.cpp @@ -127,23 +127,24 @@ int QueryDataConsumer::consumePutEvent(const std::string& key, chaos::common::data::CDataWrapper& data_pack) { int err = 0; - //data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, now); - chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer(); - if (ptr.get() == NULL) { - ERR << key << " empty packet"; - return -1; - } - BufferSPtr channel_data_injected(ptr.release()); - + DataServiceNodeDefinitionType::DSStorageType storage_type = static_cast<DataServiceNodeDefinitionType::DSStorageType>(hst_tag); //! if tag is == 1 the datapack is in liveonly - if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive && channel_data_injected.get()) { + if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive) { + chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer(); + BufferSPtr channel_data_injected(ptr.release()); + if(channel_data_injected.get()==NULL){ + ERR<<"Invalid packet"; + return -1; + } //protected access to cached driver CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv(); err = cache_slot.putData(key, channel_data_injected); } + data_pack.removeKey(DataPackCommonKey::DPCK_DATASET_TYPE); + if (storage_type & DataServiceNodeDefinitionType::DSStorageLogHisto) { //protected access to cached driver ObjectStorageDataAccess* log_slot = DriverPoolManager::getInstance()->getLogDrv().getDataAccess<ObjectStorageDataAccess>(); @@ -160,7 +161,14 @@ int QueryDataConsumer::consumePutEvent(const std::string& key, //compute the index to use for the data worker uint32_t index_to_use; index_to_use = device_data_worker_index++ % archive_workers; - CHAOS_ASSERT(device_data_worker[index_to_use].get()) + CHAOS_ASSERT(device_data_worker[index_to_use].get()); + chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer(); + BufferSPtr channel_data_injected(ptr.release()); + if(channel_data_injected.get()==NULL){ + ERR<<"Invalid packet"; + return -2; + } + //create storage job information auto job = ChaosMakeSharedPtr<DeviceSharedWorkerJob>(); job->key = key; diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp index d025d200b306487cddb969b358f27c24c04bfeac..5229dd40071371d7a119ef8419eb6f06e701b6e2 100644 --- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp +++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp @@ -115,7 +115,6 @@ void QueryDataMsgPSConsumer::messageHandler(chaos::common::message::ele_t& data) return; } cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE); - cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE); } } cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat); diff --git a/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp b/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp index 35f764c66111281148355a94bcdac481c60724a9..186998bded8e024b49cee616ee1afe71594d6d48 100644 --- a/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp +++ b/ChaosMetadataService/worker/DeviceSharedDataWorker.cpp @@ -83,7 +83,6 @@ int DeviceSharedDataWorker::executeJob(WorkerJobPtr job_info, void* cookie) { if(job.data_pack->data()){ CDataWrapper data_pack((char *)job.data_pack->data()); //push received datapack into object storage - if((err = obj_storage_da->pushObject(job.key, MOVE(job.meta_tag), data_pack))) { diff --git a/chaos/common/alarm/MultiSeverityAlarm.cpp b/chaos/common/alarm/MultiSeverityAlarm.cpp index 16373df46168c84735e26c8c5b4ec5477cbe8335..028896339efa9e53f20b218af341344c2b13e808 100644 --- a/chaos/common/alarm/MultiSeverityAlarm.cpp +++ b/chaos/common/alarm/MultiSeverityAlarm.cpp @@ -44,15 +44,15 @@ AlarmDescription(alarm_tag, alarm_description){ addState(MultiSeverityAlarmLevelClear, "clear", - "Alarm is in a regular state", + "Alarm Clear", StateFlagServerityRegular); addState(MultiSeverityAlarmLevelWarning, "Warning", - "Low probability that something will fails, attention is needed", + "Alarm Warning", StateFlagServerityWarning); addState(MultiSeverityAlarmLevelHigh, "High", - "High probability that something is going to fails", + "Alarm High", StateFlagServerityHigh); } diff --git a/chaos/common/message/MDSMessageChannel.cpp b/chaos/common/message/MDSMessageChannel.cpp index 526010b6b6b47f34be877f94f1e59707c1743863..6766f2733607209e03808cf6d8557af503817bb8 100644 --- a/chaos/common/message/MDSMessageChannel.cpp +++ b/chaos/common/message/MDSMessageChannel.cpp @@ -993,7 +993,7 @@ int MDSMessageChannel::queryDataCloud(const std::string& if (res->hasKey("ts")) { last_sequence.ts = res->getInt64Value("ts"); } - MSG_DBG << "DATA:" << res->getJSONString(); + //MSG_DBG << "DATA:" << res->getJSONString(); if (res->hasKey("data") && res->isVectorValue("data")) { CMultiTypeDataArrayWrapperSPtr d = res->getVectorValue("data"); for (int idx = 0; diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index 8775aadacbc4a6235ba68e169be4ee619f9afe98..b6329c794293aa1808e79bbbd046dd509a0f1ca5 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -106,7 +106,7 @@ void PSMServer::messageHandler( chaos::common::message::ele_t& data) { if(data.cd->hasKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP)){ src=data.cd->getStringValue(RPC_SRC_UID); } - //PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id << " desc:"<<data.cd->getJSONString(); + PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id;// << " desc:"<<data.cd->getJSONString(); CDWShrdPtr result_data_pack; if(data.cd->hasKey(RPC_SYNC_KEY) && diff --git a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp index ba39fce54e2013faac321d8d8c635842256fb24b..6b42a002604b0ca47e088832aa49d65d683a4417 100644 --- a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp +++ b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp @@ -732,7 +732,7 @@ void AbstractControlUnit::unitDefineCustomAttribute() { cnt++; } - if (cnt) { + /* if (cnt) { // drv.finalizeArrayForKey(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DRIVER_INFO); ACULDBG_ << " Adding driver properties to custom dataset " << chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DRIVER_INFO << ":" << drv.getJSONString(); @@ -742,7 +742,7 @@ void AbstractControlUnit::unitDefineCustomAttribute() { ACULDBG_ << " Adding driver info to custom dataset " << chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO << ":" << drv_info->getJSONString(); getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO, *drv_info.get()); getAttributeCache()->setCustomAttributeValue(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_CU_INFO, *drv_info.get()); - } + }*/ } void AbstractControlUnit::_undefineActionAndDataset() { @@ -809,17 +809,20 @@ void AbstractControlUnit::doInitRpCheckList() { CHAOS_CHECK_LIST_DONE(check_list_sub_service, "_init", INIT_RPC_PHASE_CALL_UNIT_DEFINE_ATTRIBUTE) { std::string cu_load_param = getCUParam(); - if (isCUParamInJson()) { + /* if (isCUParamInJson()) { getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, cu_load_param.size() + 1, chaos::DataType::TYPE_CLUSTER); } else { getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, cu_load_param.size() + 1, chaos::DataType::TYPE_STRING); } getAttributeCache()->setCustomAttributeValue(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_LOAD_PARAM, (void*)cu_load_param.c_str(), cu_load_param.size() + 1); - +*/ //define the implementations custom variable AbstractControlUnit::unitDefineCustomAttribute(); unitDefineCustomAttribute(); - + getAttributeCache()->setCustomDomainAsChanged(); + fillCachedValueVector(attribute_value_shared_cache->getSharedDomain(DOMAIN_CUSTOM), + cache_custom_attribute_vector); + pushCustomDataset(); break; } CHAOS_CHECK_LIST_DONE(check_list_sub_service, "_init", INIT_RPC_PHASE_CREATE_FAST_ACCESS_CASCHE_VECTOR) { @@ -930,7 +933,7 @@ void AbstractControlUnit::doInitRpCheckList() { } } - if (cu_ds_init.get()) { + /* if (cu_ds_init.get()) { ACULDBG_ << "INIT ATTRIBUTES:" << cu_ds_init->getJSONString(); getAttributeCache()->addCustomAttribute(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_INITIALIZATION, *(cu_ds_init.get())); @@ -938,7 +941,7 @@ void AbstractControlUnit::doInitRpCheckList() { fillCachedValueVector(attribute_value_shared_cache->getSharedDomain(DOMAIN_CUSTOM), cache_custom_attribute_vector); - } + }*/ CDWUniquePtr res = setDatasetAttribute(MOVE(cdw_unique_ptr)); } } diff --git a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp index 48ab1897e9d01310ad91d274d4b13e47ca7b961c..4ae1240262112d7d983c5babfccbb04cb0b652f6 100644 --- a/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp +++ b/chaos/cu_toolkit/data_manager/KeyDataStorage.cpp @@ -290,6 +290,8 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) { return err; } if(dataset.get()){ + // KeyDataStorageLDBG<< "RESTORE PACK:" << restore_point_tag << " :" << dataset->getJSONString(); + if(dataset->hasKey(DataPackID::INPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::INPUT_DATASET_ID)){ CDWShrdPtr p(dataset->getCSDataValue(DataPackID::INPUT_DATASET_ID).release()); restore_point_map[restore_point_tag].insert(make_pair(input_key, p)); @@ -297,6 +299,7 @@ int KeyDataStorage::loadRestorePoint(const std::string& restore_point_tag) { if(dataset->hasKey(DataPackID::OUTPUT_DATASET_ID)&&dataset->isCDataWrapperValue(DataPackID::OUTPUT_DATASET_ID)){ CDWShrdPtr p(dataset->getCSDataValue(DataPackID::OUTPUT_DATASET_ID).release()); restore_point_map[restore_point_tag].insert(make_pair(output_key,p)); + } if(dataset->hasKey(DataPackID::SYSTEM_DATASETID)&&dataset->isCDataWrapperValue(DataPackID::SYSTEM_DATASETID)){ CDWShrdPtr p(dataset->getCSDataValue(DataPackID::SYSTEM_DATASETID).release());