diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp index 652e1c560970f47e5b1acfede0f9bdd6d5ef6337..3553e4fd527d8c4f1ec9dff27ddc5bd48fd42d86 100644 --- a/ChaosMetadataService/ChaosMetadataService.cpp +++ b/ChaosMetadataService/ChaosMetadataService.cpp @@ -75,6 +75,8 @@ ChaosMetadataService::~ChaosMetadataService() {} void ChaosMetadataService::init(int argc, const char* argv[]) { is_present = false; ChaosCommon<ChaosMetadataService>::init(argc, argv); + + } //! stringbuffer parser /* @@ -90,6 +92,16 @@ void ChaosMetadataService::init(istringstream& initStringStream) { */ void ChaosMetadataService::init(void* init_data) { try { + getGlobalConfigurationInstance()->getConfiguration()->addBoolValue("ismds",true); + std::string gname; + if(getGlobalConfigurationInstance()->hasOption(InitOption::OPT_GROUP_NAME)){ + gname=getGlobalConfigurationInstance()->getOption<std::string>(InitOption::OPT_GROUP_NAME); + } + if(gname.size()){ + getGlobalConfigurationInstance()->getConfiguration()->addStringValue(InitOption::OPT_GROUP_NAME,gname); + } else { + getGlobalConfigurationInstance()->getConfiguration()->addStringValue(InitOption::OPT_GROUP_NAME,CDS_GROUP_NAME); + } ChaosCommon<ChaosMetadataService>::init(init_data); if (signal((int)SIGINT, ChaosMetadataService::signalHanlder) == SIG_ERR) { @@ -114,6 +126,7 @@ void ChaosMetadataService::init(void* init_data) { // no cache server provided throw chaos::CException(-4, "No persistence's server list provided", __PRETTY_FUNCTION__); } + if (getGlobalConfigurationInstance()->hasOption(chaos::service_common::persistence::OPT_PERSITENCE_KV_PARAMTER)) { fillKVParameter(setting.persistence_kv_param_map, @@ -189,7 +202,7 @@ void ChaosMetadataService::init(void* init_data) { api_managment_service.init(NULL, __PRETTY_FUNCTION__); #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) #warning "CDS NEEDS KAFKA" - message_consumer.reset(new QueryDataMsgPSConsumer("cds"), "QueryDataMsgPSConsumer"); + message_consumer.reset(new QueryDataMsgPSConsumer(CDS_GROUP_NAME), "QueryDataMsgPSConsumer"); if (!message_consumer.get()) throw chaos::CException(-7, "Error instantiating message data consumer", __PRETTY_FUNCTION__); message_consumer.init(NULL, __PRETTY_FUNCTION__); #endif @@ -233,15 +246,14 @@ void ChaosMetadataService::start() { data_consumer.start(__PRETTY_FUNCTION__); LAPP_ << "\n----------------------------------------------------------------------" << "\n!CHAOS Metadata service started" - << "\nRPC Server address: " << NetworkBroker::getInstance()->getRPCUrl() << "\nDirectIO Server address: " << NetworkBroker::getInstance()->getDirectIOUrl() << CHAOS_FORMAT("\nData Service published with url: %1%|0", % NetworkBroker::getInstance()->getDirectIOUrl()) << "\nTime precision mask: " << std::hex << timePrecisionMask << std::dec << "\n----------------------------------------------------------------------"; + << "\nRPC Server address: " << NetworkBroker::getInstance()->getRPCUrl() << "\nDirectIO Server address: " << NetworkBroker::getInstance()->getDirectIOUrl() << CHAOS_FORMAT("\nData Service published with url: %1%", % NetworkBroker::getInstance()->getDirectIOUrl()) << "\nTime precision mask: " << std::hex << timePrecisionMask << std::dec << "\n----------------------------------------------------------------------"; // register this process on persistence database persistence::data_access::DataServiceDataAccess* ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); - std::string unique_uid = NetworkBroker::getInstance()->getRPCUrl(); - LCND_LDBG << "-----REGISTERING ---"; + LCND_LDBG << "-----REGISTERING "<<nodeuid; ds_da->registerNode(setting.ha_zone_name, - unique_uid, + nodeuid, NetworkBroker::getInstance()->getDirectIOUrl(), 0, getBuildInfo(chaos::common::data::CDWUniquePtr())); @@ -252,8 +264,8 @@ void ChaosMetadataService::start() { chaos::common::constants::HBTimersTimeoutinMSec); StartableService::startImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__); - HealtManagerDirect::getInstance()->addNewNode(unique_uid); - HealtManagerDirect::getInstance()->addNodeMetricValue(unique_uid, + HealtManagerDirect::getInstance()->addNewNode(nodeuid); + HealtManagerDirect::getInstance()->addNodeMetricValue(nodeuid, NodeHealtDefinitionKey::NODE_HEALT_STATUS, NodeHealtDefinitionValue::NODE_HEALT_STATUS_START); #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) @@ -286,21 +298,20 @@ void ChaosMetadataService::start() { void ChaosMetadataService::timeout() { int err = 0; HealthStat service_proc_stat; - const std::string ds_uid = NetworkBroker::getInstance()->getRPCUrl(); persistence::data_access::DataServiceDataAccess* ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); persistence::data_access::NodeDataAccess* n_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::NodeDataAccess>(); service_proc_stat.mds_received_timestamp = TimingUtil::getTimeStamp(); if (is_present == false) { bool presence = false; - if (n_da->checkNodePresence(presence, ds_uid) != 0) { - LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", % NetworkBroker::getInstance()->getRPCUrl()); + if (n_da->checkNodePresence(presence, nodeuid) != 0) { + LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", % nodeuid); // return; } if (presence == false) { // reinsert mds ds_da->registerNode(setting.ha_zone_name, - NetworkBroker::getInstance()->getRPCUrl(), + nodeuid, NetworkBroker::getInstance()->getDirectIOUrl(), 0, getBuildInfo(chaos::common::data::CDWUniquePtr())); @@ -310,7 +321,7 @@ void ChaosMetadataService::timeout() { // update proc stat ProcStatCalculator::update(service_proc_stat); - if ((err = n_da->setNodeHealthStatus(NetworkBroker::getInstance()->getRPCUrl(), + if ((err = n_da->setNodeHealthStatus(nodeuid, service_proc_stat))) { LCND_LERR << CHAOS_FORMAT("error storing health data into database for this mds [%1%]", % NetworkBroker::getInstance()->getRPCUrl()); } diff --git a/ChaosMetadataService/ChaosMetadataService.h b/ChaosMetadataService/ChaosMetadataService.h index 5c3ec224f2786e5fb2902262a8bf148bbe29d58d..f5ef124b7d6b55015506a9c9e550498124251f6d 100644 --- a/ChaosMetadataService/ChaosMetadataService.h +++ b/ChaosMetadataService/ChaosMetadataService.h @@ -38,6 +38,7 @@ #include <chaos/common/thread/WaitSemaphore.h> #include <chaos/common/utility/StartableService.h> #include <chaos/common/utility/ProcStat.h> +#define CDS_GROUP_NAME "cds" namespace chaos { namespace metadata_service { //! Chaos Node Directory base class diff --git a/ChaosMetadataService/QueryDataConsumer.cpp b/ChaosMetadataService/QueryDataConsumer.cpp index dec8d9f5d6c6fe1391c9cfcf3ec0c8b72330c5eb..d87b5b35d82edb588a8ffbd32f8af91d14661ab5 100644 --- a/ChaosMetadataService/QueryDataConsumer.cpp +++ b/ChaosMetadataService/QueryDataConsumer.cpp @@ -66,7 +66,11 @@ QueryDataConsumer::~QueryDataConsumer() {} void QueryDataConsumer::init(void* init_data) { //get new chaos direct io endpoint server_endpoint = NetworkBroker::getInstance()->getDirectIOServerEndpoint(); - if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__); + if(server_endpoint==NULL){ + INFO << "DirectIO disabled"; + return; + } +// if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__); INFO << "QueryDataConsumer initialized with endpoint " << server_endpoint->getRouteIndex(); INFO << "Allocating DirectIODeviceServerChannel"; diff --git a/ChaosMetadataService/common/CUCommonUtility.cpp b/ChaosMetadataService/common/CUCommonUtility.cpp index 9562994f25d3c13f39ed539066d1d0cc0793f363..1402e8d284a18ce32a3e43b2217b6127f74f18d6 100644 --- a/ChaosMetadataService/common/CUCommonUtility.cpp +++ b/ChaosMetadataService/common/CUCommonUtility.cpp @@ -273,6 +273,7 @@ void CUCommonUtility::addDataServicePack(ChaosUniquePtr<chaos::common::data::CDa //result.reset(new CDataWrapper()); if (data_services.size() == 0) { /// something wrong returning my self + result->appendStringToArray(boost::str(boost::format("%1%|0") % NetworkBroker::getInstance()->getDirectIOUrl())); result->finalizeArrayForKey(chaos::DataServiceNodeDefinitionKey::DS_DIRECT_IO_FULL_ADDRESS_LIST); diff --git a/ChaosMetadataService/main.cpp b/ChaosMetadataService/main.cpp index 904b40e9ac05b3707fdf6d356fa2061acd7e1ec2..f2321c108ceb7660397d09a93daf67ec18713950 100644 --- a/ChaosMetadataService/main.cpp +++ b/ChaosMetadataService/main.cpp @@ -119,6 +119,7 @@ int main(int argc, const char * argv[]) { "Specify the check time (in seconds ) for ageing management (default is one day of delay)", 86400, &ChaosMetadataService::getInstance()->setting.cron_job_ageing_management_repeat_time); + ChaosMetadataService::getInstance()->init(argc, argv); ChaosMetadataService::getInstance()->start(); diff --git a/ChaosMetadataService/mds_constants.h b/ChaosMetadataService/mds_constants.h index 1d12d9625d60fe57c97ce37a9f7e191b3e0d1a29..47c2caf1500a15725a094821b914667d6b77f3bd 100644 --- a/ChaosMetadataService/mds_constants.h +++ b/ChaosMetadataService/mds_constants.h @@ -20,6 +20,7 @@ static const char* OPT_CRON_JOB_AGEING_MANAGEMENT ="cron-job-ageing-management- static const char* OPT_ARCHIVER_NUM ="archiver-instances"; static const char* OPT_ARCHIVER_THREAD ="archiver-thread"; static const char* OPT_ARCHIVER_QUEUE_PUSH_TIMEOUT ="archiver-queue-push-timeout"; + } } #endif diff --git a/chaos/common/ChaosCommon.cpp b/chaos/common/ChaosCommon.cpp index ea4f339580471179701ed60502d5c57c9c93fcd3..1d35309aec47ed318e02f04ecda3bfcbf8fc1409 100644 --- a/chaos/common/ChaosCommon.cpp +++ b/chaos/common/ChaosCommon.cpp @@ -287,7 +287,11 @@ void ChaosAbstractCommon::init(void *init_data) { //initialize the plugin manager chaos::common::utility::InizializableService::initImplementation(chaos::common::plugin::PluginManager::getInstance(), NULL, "PluginManager", __PRETTY_FUNCTION__); } - + if (GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)) { + nodeuid=GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + } else { + nodeuid=NetworkBroker::getInstance()->getRPCUrl(); + } //finally we can register the system rpc api for common uses AbstActionDescShrPtr action_description = addActionDescritionInstance<ChaosAbstractCommon>(this, diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index 67a627344b0a23996282d4b24b46a3749e1d65cd..8d955840e66e1ab2c98f312eaf0632a1e4c7cdb2 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -133,6 +133,7 @@ static const char* const OPT_MSG_BROKER_DRIVER = "msg-broker-driver"; static const char* const OPT_HA_ZONE_NAME = "ha-zone-name"; static const char* const CONTROL_MANAGER_UNIT_SERVER_ALIAS = "unit-server-alias"; +static const char* OPT_GROUP_NAME ="group-name"; #if ENABLE_ZMQ_MONITOR static const char* const OPT_ENABLE_ZMQ_MONITOR = "zmq-monitor"; @@ -179,6 +180,7 @@ static const unsigned int GlobalDirectIOTimeoutinMSec = 5000; } // namespace common namespace common { namespace constants { +static const char* CHAOS_ADMIN_ADMIN_TOPIC = "CHAOS_ADMIN"; // hearth beat timers static const unsigned int HBTimersTimeoutinMSec = 5000; static const unsigned int AgentTimersTimeoutinMSec = 5000; diff --git a/chaos/common/configuration/GlobalConfiguration.cpp b/chaos/common/configuration/GlobalConfiguration.cpp index a6f2cd97000701ad598bc8a103faac7f8a4d5f9c..fc4028f629b84155a58babb4135ed7aff49cf0f1 100644 --- a/chaos/common/configuration/GlobalConfiguration.cpp +++ b/chaos/common/configuration/GlobalConfiguration.cpp @@ -88,7 +88,7 @@ void GlobalConfiguration::preParseStartupParameters() { addOption(InitOption::OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_RPC_SYNC_ENABLE, po::value< bool >()->default_value(false), "Enable the sync wrapper to rpc protocol"); - addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("ZMQ"), "Specify the rpc implementation"); + addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("PSM"), "Specify the rpc implementation"); addOption(InitOption::OPT_RPC_SERVER_PORT, po::value<uint32_t>()->default_value(_RPC_PORT), "RPC server port"); addOption(InitOption::OPT_RPC_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(2),"RPC server thread number"); addOption(InitOption::OPT_RPC_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"RPC implementation key value parameter[k:v]"); @@ -118,6 +118,8 @@ void GlobalConfiguration::preParseStartupParameters() { #endif addOption(InitOption::OPT_MSG_BROKER_SERVER, po::value< std::string >()->default_value(std::string("localhost:9092")), "Message broker"); addOption(InitOption::OPT_MSG_BROKER_DRIVER, po::value< std::string >()->default_value(std::string("kafka-rdk")), "Message broker driver"); + addOption(InitOption::OPT_GROUP_NAME, po::value< std::string >()->default_value(std::string("")), "Group Name"); + addOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS, po::value< std::string >()/*->default_value(std::string("NONAME"))*/,"UID of the node"); @@ -327,7 +329,9 @@ void GlobalConfiguration::checkDefaultOption() { fillKVParameter(map_kv_param_directio_clnt_impl, directio_clnt_impl_kv_param, ""); } CHECK_AND_DEFINE_OPTION(string, direct_io_server_impl, InitOption::OPT_DIRECT_IO_IMPLEMENTATION) - configuration->addStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE, direct_io_server_impl); + if(direct_io_server_impl.size()){ + configuration->addStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE, direct_io_server_impl); + } CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, direct_io_priority_port, InitOption::OPT_DIRECT_IO_PRIORITY_SERVER_PORT, _DIRECT_IO_PRIORITY_PORT); freeFoundPort = InetUtility::scanForLocalFreePort(direct_io_priority_port); diff --git a/chaos/common/io/IODirectIODriver.cpp b/chaos/common/io/IODirectIODriver.cpp index 88f04d5711454ec6c3b282e23690fdd9ac9db3c6..ebe8489fd427c0e7b0d53a2ac0049f01bc118e38 100644 --- a/chaos/common/io/IODirectIODriver.cpp +++ b/chaos/common/io/IODirectIODriver.cpp @@ -88,7 +88,12 @@ void IODirectIODriver::init(void *_init_parameter) { //if(!init_parameter.client_instance) throw CException(-1, "No client configured", __PRETTY_FUNCTION__); init_parameter.endpoint_instance = NetworkBroker::getInstance()->getDirectIOServerEndpoint(); - if(!init_parameter.endpoint_instance) throw CException(-1, "No endpoint configured", __PRETTY_FUNCTION__); + if(!init_parameter.endpoint_instance){ + IODirectIODriver_LINFO_<<"No Directio Endpoint configured"; + return; + //throw CException(-1, "No endpoint configured", __PRETTY_FUNCTION__); + + } //initialize client //InizializableService::initImplementation(init_parameter.client_instance, _init_parameter, init_parameter.client_instance->getName(), __PRETTY_FUNCTION__); diff --git a/chaos/common/network/CNodeNetworkAddress.h b/chaos/common/network/CNodeNetworkAddress.h index d4151044f3c6959f76a1be16d1e05713dd2fdf96..6f598a87a3ccd747ce66ba440216285f32d8719a 100644 --- a/chaos/common/network/CNodeNetworkAddress.h +++ b/chaos/common/network/CNodeNetworkAddress.h @@ -35,12 +35,13 @@ namespace chaos { struct CNetworkAddress { //the ip and port for the host that run the control unit std::string ip_port; + bool isatopic; CNetworkAddress(){}; CNetworkAddress(const CNetworkAddress& cna): ip_port(cna.ip_port){}; - CNetworkAddress(const std::string& _ip_port): - ip_port(_ip_port){}; + CNetworkAddress(const std::string& _ip_port,bool _isatopic=false): + ip_port(_ip_port),isatopic(_isatopic){}; }; //! Represent the abstraction of chaos node id of the chaos virtual node diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp index b1ddfa875032d889caf3dd21dec7a0d4258c6ca2..96ce6ffea2b381f6871b4c991f83e6e2c8210fe8 100644 --- a/chaos/common/network/NetworkBroker.cpp +++ b/chaos/common/network/NetworkBroker.cpp @@ -45,6 +45,7 @@ #endif #define MB_LAPP LAPP_ << "[NetworkBroker]- " +#define MB_LERR LERR_ << __PRETTY_FUNCTION__<<"- ## [NetworkBroker]- " #define INIT_STEP 0 #define DEINIT_STEP 1 @@ -95,8 +96,9 @@ void NetworkBroker::init(void *initData) { //---------------------------- D I R E C T I/O ---------------------------- if (globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) { - MB_LAPP << "Setup DirectIO sublayer"; string direct_io_impl = globalConfiguration->getStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE); + MB_LAPP << "Setup DirectIO sublayer "<<direct_io_impl; + //construct the rpc server and client name string direct_io_server_impl = direct_io_impl + "DirectIOServer"; direct_io_client_impl = direct_io_impl + "DirectIOClient"; @@ -231,12 +233,15 @@ void NetworkBroker::deinit() { //remove global request domain before delete all, it use internally NetworBroker singleton global_request_domain.reset(); //---------------------------- D I R E C T I/O ---------------------------- - CHAOS_NOT_THROW(InizializableService::deinitImplementation(direct_io_client, direct_io_client->getName(), __PRETTY_FUNCTION__);) - DELETE_OBJ_POINTER(direct_io_client); - - CHAOS_NOT_THROW(StartableService::deinitImplementation(direct_io_server, direct_io_server->getName(), "NetworkBroker::deinit");) + if(direct_io_client){ + CHAOS_NOT_THROW(InizializableService::deinitImplementation(direct_io_client, direct_io_client->getName(), __PRETTY_FUNCTION__);) + DELETE_OBJ_POINTER(direct_io_client); + } + if(direct_io_server){ + CHAOS_NOT_THROW(StartableService::deinitImplementation(direct_io_server, direct_io_server->getName(), "NetworkBroker::deinit");) - DELETE_OBJ_POINTER(direct_io_server); + DELETE_OBJ_POINTER(direct_io_server); + } //---------------------------- D I R E C T I/O ---------------------------- //---------------------------- E V E N T ---------------------------- @@ -303,7 +308,9 @@ void NetworkBroker::deinit() { * all part are started */ void NetworkBroker::start() { - StartableService::startImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__); + if(direct_io_server){ + StartableService::startImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__); + } if (!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { StartableService::startImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__); StartableService::startImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__); @@ -329,7 +336,9 @@ void NetworkBroker::stop() { CHAOS_NOT_THROW(StartableService::stopImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__);) CHAOS_NOT_THROW(StartableService::stopImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__);) } - CHAOS_NOT_THROW(StartableService::stopImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__);) + if(direct_io_server){ + CHAOS_NOT_THROW(StartableService::stopImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__);) + } } @@ -347,9 +356,14 @@ int NetworkBroker::getPublishedPort() { */ void NetworkBroker::getPublishedHostAndPort(string &hostAndPort) { CHAOS_ASSERT(rpc_server); - hostAndPort = GlobalConfiguration::getInstance()->getLocalServerAddress(); - hostAndPort.append(":"); - hostAndPort.append(lexical_cast<string>(rpc_server->getPublishedPort())); + if(rpc_server->isps()){ + hostAndPort = rpc_server->getPublishedEndpoint(); + } else { + hostAndPort = GlobalConfiguration::getInstance()->getLocalServerAddress(); + hostAndPort.append(":"); + hostAndPort.append(lexical_cast<string>(rpc_server->getPublishedPort())); + } + } std::string NetworkBroker::getRPCUrl() { @@ -359,7 +373,11 @@ std::string NetworkBroker::getRPCUrl() { } std::string NetworkBroker::getDirectIOUrl() { - CHAOS_ASSERT(rpc_server); + if(direct_io_server==NULL){ + if(rpc_server->isps()){ + return "topic:"+rpc_server->getPublishedEndpoint(); + } + } return direct_io_server->getUrl(); } @@ -607,7 +625,15 @@ MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNetworkAddress Performe the creation of metadata server */ MDSMessageChannel *NetworkBroker::getMetadataserverMessageChannel(MessageRequestDomainSHRDPtr shared_request_domain) { - MDSMessageChannel *channel = (shared_request_domain.get() == NULL) ? (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), global_request_domain)) : (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), shared_request_domain)); + chaos::common::network::VectorNetworkAddress mds; + if(rpc_client->isps()){ + chaos::common::network::CNetworkAddress m(common::constants::CHAOS_ADMIN_ADMIN_TOPIC,true); + mds.push_back(m); + } else { + mds=GlobalConfiguration::getInstance()->getMetadataServerAddressList(); + } + + MDSMessageChannel *channel = (shared_request_domain.get() == NULL) ? (new MDSMessageChannel(this,mds , global_request_domain)) : (new MDSMessageChannel(this, mds, shared_request_domain)); if (channel) { channel->init(); boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); @@ -737,7 +763,10 @@ void NetworkBroker::disposeMessageChannel(chaos::common::message::MDSMessageChan //Allocate a new endpoint in the direct io server chaos_direct_io::DirectIOServerEndpoint *NetworkBroker::getDirectIOServerEndpoint() { - CHAOS_ASSERT(direct_io_dispatcher) + if(direct_io_dispatcher==NULL){ + MB_LERR<<"DirectIO server disabled"; + return NULL; + } chaos_direct_io::DirectIOServerEndpoint *result_endpoint = direct_io_dispatcher->getNewEndpoint(); return result_endpoint; } @@ -747,6 +776,10 @@ void NetworkBroker::releaseDirectIOServerEndpoint(chaos_direct_io::DirectIOServe } //Return a new direct io client instance chaos_direct_io::DirectIOClient *NetworkBroker::getSharedDirectIOClientInstance() { + if(direct_io_client==NULL){ + MB_LERR<<"DirectIO client disabled"; + } + return direct_io_client; } diff --git a/chaos/common/rpc/RpcClient.cpp b/chaos/common/rpc/RpcClient.cpp index 5432223a34d6ff8af18b03d25ab9e3c3d99cda9e..5932cfd9b8e8ce287e199905505176e221af3691 100644 --- a/chaos/common/rpc/RpcClient.cpp +++ b/chaos/common/rpc/RpcClient.cpp @@ -36,6 +36,7 @@ using namespace chaos::common::data; */ RpcClient::RpcClient(const std::string& alias): NamedService(alias), +is_psm(false), syncrhonous_call(GlobalConfiguration::getInstance()->getConfiguration()->getBoolValue(InitOption::OPT_RPC_SYNC_ENABLE)), server_handler(NULL){} diff --git a/chaos/common/rpc/RpcClient.h b/chaos/common/rpc/RpcClient.h index 05c633a7124e22034344bff20e002553bde3eaf2..f24f023806d8c8ceb2498bed71d347aa4a5632fe 100644 --- a/chaos/common/rpc/RpcClient.h +++ b/chaos/common/rpc/RpcClient.h @@ -68,7 +68,7 @@ namespace chaos { //! handler to the dispatcher to forward error on data forwarding chaos::common::rpc::RpcServerHandler *server_handler; protected: - + bool is_psm; // is publish subscribe bool syncrhonous_call; void forwadSubmissionResult(NFISharedPtr message_info, @@ -105,6 +105,7 @@ namespace chaos { //! Set dinamically the synchronousRpcFeautres void setSynchronousRPCState(bool _syncrhonous_call); bool getynchronousRPCState()const; + bool isps()const {return is_psm;} }; } #endif diff --git a/chaos/common/rpc/RpcServer.cpp b/chaos/common/rpc/RpcServer.cpp index acf8ddf119a5d99eb523c59835a848918b24cfea..cac3b93ea83457285577864b97071bcb3b6a8719 100644 --- a/chaos/common/rpc/RpcServer.cpp +++ b/chaos/common/rpc/RpcServer.cpp @@ -24,6 +24,7 @@ using namespace chaos; RpcServer::RpcServer(const std::string& alias): NamedService(alias), +is_psm(false), port_number(0), command_handler(NULL){} @@ -37,6 +38,11 @@ void RpcServer::setAlternatePortAddress(int new_port_address) { int RpcServer::getPublishedPort() { return port_number; } +std::string RpcServer::getPublishedEndpoint(){ + std::stringstream ss; + ss<<":"<<port_number; + return ss.str(); +} /* set the command dispatcher associated to the instance of rpc adapter diff --git a/chaos/common/rpc/RpcServer.h b/chaos/common/rpc/RpcServer.h index c3ee11cb17925226304b39d5a2457b9e82bee2e1..4f9ff261f608c96caf4d5f659b64089b4ffa71ac 100644 --- a/chaos/common/rpc/RpcServer.h +++ b/chaos/common/rpc/RpcServer.h @@ -54,6 +54,8 @@ namespace chaos { public common::utility::NamedService { friend class chaos::common::network::NetworkBroker; protected: + bool is_psm; // is publish subscribe + //! port where server has been published int port_number; @@ -83,6 +85,11 @@ namespace chaos { Return the published port */ virtual int getPublishedPort(); + + /*! + Return the published endpoint + */ + virtual std::string getPublishedEndpoint(); /* set the command dispatcher associated to the instance of rpc adapter @@ -96,6 +103,8 @@ namespace chaos { of internal queue message */ virtual uint64_t getMessageQueueSize(); + bool isps()const {return is_psm;} + }; } #endif diff --git a/chaos/common/rpc/psm/PSMClient.cpp b/chaos/common/rpc/psm/PSMClient.cpp index 6f0b3b53f5f3d4bb5bf12161af75e8b1a3bf5c0e..78cbf162d9d8d3e497b883a11cb3ac339a787971 100644 --- a/chaos/common/rpc/psm/PSMClient.cpp +++ b/chaos/common/rpc/psm/PSMClient.cpp @@ -49,6 +49,7 @@ DEFINE_CLASS_FACTORY(PSMClient, RpcClient); PSMClient::PSMClient(const string& alias): RpcClient(alias){ seq_id=0; + is_psm=true; } @@ -80,7 +81,7 @@ void PSMClient::init(void *init_data) { std::string msgbroker = cfg->getStringValue(InitOption::OPT_MSG_BROKER_SERVER); - prod = chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv); + prod = chaos::common::message::MessagePSDriver::getProducerDriver(msgbrokerdrv); PSMC_LAPP << "Initializing producer based on " << msgbroker<<" ("+msgbrokerdrv+")"; prod->addServer(msgbroker); @@ -132,13 +133,16 @@ bool PSMClient::submitMessage(NFISharedPtr forwardInfo, throw CException(0, "No destination in message description", __PRETTY_FUNCTION__); if(!forwardInfo->hasMessage()) throw CException(0, "No message in description", __PRETTY_FUNCTION__); - + if(forwardInfo->message->hasKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP)){ + forwardInfo->message->removeKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP); + forwardInfo->message->addStringValue(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP,nodeuid); + } forwardInfo->message->addBoolValue(RPC_SYNC_KEY, RpcClient::syncrhonous_call); forwardInfo->message->addInt64Value(RPC_SEQ_KEY, (++seq_id)); - forwardInfo->message->addStringValue(RPC_SRC_UID, nodeuid); + std::string key=forwardInfo->destinationAddr+ chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; - PSMC_LDBG<<"Sending message to:"<<key<<" msg:"<<forwardInfo->message->getJSONString(); - prod->pushMsg(*forwardInfo->message.get(),key); + PSMC_LDBG<<"Sending message to:"<<forwardInfo->destinationAddr<<" ("<<key<<") msg:"<<forwardInfo->message->getJSONString(); + prod->pushMsgAsync(*forwardInfo->message.get(),key); } catch(CException& ex){ //in this case i need to delete the memory diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index baa8f74f66946c0023f5c1b36e230a37b831fffb..09ac8323f1c46cc534a2980c761010fac211ca5a 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -38,6 +38,7 @@ using namespace chaos::common::data; DEFINE_CLASS_FACTORY(PSMServer, RpcServer); PSMServer::PSMServer(const string& alias): RpcServer(alias){ + is_psm=true; } @@ -64,16 +65,30 @@ void PSMServer::init(void *init_data) { } std::string msgbroker = cfg->getStringValue(InitOption::OPT_MSG_BROKER_SERVER); + std::string gname; + if(cfg->hasKey(InitOption::OPT_GROUP_NAME)){ + gname=cfg->getStringValue(InitOption::OPT_GROUP_NAME); + PSMS_LAPP << "belong to group:\""<<gname<<"\""; + + } + cons = chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv, gname); + prod = chaos::common::message::MessagePSDriver::getProducerDriver(msgbrokerdrv); - cons = chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv, ""); if (cons.get() == 0) { throw chaos::CException(-3, "cannot initialize Publish Subscribe Consumer of topic:" + nodeuid + "_cmd", __PRETTY_FUNCTION__); } cons->addServer(msgbroker); + prod->addServer(msgbroker); // subscribe to the queue of commands cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&PSMServer::messageHandler, this, _1)); cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&PSMServer::messageError, this, _1)); - + if (cons->applyConfiguration() != 0) { + throw chaos::CException(-1, "cannot initialize Publish Subscribe:" + cons->getLastError(), __PRETTY_FUNCTION__); + } + if(cfg->hasKey("ismds")){ + PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC+ std::string(chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); + cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC+ std::string(chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX)); + } } catch (std::exception& e) { throw CException(-2, e.what(), "PSMServer::init"); } catch (...) { @@ -87,8 +102,8 @@ void PSMServer::messageHandler( chaos::common::message::ele_t& data) { if(data.cd->hasKey(RPC_SEQ_KEY)){ seq_id=data.cd->getInt64Value(RPC_SEQ_KEY); } - if(data.cd->hasKey(RPC_SRC_UID)){ - src=data.cd->getInt64Value(RPC_SRC_UID); + if(data.cd->hasKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP)){ + src=data.cd->getStringValue(RPC_SRC_UID); } PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id << " desc:"<<data.cd->getJSONString(); CDWShrdPtr result_data_pack; @@ -100,9 +115,10 @@ void PSMServer::messageHandler( chaos::common::message::ele_t& data) { } else { result_data_pack = command_handler->dispatchCommand(MOVE(data.cd)); } - if(result_data_pack.get()){ + + if(result_data_pack.get() && src.size()){ PSMS_LDBG << "Something to send back:"<<seq_id << "to node:"<<src<<" desc:"<<result_data_pack->getJSONString(); - + prod->pushMsgAsync(*result_data_pack.get(),src+chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); } } @@ -110,11 +126,17 @@ void PSMServer::messageError( chaos::common::message::ele_t& data) { PSMS_LERR << "ERROR:"; } - + std::string PSMServer::getPublishedEndpoint(){ + + return nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; +} //start the rpc adapter void PSMServer::start() { + PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); + cons->start(); + prod->start(); } //start the rpc adapter diff --git a/chaos/common/rpc/psm/PSMServer.h b/chaos/common/rpc/psm/PSMServer.h index 64fc6ed36fb884c6f2ddc485781f2617ca2dbff8..62be49e73af0a376fcc0270ce320fd732f0e9b2a 100644 --- a/chaos/common/rpc/psm/PSMServer.h +++ b/chaos/common/rpc/psm/PSMServer.h @@ -39,6 +39,8 @@ namespace chaos { REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(PSMServer) // publish subscribe chaos::common::message::consumer_uptr_t cons; + chaos::common::message::producer_uptr_t prod; + std::string nodeuid; PSMServer(const std::string& alias); virtual ~PSMServer(); @@ -46,15 +48,6 @@ namespace chaos { void messageHandler( chaos::common::message::ele_t& data); void messageError( chaos::common::message::ele_t& data); - int sendMessage(void *socket, - void *message_data, - size_t message_size, - bool more_to_send); - - int readMessage(void *socket, - std::string& buffer, - bool& has_next, - std::string *peer_ip = NULL); public: /* @@ -78,8 +71,8 @@ namespace chaos { /*! Thread where data is received and managed */ - void executeOnThread(); - }; + std::string getPublishedEndpoint(); + }; } #endif diff --git a/chaos/cu_toolkit/ChaosCUToolkit.cpp b/chaos/cu_toolkit/ChaosCUToolkit.cpp index 0d9ff069afb22ebe8eb25a6dcfbc9212c7c97cbe..d2bd8c25058b4d67bc115b1ccb4cd47b90601241 100644 --- a/chaos/cu_toolkit/ChaosCUToolkit.cpp +++ b/chaos/cu_toolkit/ChaosCUToolkit.cpp @@ -180,7 +180,7 @@ void ChaosCUToolkit::start() { StartableService::startImplementation(ControlManager::getInstance(), "ControlManager", "ChaosCUToolkit::start"); LAPP_ << "-----------------------------------------"; LAPP_ << "!CHAOS Control Unit System Started"; - LAPP_ << "RPC Server address: " << CommandManager::getInstance()->broker->getRPCUrl(); + LAPP_ << "Server Endpoint: " << CommandManager::getInstance()->broker->getRPCUrl(); LAPP_ << "DirectIO Server address: " << CommandManager::getInstance()->broker->getDirectIOUrl(); LAPP_ << "-----------------------------------------"; //at this point i must with for end signal