diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp index fc1f1ba9f688bfa771d9db9ba51aa15639f515e8..1310c0436e5164dab954ddf45045bb9b6c3089e8 100644 --- a/ChaosMetadataService/ChaosMetadataService.cpp +++ b/ChaosMetadataService/ChaosMetadataService.cpp @@ -194,18 +194,19 @@ void ChaosMetadataService::init(void* init_data) { InizializableService::initImplementation(DriverPoolManager::getInstance(), NULL, "DriverPoolManager", __PRETTY_FUNCTION__); - //! batch system - StartableService::initImplementation(MDSBatchExecutor::getInstance(), NULL, "MDSBatchExecutor", __PRETTY_FUNCTION__); - - // api system - api_managment_service.reset(new ApiManagement(), "ApiManagment"); - api_managment_service.init(NULL, __PRETTY_FUNCTION__); #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) #warning "CDS NEEDS KAFKA" message_consumer.reset(new QueryDataMsgPSConsumer(CDS_GROUP_NAME), "QueryDataMsgPSConsumer"); if (!message_consumer.get()) throw chaos::CException(-7, "Error instantiating message data consumer", __PRETTY_FUNCTION__); message_consumer.init(NULL, __PRETTY_FUNCTION__); #endif + + //! batch system + StartableService::initImplementation(MDSBatchExecutor::getInstance(), NULL, "MDSBatchExecutor", __PRETTY_FUNCTION__); + + // api system + api_managment_service.reset(new ApiManagment(), "ApiManagment"); + api_managment_service.init(NULL, __PRETTY_FUNCTION__); data_consumer.reset(new QueryDataConsumer(), "QueryDataConsumer"); if (!data_consumer.get()) throw chaos::CException(-7, "Error instantiating data consumer", __PRETTY_FUNCTION__); @@ -217,7 +218,7 @@ void ChaosMetadataService::init(void* init_data) { "MDSConousManager", __PRETTY_FUNCTION__); - InizializableService::initImplementation(SharedManagedDirecIoDataDriver::getInstance(), NULL, "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); + // InizializableService::initImplementation(SharedManagedDirecIoDataDriver::getInstance(), NULL, "SharedManagedDirecIoDataDriver", __PRETTY_FUNCTION__); StartableService::initImplementation(HealtManagerDirect::getInstance(), NULL, "HealtManagerDirect", __PRETTY_FUNCTION__); @@ -228,9 +229,33 @@ void ChaosMetadataService::init(void* init_data) { // start data manager } -int ChaosMetadataService::notifyNewNode(const std::string& nodeuid) { - LCND_LDBG << " NEW NODE:" << nodeuid; - return message_consumer->consumeHealthDataEvent(nodeuid, 0, ChaosStringSetConstSPtr(), ChaosMakeSharedPtr<Buffer>()); +int ChaosMetadataService::notifyNewNode(const std::string& nod) { + if(nod==nodeuid){ + LCND_LDBG << " NEW NODE ITS ME" << nod; + return 0; + + } else { + LCND_LDBG << " NEW NODE:" << nod; + } + { + int retry=10; + while((message_consumer.get()==NULL)&&(retry>0)){ + LCND_LDBG << " not still ready to process request from :"<<nod; + sleep(1); + retry--; + + } + if(retry==0){ + LCND_LERR << "INTERNAL ERROR: consumer not started exiting"; + exit(1); + + } + } + if(message_consumer.get()==NULL){ + LCND_LERR << " not yet ready to process... request from :"<<nod; + return -1; + } + return message_consumer->consumeHealthDataEvent(nod, 0, ChaosStringSetConstSPtr(), ChaosMakeSharedPtr<Buffer>()); } /* diff --git a/ChaosMetadataService/QueryDataConsumer.cpp b/ChaosMetadataService/QueryDataConsumer.cpp index ad96dd549f46fd6e58b190f7ab415c789f5b7a25..f83f6bb99dc9b09a1e3d204e65a20dc7a619a756 100644 --- a/ChaosMetadataService/QueryDataConsumer.cpp +++ b/ChaosMetadataService/QueryDataConsumer.cpp @@ -203,11 +203,12 @@ int QueryDataConsumer::consumeHealthDataEvent(const std::string& key, CDataWrapper health_data_pack((char*)channel_data->data()); health_data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, TimingUtil::getTimeStamp()); - NodeDataAccess* s_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<NodeDataAccess>(); + +#ifdef HEALTH_ON_DB + NodeDataAccess* s_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<NodeDataAccess>(); HealthStatSDWrapper attribute_reference_wrapper; attribute_reference_wrapper.deserialize(&health_data_pack); -#ifdef HEALTH_ON_DB // WE HAVE ALREADY HEALTH INFO IN CACHE if ((err = s_da->setNodeHealthStatus(attribute_reference_wrapper().node_uid, attribute_reference_wrapper()))) { diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp index 1b4836e40e69515591fbd72c4fb0b10481ee2886..d025d200b306487cddb969b358f27c24c04bfeac 100644 --- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp +++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp @@ -63,44 +63,44 @@ QueryDataMsgPSConsumer::QueryDataMsgPSConsumer(const std::string& id) void QueryDataMsgPSConsumer::messageHandler(chaos::common::message::ele_t& data) { try { - + chaos::common::data::CDataWrapper* cd=data.cd.get(); - if (data.cd.get()&&data.cd->hasKey(DataPackCommonKey::DPCK_DATASET_TYPE) && data.cd->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) { + if (cd&&cd->hasKey(DataPackCommonKey::DPCK_DATASET_TYPE) && cd->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) { uint64_t now = TimingUtil::getTimeStamp(); - int pktype = data.cd->getInt32Value(DataPackCommonKey::DPCK_DATASET_TYPE); + int pktype = cd->getInt32Value(DataPackCommonKey::DPCK_DATASET_TYPE); int64_t ts = 0; uint32_t st = (uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive; - if (data.cd->hasKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE)) { - st = data.cd->getInt32Value(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE); + if (cd->hasKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE)) { + st = cd->getInt32Value(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE); if (pktype != DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT) { st |= (uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive; } } - // kp = data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype); + // kp = cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype); int32_t lat = 0; if (pktype == DataPackCommonKey::DPCK_DATASET_TYPE_LOG) { - if (data.cd->hasKey(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP)) { - ts = data.cd->getInt64Value(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP); + if (cd->hasKey(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP)) { + ts = cd->getInt64Value(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP); lat = now - ts; - if (lat > SKIP_OLDER_THAN) { - ERR << data.key << " log too old: " << lat << " ms, skipping..."; + if (lat > chaos::common::constants::SkipDatasetOlderThan) { + ERR << data.key << " log too old: " << lat/1000.0 << " s, skipping..."; return; } - data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat); + cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat); } - // DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" LOG:"<<data.cd->getJSONString(); + // DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" LOG:"<<cd->getJSONString(); if (CObjectProcessingPriorityQueue<CDataWrapper>::queueSize() < MAX_LOG_QUEUE) { - CDWShrdPtr ptr(data.cd.release()); + CDWShrdPtr ptr(cd->clone().release()); CObjectProcessingPriorityQueue<CDataWrapper>::push(ptr, 0); } else { ERR << data.key << "] too many logs on queue for DB:" << CObjectProcessingPriorityQueue<CDataWrapper>::queueSize(); return; } - } else if (data.cd->hasKey(DataPackCommonKey::DPCK_TIMESTAMP)) { - ts = data.cd->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP); + } else if (cd->hasKey(DataPackCommonKey::DPCK_TIMESTAMP)) { + ts = cd->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP); lat = (now - ts); if ((pktype == DataPackCommonKey::DPCK_DATASET_TYPE_HEALTH)) { if (lat > (chaos::common::constants::HBTimersTimeoutinMSec * 2)) { @@ -109,25 +109,25 @@ void QueryDataMsgPSConsumer::messageHandler(chaos::common::message::ele_t& data) } } else if ((pktype == DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT) || (pktype == DataPackCommonKey::DPCK_DATASET_TYPE_INPUT)) { if (((st == 0) || (st == DataServiceNodeDefinitionType::DSStorageTypeLive))) { - if (lat > SKIP_OLDER_THAN) { - ERR << data.key << " too old: " << lat << " ms, skipping..."; + if (lat > chaos::common::constants::SkipDatasetOlderThan) { + ERR << data.key << " too old: " << lat/1000.0 << " s, skipping..."; // output too old return; } - data.cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE); - data.cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE); + cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE); + cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE); } } - data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat); + cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat); } ChaosStringSetConstSPtr meta_tag_set; - if (data.cd->hasKey(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)) { + if (cd->hasKey(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)) { ChaosStringSet* tag = new ChaosStringSet(); - tag->insert(data.cd->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)); + tag->insert(cd->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)); meta_tag_set.reset(tag); } - QueryDataConsumer::consumePutEvent(data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype), (uint8_t)st, meta_tag_set, *(data.cd.get())); + QueryDataConsumer::consumePutEvent(cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype), (uint8_t)st, meta_tag_set, *cd); } } catch (const chaos::CException& e) { ERR << "Chaos Exception caught processing key:" << data.key << " (" << data.off << "," << data.par << ") error:" << e.what(); diff --git a/ChaosMetadataService/api/node/NodeRegister.cpp b/ChaosMetadataService/api/node/NodeRegister.cpp index edcc240bd052863830ed489737898fb3822d55a3..4694b3931490d95e1d5a43e3127098b16d84a71e 100644 --- a/ChaosMetadataService/api/node/NodeRegister.cpp +++ b/ChaosMetadataService/api/node/NodeRegister.cpp @@ -59,7 +59,7 @@ CDWUniquePtr NodeRegister::execute(CDWUniquePtr api_data){ const std::string node_type = api_data->getStringValue(NodeDefinitionKey::NODE_TYPE); if(node_type.compare(NodeType::NODE_TYPE_UNIT_SERVER) == 0) { result = unitServerRegistration(MOVE(api_data)); - } else if(boost::starts_with(node_type, NodeType::NODE_TYPE_CONTROL_UNIT)) { + } else if(boost::starts_with(node_type, NodeType::NODE_TYPE_CONTROL_UNIT)||boost::starts_with(node_type, NodeType::NODE_TYPE_ROOT)) { result = controlUnitRegistration(MOVE(api_data)); } else if(boost::starts_with(node_type, NodeType::NODE_TYPE_AGENT)) { result = agentRegistration(MOVE(api_data)); @@ -78,6 +78,13 @@ CDWUniquePtr NodeRegister::agentRegistration(CDWUniquePtr api_data) { const std::string agent_uid = api_data->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID); //fetch the unit server data access GET_DATA_ACCESS(AgentDataAccess, a_da, -1) + if(api_data->hasKey(chaos::NodeDefinitionKey::NODE_TIMESTAMP)){ + uint32_t lat=( chaos::common::utility::TimingUtil::getTimeStamp()-api_data->getInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP)); + if(lat>chaos::common::constants::SkipDatasetOlderThan){ + USRA_ERR << "Registration timestamp too old " << agent_uid<<" "<<lat/1000.0<<" sec old"; + return CDWUniquePtr(); + } + } ChaosMetadataService::getInstance()->notifyNewNode(agent_uid); try { @@ -125,7 +132,13 @@ CDWUniquePtr NodeRegister::simpleRegistration(CDWUniquePtr api_data) { alive=ChaosMetadataService::getInstance()->isNodeAlive(node_uid); #endif - + if(api_data->hasKey(chaos::NodeDefinitionKey::NODE_TIMESTAMP)){ + uint32_t lat=( chaos::common::utility::TimingUtil::getTimeStamp()-api_data->getInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP)); + if(lat>chaos::common::constants::SkipDatasetOlderThan){ + USRA_ERR << "Registration timestamp too old " << node_uid<<" "<<lat/1000.0<<" sec old"; + return CDWUniquePtr(); + } + } //we can porceed with uniserver registration USRA_INFO << "Registering NODE: " << node_uid; std::string ttype; @@ -239,6 +252,13 @@ CDWUniquePtr NodeRegister::unitServerRegistration(CDWUniquePtr api_data) { } //we can porceed with uniserver registration const std::string unit_server_alias = api_data->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID); + if(api_data->hasKey(chaos::NodeDefinitionKey::NODE_TIMESTAMP)){ + uint32_t lat=( chaos::common::utility::TimingUtil::getTimeStamp()-api_data->getInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP)); + if(lat>chaos::common::constants::SkipDatasetOlderThan){ + USRA_ERR << "Registration timestamp too old " << unit_server_alias<<" "<<lat/1000.0<<" sec old"; + return CDWUniquePtr(); + } + } USRA_INFO << "Register unit server " << unit_server_alias; ChaosMetadataService::getInstance()->notifyNewNode(unit_server_alias); @@ -324,7 +344,13 @@ CDWUniquePtr NodeRegister::controlUnitRegistration(CDWUniquePtr api_data) { //allocate datapack for batch command ChaosUniquePtr<chaos::common::data::CDataWrapper> ack_command(new CDataWrapper()); const std::string cu_uid = api_data->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID); - + if(api_data->hasKey(chaos::NodeDefinitionKey::NODE_TIMESTAMP)){ + uint32_t lat=( chaos::common::utility::TimingUtil::getTimeStamp()-api_data->getInt64Value(chaos::NodeDefinitionKey::NODE_TIMESTAMP)); + if(lat>chaos::common::constants::SkipDatasetOlderThan){ + USRA_ERR << "Registration timestamp too old " << cu_uid<<" "<<lat/1000.0<<" sec old"; + return CDWUniquePtr(); + } + } USRA_INFO << "Register control unit " << cu_uid; ChaosMetadataService::getInstance()->notifyNewNode(cu_uid); //set cu id to the batch command datapack @@ -345,7 +371,7 @@ CDWUniquePtr NodeRegister::controlUnitRegistration(CDWUniquePtr api_data) { } //check if the cu has a parent - //we need to check if the control unit is assocaite to an unit server + //we need to check if the control unit is associated to an unit server if((err = us_da->getUnitserverForControlUnitID(cu_uid, us_host))){ LOG_AND_TROW_FORMATTED(USRA_ERR, -6, "Error searching unit server for control unit %1% with code %2%",%cu_uid%err); diff --git a/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.cpp b/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.cpp index b52a6e6dff089f78aa2a553dfc56120d7821ba37..502ec2b5aed255c67ed462a57b36f882b6706c8e 100644 --- a/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.cpp +++ b/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.cpp @@ -21,8 +21,6 @@ #include "RegistrationAckBatchCommand.h" -#include "../control_unit/IDSTControlUnitBatchCommand.h" - using namespace chaos::common::data; using namespace chaos::common::network; using namespace chaos::metadata_service::batch::control_unit; @@ -47,13 +45,16 @@ RegistrationAckBatchCommand::~RegistrationAckBatchCommand() {} // inherited method void RegistrationAckBatchCommand::setHandler(CDataWrapper *data) { MDSBatchCommand::setHandler(data); - + is_root=false; //set cu id to the batch command datapack if(!data->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) throw CException(-1, RegistrationAckBatchCommand_NO_UID, __PRETTY_FUNCTION__); if(!data->hasKey(NodeDefinitionKey::NODE_RPC_ADDR)) throw CException(-2, RegistrationAckBatchCommand_NO_RPC_ADDR, __PRETTY_FUNCTION__); if(!data->hasKey(NodeDefinitionKey::NODE_RPC_DOMAIN)) throw CException(-3, RegistrationAckBatchCommand_NO_RPC_DOM, __PRETTY_FUNCTION__); if(!data->hasKey(MetadataServerNodeDefinitionKeyRPC::PARAM_REGISTER_NODE_RESULT)) throw CException(-4, RegistrationAckBatchCommand_NO_RESULT_FOUND, __PRETTY_FUNCTION__); - + if(data->hasKey(NodeDefinitionKey::NODE_TYPE)&&data->getStringValue(NodeDefinitionKey::NODE_TYPE)==NodeType::NODE_TYPE_ROOT) { + is_root=true; + } + cu_id = data->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID); unit_server_addr = data->getStringValue(NodeDefinitionKey::NODE_RPC_ADDR); reg_result = data->getInt32Value(MetadataServerNodeDefinitionKeyRPC::PARAM_REGISTER_NODE_RESULT); @@ -88,9 +89,17 @@ void RegistrationAckBatchCommand::ccHandler() { case MESSAGE_PHASE_SENT: { manageRequestPhase(*request); break; + + } case MESSAGE_PHASE_COMPLETED: + if(is_root){ + /* CUCommonUtility::prepareAutoInitAndStartInAutoLoadControlUnit(cu_id, + getDataAccess<mds_data_access::NodeDataAccess>(), + getDataAccess<mds_data_access::ControlUnitDataAccess>(), + getDataAccess<mds_data_access::DataServiceDataAccess>())*/ + } case MESSAGE_PHASE_TIMEOUT: { BC_END_RUNNING_PROPERTY break; diff --git a/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.h b/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.h index 5f88c678b57392387c19a47bf1b8a28eec568537..6d538e5c07078df449188c1920f6444cd68d2204 100644 --- a/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.h +++ b/ChaosMetadataService/batch/control_unit/RegistrationAckBatchCommand.h @@ -37,7 +37,7 @@ namespace chaos { chaos::common::network::CNetworkAddress * control_unit_address; ChaosUniquePtr<RequestInfo> request; - + bool is_root; std::string cu_id; std::string unit_server_addr; int32_t reg_result; diff --git a/ChaosMetadataService/batch/script/LoadInstanceOnUnitServer.cpp b/ChaosMetadataService/batch/script/LoadInstanceOnUnitServer.cpp index b6ca10e5849753559bf1bbd280f12e3e2c224bfe..a9f98f5e6a7b928de89f37ac41f684414bc867ba 100644 --- a/ChaosMetadataService/batch/script/LoadInstanceOnUnitServer.cpp +++ b/ChaosMetadataService/batch/script/LoadInstanceOnUnitServer.cpp @@ -20,8 +20,6 @@ */ #include "LoadInstanceOnUnitServer.h" - -#include "../control_unit/IDSTControlUnitBatchCommand.h" #include "../../common/CUCommonUtility.h" diff --git a/ChaosMetadataService/batch/script/UpdateScriptOnNode.cpp b/ChaosMetadataService/batch/script/UpdateScriptOnNode.cpp index b3e389cbbc2bb3f42703a45a57f1a99dbfc30764..f3d99f08fefc754f3f3310f8c4dd2b88bc587a41 100644 --- a/ChaosMetadataService/batch/script/UpdateScriptOnNode.cpp +++ b/ChaosMetadataService/batch/script/UpdateScriptOnNode.cpp @@ -20,8 +20,6 @@ */ #include "UpdateScriptOnNode.h" - -#include "../control_unit/IDSTControlUnitBatchCommand.h" #include "../../common/CUCommonUtility.h" #include "../../ChaosMetadataService.h" diff --git a/ChaosMetadataService/batch/unit_server/LoadUnloadControlUnit.cpp b/ChaosMetadataService/batch/unit_server/LoadUnloadControlUnit.cpp index 5df9409d299354b00fc5bae45165cb05765739fb..766d8e7a19ae31a4bc64049058fb84f904b69e61 100644 --- a/ChaosMetadataService/batch/unit_server/LoadUnloadControlUnit.cpp +++ b/ChaosMetadataService/batch/unit_server/LoadUnloadControlUnit.cpp @@ -20,8 +20,6 @@ */ #include "LoadUnloadControlUnit.h" - -#include "../control_unit/IDSTControlUnitBatchCommand.h" #include "../../common/CUCommonUtility.h" #include <chaos_service_common/data/data.h> using namespace chaos::common::data; diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index c60b3ad8bd2eac67c009523bda6d2993d02d6efb..694bd8d8ca15f2dc966b3805d5c5a5613f0e705e 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -190,6 +190,7 @@ static const unsigned int ObjectStorageTimeoutinMSec = 50000; static const unsigned int ChacheTimeoutinMSec = 5000; static const unsigned int MetricCollectorTimeoutinMSec = 1000; static const unsigned int RefreshEndpointMSec = 60000; +static const unsigned int SkipDatasetOlderThan = 5*60000; //!time to wait for queue can accept new data to push in object storage /*! diff --git a/chaos/common/configuration/GlobalConfiguration.cpp b/chaos/common/configuration/GlobalConfiguration.cpp index 46fb7c681056ba2a60a03c42ed128e823e50729f..b3413aea58fc563a3fc2f71df415fae11e2f72ec 100644 --- a/chaos/common/configuration/GlobalConfiguration.cpp +++ b/chaos/common/configuration/GlobalConfiguration.cpp @@ -73,22 +73,27 @@ void GlobalConfiguration::preParseStartupParameters() { addOption(InitOption::OPT_DATA_IO_IMPL, po::value< string >()->default_value(std::string("IODirectIOPSMsgDriver")), "Specify the data io implementation"); addOption(InitOption::OPT_MSG_PRODUCER_KVP, po::value< std::vector<std::string> >(), "K:V message producer options"); addOption(InitOption::OPT_MSG_CONSUMER_KVP, po::value< std::vector<std::string> >(), "K:V message consumer options"); + //disable directio + addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value(std::string("")), "Specify the direct io implementation"); + addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("PSM"), "Specify the rpc implementation"); #else addOption(InitOption::OPT_DATA_IO_IMPL, po::value< string >()->default_value(std::string("IODirectIODriver")), "Specify the data io implementation"); + addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value(std::string("ZMQ")), "Specify the direct io implementation"); + addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("ZMQ"), "Specify the rpc implementation"); + #endif #if ENABLE_ZMQ_MONITOR addOption(InitOption::OPT_ENABLE_ZMQ_MONITOR, po::value< bool >()->default_value(true), "Monitor zmq connections"); #endif - addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value(std::string("ZMQ")), "Specify the direct io implementation"); addOption(InitOption::OPT_DIRECT_IO_PRIORITY_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_PRIORITY_PORT), "DirectIO priority server port"); addOption(InitOption::OPT_DIRECT_IO_SERVICE_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_SERVICE_PORT), "DirectIO service server port"); + addOption(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(1),"DirectIO server thread number"); addOption(InitOption::OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_RPC_SYNC_ENABLE, po::value< bool >()->default_value(false), "Enable the sync wrapper to rpc protocol"); - addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("PSM"), "Specify the rpc implementation"); addOption(InitOption::OPT_RPC_SERVER_PORT, po::value<uint32_t>()->default_value(_RPC_PORT), "RPC server port"); addOption(InitOption::OPT_RPC_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(2),"RPC server thread number"); addOption(InitOption::OPT_RPC_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"RPC implementation key value parameter[k:v]"); diff --git a/chaos/common/healt_system/HealtManager.cpp b/chaos/common/healt_system/HealtManager.cpp index 28d427ceb857b35541ca5cf2c438107962061724..1586ef33b9e7e5c8f64128fb137b65bf35f6d38c 100644 --- a/chaos/common/healt_system/HealtManager.cpp +++ b/chaos/common/healt_system/HealtManager.cpp @@ -22,8 +22,6 @@ #include <chaos/common/io/IODirectIODriver.h> #include <chaos/common/healt_system/HealtManager.h> #include <chaos/common/configuration/GlobalConfiguration.h> -#include <chaos/cu_toolkit/data_manager/DataManager.h> - #include <chaos/common/io/SharedManagedDirecIoDataDriver.h> #include <boost/format.hpp> @@ -36,7 +34,6 @@ using namespace chaos::common::message; using namespace chaos::common::network; using namespace chaos::common::healt_system; using namespace chaos::common::async_central; -using namespace chaos::cu::data_manager; #define HM_INFO INFO_LOG(HealtManagerBase) #define HM_DBG DBG_LOG(HealtManagerBase) diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp index 96ce6ffea2b381f6871b4c991f83e6e2c8210fe8..6d6be6061504c2b975126b7c9d836c8b8e48fa41 100644 --- a/chaos/common/network/NetworkBroker.cpp +++ b/chaos/common/network/NetworkBroker.cpp @@ -308,6 +308,7 @@ void NetworkBroker::deinit() { * all part are started */ void NetworkBroker::start() { + MB_LAPP << "Starting.."; if(direct_io_server){ StartableService::startImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__); } diff --git a/chaos/common/rpc/RpcServer.cpp b/chaos/common/rpc/RpcServer.cpp index cac3b93ea83457285577864b97071bcb3b6a8719..ed9851f9556aef7abd12b59dda31a8d7a592c4c7 100644 --- a/chaos/common/rpc/RpcServer.cpp +++ b/chaos/common/rpc/RpcServer.cpp @@ -19,6 +19,7 @@ * permissions and limitations under the Licence. */ #include <chaos/common/rpc/RpcServer.h> +#include <chaos/common/data/CDataWrapper.h> using namespace chaos; @@ -27,7 +28,19 @@ NamedService(alias), is_psm(false), port_number(0), command_handler(NULL){} +void RpcServer::init(void* c) { + try{ + if(c){ + chaos::common::data::CDataWrapper *cc = static_cast<chaos::common::data::CDataWrapper*>(c); + cfg=cc->clone(); + if(cfg.get()==NULL){ + throw chaos::CException(-1, "a valid configuration must be given", __PRETTY_FUNCTION__); + } + } + } catch(...){ + } + } void RpcServer::setAlternatePortAddress(int new_port_address) { port_number = new_port_address; } diff --git a/chaos/common/rpc/RpcServer.h b/chaos/common/rpc/RpcServer.h index 4f9ff261f608c96caf4d5f659b64089b4ffa71ac..086334e41f3c14e52e12086950e8fbb934e9cfd8 100644 --- a/chaos/common/rpc/RpcServer.h +++ b/chaos/common/rpc/RpcServer.h @@ -26,17 +26,14 @@ #include <string> #include <chaos/common/utility/StartableService.h> #include <chaos/common/utility/NamedService.h> -/* #include <chaos/common/data/CDataWrapper.h> -#include <chaos/common/exception/exception.h> -#include <chaos/common/event/EventHandler.h> -*/ namespace chaos { using namespace std; //forward declaration namespace common { + namespace network { class NetworkBroker; @@ -58,13 +55,13 @@ namespace chaos { //! port where server has been published int port_number; - + chaos::common::data::CDWUniquePtr cfg; chaos::common::rpc::RpcServerHandler *command_handler; /* init the rpc adapter */ - virtual void init(void*) = 0; + virtual void init(void*) ; /* start the rpc adapter diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index 2bb42d2b4421f4b8b2e95dc5e7664a65b7dc4796..94c51d81b4286985cce4371da3c7ede9526558a3 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -48,7 +48,7 @@ PSMServer::~PSMServer() { //init the server getting the configuration value void PSMServer::init(void *init_data) { - CDataWrapper *cfg = reinterpret_cast<CDataWrapper*>(init_data); + RpcServer::init(init_data); PSMS_LAPP << "initialization"; try{ if(!cfg->hasKey(InitOption::OPT_MSG_BROKER_SERVER)){ @@ -88,10 +88,8 @@ void PSMServer::init(void *init_data) { if (cons->applyConfiguration() != 0) { throw chaos::CException(-1, "cannot initialize Publish Subscribe:" + cons->getLastError(), __PRETTY_FUNCTION__); } - if(cfg->hasKey("ismds")){ - PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC; - cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC); - } + + } catch (std::exception& e) { throw CException(-2, e.what(), "PSMServer::init"); } catch (...) { @@ -135,7 +133,10 @@ void PSMServer::messageError( chaos::common::message::ele_t& data) { } //start the rpc adapter void PSMServer::start() { - + if(cfg->hasKey("ismds")){ + PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC; + cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC); + } PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); cons->start(); diff --git a/chaos/common/rpc/psm/PSMServer.h b/chaos/common/rpc/psm/PSMServer.h index 62be49e73af0a376fcc0270ce320fd732f0e9b2a..8802c226e92232f70e16803879313ded2ae14f29 100644 --- a/chaos/common/rpc/psm/PSMServer.h +++ b/chaos/common/rpc/psm/PSMServer.h @@ -47,7 +47,6 @@ namespace chaos { //worker that process request in a separate thread void messageHandler( chaos::common::message::ele_t& data); void messageError( chaos::common::message::ele_t& data); - public: /* diff --git a/chaos/common/rpc/zmq/ZMQServer.cpp b/chaos/common/rpc/zmq/ZMQServer.cpp index 4ff8719cd419457c8287899f8ed42c6237881dbd..1d20f31b269b86aaa39d310b7734fe65ce0e8a35 100644 --- a/chaos/common/rpc/zmq/ZMQServer.cpp +++ b/chaos/common/rpc/zmq/ZMQServer.cpp @@ -61,21 +61,22 @@ ZMQServer::~ZMQServer() { //init the server getting the configuration value void ZMQServer::init(void *init_data) { //get portnumber and thread number - CDataWrapper *adapterConfiguration = reinterpret_cast<CDataWrapper*>(init_data); + RpcServer::init(init_data); + ZMQS_LAPP << "initialization"; try { run_server = true; if(!port_number) { //no one has set alternate port number so use the default - port_number = adapterConfiguration->getInt32Value(InitOption::OPT_RPC_SERVER_PORT); + port_number = cfg->getInt32Value(InitOption::OPT_RPC_SERVER_PORT); } - thread_number = adapterConfiguration->getInt32Value(InitOption::OPT_RPC_SERVER_THREAD_NUMBER); + thread_number = cfg->getInt32Value(InitOption::OPT_RPC_SERVER_THREAD_NUMBER); //bad patch OPT_RPC_DOMAIN_QUEUE_THREAD need to be removed - if(thread_number < adapterConfiguration->getUInt32Value(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD)) { - thread_number = adapterConfiguration->getUInt32Value(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD); + if(thread_number < cfg->getUInt32Value(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD)) { + thread_number = cfg->getUInt32Value(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD); } ZMQS_LAPP << "port number:" << port_number; ZMQS_LAPP << "worker thread number:" << thread_number; diff --git a/chaos/cu_toolkit/data_manager/DataManager.cpp b/chaos/cu_toolkit/data_manager/DataManager.cpp index e7dbe3ae9c3b49ba97b30a616dd8adbf61a10e60..b5ef3d28b134b1cbbabbb21c313fee3f12952339 100644 --- a/chaos/cu_toolkit/data_manager/DataManager.cpp +++ b/chaos/cu_toolkit/data_manager/DataManager.cpp @@ -63,7 +63,7 @@ void DataManager::deinit() { void DataManager::start() {} CDataWrapper* DataManager::updateConfiguration(CDataWrapper *newConfiguration) { - LAPP_ << "Update Configuraiton for DataManager"; + LAPP_ << "Update Configuration for DataManager"; map<string, KeyDataStorage*>::iterator iter = deviceIDKeyDataStorageMap.begin(); KeyDataStorage *tmpKDS = 0L;