diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp index c1e7c91bb32fb9f7a4524dc15939ed3c0a19a1c8..39e99de7900ec5ecbc3d6dd6acce97dd8ae7dc60 100644 --- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp +++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp @@ -60,7 +60,7 @@ QueryDataMsgPSConsumer::QueryDataMsgPSConsumer(const std::string& id) cons = chaos::common::message::MessagePSDriver::getConsumerDriver(msgbrokerdrv, groupid); } -void QueryDataMsgPSConsumer::messageHandler(const chaos::common::message::ele_t& data) { +void QueryDataMsgPSConsumer::messageHandler( chaos::common::message::ele_t& data) { try { ChaosStringSetConstSPtr meta_tag_set; @@ -103,7 +103,8 @@ void QueryDataMsgPSConsumer::messageHandler(const chaos::common::message::ele_t& } // DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" LOG:"<<data.cd->getJSONString(); if(CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<MAX_LOG_QUEUE){ - CObjectProcessingPriorityQueue<CDataWrapper>::push(data.cd,0); + CDWShrdPtr ptr(data.cd.release()); + CObjectProcessingPriorityQueue<CDataWrapper>::push(ptr,0); } else { ERR<<kp<<"] too many logs on queue for DB:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize(); return; @@ -150,7 +151,7 @@ void QueryDataMsgPSConsumer::messageHandler(const chaos::common::message::ele_t& } -void QueryDataMsgPSConsumer::messageError(const chaos::common::message::ele_t& data) { +void QueryDataMsgPSConsumer::messageError( chaos::common::message::ele_t& data) { ChaosStringSetConstSPtr meta_tag_set; boost::mutex::scoped_lock ll(map_m); std::string path=data.key; diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.h b/ChaosMetadataService/QueryDataMsgPSConsumer.h index ec48b95734fa6b66f0f5b1965297eb10754b23ce..d55708c70edd5992133ef2323719c0be76a7e5dc 100644 --- a/ChaosMetadataService/QueryDataMsgPSConsumer.h +++ b/ChaosMetadataService/QueryDataMsgPSConsumer.h @@ -37,8 +37,8 @@ class QueryDataMsgPSConsumer : public QueryDataConsumer,protected chaos::common: static std::map<std::string, uint64_t> alive_map; chaos::common::message::consumer_uptr_t cons; static boost::mutex map_m; - void messageHandler(const chaos::common::message::ele_t& data); - void messageError(const chaos::common::message::ele_t& data); + void messageHandler( chaos::common::message::ele_t& data); + void messageError( chaos::common::message::ele_t& data); int subscribe_retry; //---------------- DirectIODeviceServerChannelHandler ----------------------- void processBufferElement(chaos::common::data::CDWShrdPtr log_entry); diff --git a/chaos/common/CMakeLists.txt b/chaos/common/CMakeLists.txt index a85c6b51e22568f0e43588f1e887bec4335b989a..bd032575066515ebfb02e3f9d68fe9dde44a425f 100644 --- a/chaos/common/CMakeLists.txt +++ b/chaos/common/CMakeLists.txt @@ -497,7 +497,10 @@ SET(common_lib_src ${common_lib_src} rpc/RpcClient.cpp rpc/RpcServer.cpp rpc/zmq/ZMQClient.cpp - rpc/zmq/ZMQServer.cpp) + rpc/zmq/ZMQServer.cpp + rpc/psm/PSMClient.cpp + rpc/psm/PSMServer.cpp + ) SET(common_lib_src ${common_lib_src} utility/NamedService.cpp utility/StartableService.cpp diff --git a/chaos/common/configuration/GlobalConfiguration.cpp b/chaos/common/configuration/GlobalConfiguration.cpp index 0b4ac3fc65c963b6f21ef38e1eb84ec53973119e..a6f2cd97000701ad598bc8a103faac7f8a4d5f9c 100644 --- a/chaos/common/configuration/GlobalConfiguration.cpp +++ b/chaos/common/configuration/GlobalConfiguration.cpp @@ -235,6 +235,12 @@ void GlobalConfiguration::parseParameter(const po::basic_parsed_options<char>& o //check the default option checkDefaultOption(); } +#define CHECK_AND_DEFINE_CONFIG_OPTION(t,y)\ +{t x;\ +if(hasOption(y)){\ +x = getOption<t>(y);\ +configuration->append(y,x);}} + void GlobalConfiguration::checkDefaultOption() { configuration.reset(new CDataWrapper()); //now we can fill the gloabl configuration @@ -257,9 +263,6 @@ void GlobalConfiguration::checkDefaultOption() { CHECK_AND_DEFINE_OPTION(string, logFilePath, InitOption::OPT_LOG_FILE); configuration->addStringValue(InitOption::OPT_LOG_FILE, logFilePath); - CHECK_AND_DEFINE_OPTION(string, nodeDesc, InitOption::OPT_NODE_DESC); - configuration->addStringValue(InitOption::OPT_NODE_DESC, nodeDesc); - CHECK_AND_DEFINE_OPTION(string, logLevel, InitOption::OPT_LOG_LEVEL) configuration->addInt32Value(InitOption::OPT_LOG_LEVEL, filterLogLevel(logLevel)); @@ -283,7 +286,11 @@ void GlobalConfiguration::checkDefaultOption() { CHECK_AND_DEFINE_OPTION(string, rpcImpl, InitOption::OPT_RPC_IMPLEMENTATION) configuration->addStringValue(InitOption::OPT_RPC_IMPLEMENTATION, rpcImpl); - + + CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_SERVER); + CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_NODE_DESC); + CHECK_AND_DEFINE_CONFIG_OPTION(std::string,chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_DRIVER); CHECK_AND_DEFINE_OPTION(bool, OPT_RPC_SYNC_ENABLE, InitOption::OPT_RPC_SYNC_ENABLE) else{ OPT_RPC_SYNC_ENABLE = false; @@ -537,6 +544,10 @@ string GlobalConfiguration::getLocalServerAddress() { std::string GlobalConfiguration::getDesc(){ return configuration->getStringValue(chaos::InitOption::OPT_NODE_DESC); +} +std::string GlobalConfiguration::getNodeUID(){ + return configuration->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + } /* diff --git a/chaos/common/configuration/GlobalConfiguration.h b/chaos/common/configuration/GlobalConfiguration.h index bcb22fe5469f1b825d1ab2f077a80658eebc454e..eab2b35895af419b880a4a0327c3fe832f36d4a0 100644 --- a/chaos/common/configuration/GlobalConfiguration.h +++ b/chaos/common/configuration/GlobalConfiguration.h @@ -72,6 +72,8 @@ t x;\ if(hasOption(y)){\ x = getOption<t>(y);\ } + + #define CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(x,y)\ bool x;\ @@ -299,6 +301,8 @@ x = hasOption(y); //!return the optional description of the node std::string getDesc(); + //!return the nodeuid + std::string getNodeUID(); /* return the address of metadataserver */ diff --git a/chaos/common/io/IODirectIOPSMsgDriver.cpp b/chaos/common/io/IODirectIOPSMsgDriver.cpp index d6de2541525ed3b826f88d589560b5f733797398..9bbfe02cd338c2ff7cc324bdfa612716bf95a1f5 100644 --- a/chaos/common/io/IODirectIOPSMsgDriver.cpp +++ b/chaos/common/io/IODirectIOPSMsgDriver.cpp @@ -52,10 +52,8 @@ IODirectIOPSMsgDriver::IODirectIOPSMsgDriver(const std::string& alias) msgbrokerdrv = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_DRIVER); prod = chaos::common::message::MessagePSDriver::getProducerDriver(msgbrokerdrv); - std::string gid; - if (GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)) { - gid = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); - } + std::string gid=GlobalConfiguration::getInstance()->getNodeUID(); + if (gid == "") { gid = "IODirectIODriver"; } @@ -87,7 +85,7 @@ void IODirectIOPSMsgDriver::init(void* _init_parameter) { prod->start(); } } -void IODirectIOPSMsgDriver::defaultHandler(const chaos::common::message::ele_t& data) { +void IODirectIOPSMsgDriver::defaultHandler( chaos::common::message::ele_t& data) { std::map<std::string, chaos::common::message::msgHandler>::iterator i, end; { boost::mutex::scoped_lock ll(hmutex); diff --git a/chaos/common/io/IODirectIOPSMsgDriver.h b/chaos/common/io/IODirectIOPSMsgDriver.h index b2058a0bd39ef7b9f02c6511544266a269eea13a..4eeb0a00b96674050d7a422f202893e1019b496c 100644 --- a/chaos/common/io/IODirectIOPSMsgDriver.h +++ b/chaos/common/io/IODirectIOPSMsgDriver.h @@ -52,7 +52,7 @@ namespace chaos{ std::string msgbrokerdrv; chaos::common::message::producer_uptr_t prod; chaos::common::message::consumer_uptr_t cons; - void defaultHandler(const chaos::common::message::ele_t& data); + void defaultHandler( chaos::common::message::ele_t& data); boost::mutex hmutex; std::map<std::string,chaos::common::message::msgHandler> handler_map; ChaosSharedMutex map_query_future_mutex; diff --git a/chaos/common/message/MessagePublishSubscribeBase.h b/chaos/common/message/MessagePublishSubscribeBase.h index e39f17e4f9c48289ef9957ed610a2aa486ed03ec..5e1eee313051c1384eb45bcf9327d1eab4ced2ce 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.h +++ b/chaos/common/message/MessagePublishSubscribeBase.h @@ -12,8 +12,7 @@ namespace chaos { namespace common { namespace message { - - typedef struct ele {std::string key;uint32_t off;uint32_t par;chaos::common::data::CDWShrdPtr cd;} ele_t; + typedef struct ele {std::string key;uint32_t off;uint32_t par;chaos::common::data::CDWUniquePtr cd;} ele_t; typedef boost::lockfree::queue<ele_t*> msg_queue_t; typedef ChaosUniquePtr<ele_t> ele_uptr_t; //typedef std::vector<chaos::common::data::CDWShrdPtr> msg_queue_t; @@ -24,7 +23,7 @@ namespace chaos { }; */ - typedef boost::function<void(const chaos::common::message::ele_t&)> msgHandler; + typedef boost::function<void(chaos::common::message::ele_t&)> msgHandler; class MessagePublishSubscribeBase { diff --git a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp index 6c80a5e72e7b47f9f915a4a1a7b1a68103d69256..ce53bba5e9756e87ce5db3184ea2991229a21ee9 100644 --- a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp +++ b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp @@ -215,7 +215,7 @@ void MessagePSRDKafkaConsumer::poll() { d.key = rd_kafka_topic_name(rkm->rkt); d.off = rkm->offset; d.par = rkm->partition; - d.cd = chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper()); + d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper()); d.cd->addStringValue("msg", errstr); d.cd->addInt32Value("err", rkm->err); handlers[ONERROR](d); @@ -243,7 +243,7 @@ void MessagePSRDKafkaConsumer::poll() { d.off = rkm->offset; d.par = rkm->partition; try { - d.cd = chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); + d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); } catch (chaos::CException& e) { stats.errs++; std::stringstream ss; @@ -256,7 +256,7 @@ void MessagePSRDKafkaConsumer::poll() { d.off = rkm->offset; d.par = rkm->partition; - d.cd = chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper()); + d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper()); d.cd->addStringValue("msg",ss.str()); d.cd->addInt32Value("err",-1); @@ -274,7 +274,7 @@ void MessagePSRDKafkaConsumer::poll() { ele->par = rkm->partition; try { - ele->cd = chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); + ele->cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); } catch (chaos::CException& e) { stats.errs++; std::stringstream ss; @@ -286,7 +286,7 @@ void MessagePSRDKafkaConsumer::poll() { d.key = rd_kafka_topic_name(rkm->rkt); d.off = rkm->offset; d.par = rkm->partition; - d.cd = chaos::common::data::CDWShrdPtr(new chaos::common::data::CDataWrapper()); + d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper()); d.cd->addStringValue("msg",ss.str()); d.cd->addInt32Value("err",-1); diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp index ccd8481e34cc41afe7be39d322a2e16bf47ffd32..b1ddfa875032d889caf3dd21dec7a0d4258c6ca2 100644 --- a/chaos/common/network/NetworkBroker.cpp +++ b/chaos/common/network/NetworkBroker.cpp @@ -18,36 +18,35 @@ * See the Licence for the specific language governing * permissions and limitations under the Licence. */ -#include <chaos/common/global.h> -#include <boost/lexical_cast.hpp> +#include <chaos/common/dispatcher/AbstractCommandDispatcher.h> +#include <chaos/common/dispatcher/AbstractEventDispatcher.h> #include <chaos/common/event/event.h> -#include <chaos/common/utility/InetUtility.h> -#include <chaos/common/network/NetworkBroker.h> -#include <chaos/common/message/MessageChannel.h> -#include <chaos/common/message/MDSMessageChannel.h> +#include <chaos/common/global.h> #include <chaos/common/message/DeviceMessageChannel.h> -#include <chaos/common/utility/ObjectFactoryRegister.h> +#include <chaos/common/message/MDSMessageChannel.h> +#include <chaos/common/message/MessageChannel.h> #include <chaos/common/message/MessageRequestDomain.h> -#include <chaos/common/message/PerformanceNodeChannel.h> -#include <chaos/common/dispatcher/AbstractEventDispatcher.h> #include <chaos/common/message/MultiAddressMessageChannel.h> -#include <chaos/common/dispatcher/AbstractCommandDispatcher.h> -#include <chaos/common/rpc/RpcServer.h> +#include <chaos/common/message/PerformanceNodeChannel.h> +#include <chaos/common/network/NetworkBroker.h> #include <chaos/common/rpc/RpcClient.h> +#include <chaos/common/rpc/RpcServer.h> +#include <chaos/common/utility/InetUtility.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> +#include <boost/lexical_cast.hpp> #include <chaos/common/direct_io/DirectIO.h> - //-----------for metric collection--------- #if (CHAOS_PROMETHEUS) +#include <chaos/common/direct_io/DirectIODispatcherMetricCollector.h> #include <chaos/common/rpc/RpcClientMetricCollector.h> #include <chaos/common/rpc/RpcServerMetricCollector.h> -#include <chaos/common/direct_io/DirectIODispatcherMetricCollector.h> #endif #define MB_LAPP LAPP_ << "[NetworkBroker]- " -#define INIT_STEP 0 +#define INIT_STEP 0 #define DEINIT_STEP 1 using namespace chaos; using namespace chaos::common::event; @@ -59,24 +58,25 @@ using namespace chaos::common::message; /*! */ -NetworkBroker::NetworkBroker(): -//performance_session_managment(this), -event_client(NULL), -event_server(NULL), -event_dispatcher(NULL), -rpc_server(NULL), -rpc_client(NULL), -rpc_dispatcher(NULL), -direct_io_dispatcher(NULL), -direct_io_server(NULL), -direct_io_client(NULL), -global_request_domain(){} +NetworkBroker::NetworkBroker() + : //performance_session_managment(this), + event_client(NULL) + , event_server(NULL) + , event_dispatcher(NULL) + , rpc_server(NULL) + , rpc_client(NULL) + , rpc_dispatcher(NULL) + , direct_io_dispatcher(NULL) + , direct_io_server(NULL) + , direct_io_client(NULL) + , usepsbroker(false) + , global_request_domain() {} /*! */ NetworkBroker::~NetworkBroker() { - CHAOS_NOT_THROW(stop();); - CHAOS_NOT_THROW(deinit();); + CHAOS_NOT_THROW(stop();); + CHAOS_NOT_THROW(deinit();); } /*! @@ -84,304 +84,283 @@ NetworkBroker::~NetworkBroker() { * for the rpc client and server and for the dispatcher. All these are here initialized */ void NetworkBroker::init(void *initData) { - MB_LAPP << "Init phase"; - //get global configuration reference - CDataWrapper *globalConfiguration = GlobalConfiguration::getInstance()->getConfiguration(); - - - if(!globalConfiguration) { - throw CException(-1, "No global configuration found", __PRETTY_FUNCTION__); - } - MB_LAPP << "Configuration:"<<globalConfiguration->getCompliantJSONString(); - if (GlobalConfiguration::getInstance()->getConfiguration()->hasKey(InitOption::OPT_MSG_BROKER_SERVER)) { - std::string msgbrokerdrv = "kafka-rdk"; - msgbrokerdrv = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_DRIVER); - - std::string msgbroker = GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::OPT_MSG_BROKER_SERVER); - MB_LAPP << "Initializing producer/consumer based on "<<msgbroker; - - prod=chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv); - prod->addServer(msgbroker); - - if (prod->applyConfiguration() != 0) { - throw chaos::CException(-1, "cannot initialize Publish Subscribe Producer:" + prod->getLastError(), __PRETTY_FUNCTION__); - } - prod->start(); - cons=chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv,""); - cons->addServer(msgbroker); + MB_LAPP << "Init phase"; + //get global configuration reference + CDataWrapper *globalConfiguration = GlobalConfiguration::getInstance()->getConfiguration(); + if (!globalConfiguration) { + throw CException(-1, "No global configuration found", __PRETTY_FUNCTION__); } + MB_LAPP << "Configuration:" << globalConfiguration->getCompliantJSONString(); + //---------------------------- D I R E C T I/O ---------------------------- - if(globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) { - MB_LAPP << "Setup DirectIO sublayer"; - string direct_io_impl = globalConfiguration->getStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE); - //construct the rpc server and client name - string direct_io_server_impl = direct_io_impl+"DirectIOServer"; - direct_io_client_impl = direct_io_impl + "DirectIOClient"; - MB_LAPP << "Trying to initialize DirectIO Server: " << direct_io_server_impl; - direct_io_server = ObjectFactoryRegister<common::direct_io::DirectIOServer>::getInstance()->getNewInstanceByName(direct_io_server_impl); - if(!direct_io_server) throw CException(-2, "Error creating direct io server implementation:"+direct_io_server_impl, __PRETTY_FUNCTION__); - - //allocate the dispatcher - MB_LAPP << "Allocate DirectIODispatcher"; + if (globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) { + MB_LAPP << "Setup DirectIO sublayer"; + string direct_io_impl = globalConfiguration->getStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE); + //construct the rpc server and client name + string direct_io_server_impl = direct_io_impl + "DirectIOServer"; + direct_io_client_impl = direct_io_impl + "DirectIOClient"; + MB_LAPP << "Trying to initialize DirectIO Server: " << direct_io_server_impl; + direct_io_server = ObjectFactoryRegister<common::direct_io::DirectIOServer>::getInstance()->getNewInstanceByName(direct_io_server_impl); + if (!direct_io_server) throw CException(-2, "Error creating direct io server implementation:" + direct_io_server_impl, __PRETTY_FUNCTION__); + + //allocate the dispatcher + MB_LAPP << "Allocate DirectIODispatcher"; #if CHAOS_PROMETHEUS - // install metric awahre dispatcher subclass - direct_io_dispatcher = new chaos::common::direct_io::DirectIODispatcherMetricCollector(); + // install metric awahre dispatcher subclass + direct_io_dispatcher = new chaos::common::direct_io::DirectIODispatcherMetricCollector(); #else - direct_io_dispatcher = new common::direct_io::DirectIODispatcher(); + direct_io_dispatcher = new common::direct_io::DirectIODispatcher(); #endif - direct_io_server->setHandler(direct_io_dispatcher); - //initialize direct io server - StartableService::initImplementation(direct_io_server, static_cast<void*>(globalConfiguration), direct_io_server->getName(), __PRETTY_FUNCTION__); - - //init the my_ip variable for all client - // common::direct_io::DirectIOClientConnection::my_str_ip = GlobalConfiguration::getInstance()->getLocalServerAddress(); - // common::direct_io::DirectIOClientConnection::my_i64_ip = STRIP_TO_UI64(common::direct_io::DirectIOClientConnection::my_str_ip).to_ulong(); - - direct_io_client = ObjectFactoryRegister<common::direct_io::DirectIOClient>::getInstance()->getNewInstanceByName(direct_io_client_impl); - if(!direct_io_client) throw CException(-3, "Error creating direct io client implementation:"+direct_io_client_impl, __PRETTY_FUNCTION__); - - //initialize direct io client - InizializableService::initImplementation(direct_io_client, static_cast<void*>(globalConfiguration), direct_io_client->getName(), __PRETTY_FUNCTION__); - + direct_io_server->setHandler(direct_io_dispatcher); + //initialize direct io server + StartableService::initImplementation(direct_io_server, static_cast<void *>(globalConfiguration), direct_io_server->getName(), __PRETTY_FUNCTION__); + + //init the my_ip variable for all client + // common::direct_io::DirectIOClientConnection::my_str_ip = GlobalConfiguration::getInstance()->getLocalServerAddress(); + // common::direct_io::DirectIOClientConnection::my_i64_ip = STRIP_TO_UI64(common::direct_io::DirectIOClientConnection::my_str_ip).to_ulong(); + + direct_io_client = ObjectFactoryRegister<common::direct_io::DirectIOClient>::getInstance()->getNewInstanceByName(direct_io_client_impl); + if (!direct_io_client) throw CException(-3, "Error creating direct io client implementation:" + direct_io_client_impl, __PRETTY_FUNCTION__); + + //initialize direct io client + InizializableService::initImplementation(direct_io_client, static_cast<void *>(globalConfiguration), direct_io_client->getName(), __PRETTY_FUNCTION__); } - + //---------------------------- D I R E C T I/O ---------------------------- - + //---------------------------- E V E N T ---------------------------- - if(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { - MB_LAPP << "Setup Event sublayer"; - event_dispatcher = ObjectFactoryRegister<AbstractEventDispatcher>::getInstance()->getNewInstanceByName("DefaultEventDispatcher"); - if(!event_dispatcher) - throw CException(-4, "Event dispatcher implementation not found", __PRETTY_FUNCTION__); - - if(!StartableService::initImplementation(event_dispatcher, static_cast<void*>(globalConfiguration), "DefaultEventDispatcher", __PRETTY_FUNCTION__)) - throw CException(-5, "Event dispatcher has not been initialized due an error", __PRETTY_FUNCTION__); - - - string event_adapter_type = globalConfiguration->getStringValue(event::EventConfiguration::OPTION_KEY_EVENT_ADAPTER_IMPLEMENTATION); - //construct the rpc server and client name - string event_server_name = event_adapter_type+"EventServer"; - string event_client_name = event_adapter_type+"EventClient"; - - MB_LAPP << "Trying to initilize Event Server: " << event_server_name; - event_server = ObjectFactoryRegister<EventServer>::getInstance()->getNewInstanceByName(event_server_name); - if(StartableService::initImplementation(event_server, static_cast<void*>(globalConfiguration), event_server->getName(), __PRETTY_FUNCTION__)){ - //register the root handler on event server - event_server->setEventHanlder(event_dispatcher); - } - - MB_LAPP << "Trying to initilize Event Client: " << event_client_name; - event_client = ObjectFactoryRegister<EventClient>::getInstance()->getNewInstanceByName(event_client_name); - StartableService::initImplementation(event_client, static_cast<void*>(globalConfiguration), event_client->getName(), __PRETTY_FUNCTION__); + if (!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { + MB_LAPP << "Setup Event sublayer"; + event_dispatcher = ObjectFactoryRegister<AbstractEventDispatcher>::getInstance()->getNewInstanceByName("DefaultEventDispatcher"); + if (!event_dispatcher) + throw CException(-4, "Event dispatcher implementation not found", __PRETTY_FUNCTION__); + + if (!StartableService::initImplementation(event_dispatcher, static_cast<void *>(globalConfiguration), "DefaultEventDispatcher", __PRETTY_FUNCTION__)) + throw CException(-5, "Event dispatcher has not been initialized due an error", __PRETTY_FUNCTION__); + + string event_adapter_type = globalConfiguration->getStringValue(event::EventConfiguration::OPTION_KEY_EVENT_ADAPTER_IMPLEMENTATION); + //construct the rpc server and client name + string event_server_name = event_adapter_type + "EventServer"; + string event_client_name = event_adapter_type + "EventClient"; + + MB_LAPP << "Trying to initilize Event Server: " << event_server_name; + event_server = ObjectFactoryRegister<EventServer>::getInstance()->getNewInstanceByName(event_server_name); + if (StartableService::initImplementation(event_server, static_cast<void *>(globalConfiguration), event_server->getName(), __PRETTY_FUNCTION__)) { + //register the root handler on event server + event_server->setEventHanlder(event_dispatcher); + } + + MB_LAPP << "Trying to initilize Event Client: " << event_client_name; + event_client = ObjectFactoryRegister<EventClient>::getInstance()->getNewInstanceByName(event_client_name); + StartableService::initImplementation(event_client, static_cast<void *>(globalConfiguration), event_client->getName(), __PRETTY_FUNCTION__); } //---------------------------- E V E N T ---------------------------- - + //---------------------------- R P C ---------------------------- - if(globalConfiguration->hasKey(InitOption::OPT_RPC_IMPLEMENTATION)){ - //get the dispatcher - MB_LAPP << "Setup RPC sublayer"; - uint32_t dispatcher_type = globalConfiguration->getUInt32Value(InitOption::OPT_RPC_DOMAIN_SCHEDULER_TYPE); - switch(dispatcher_type) { - case 1: - MB_LAPP << "Use SharedCommandDispatcher for RPC"; - rpc_dispatcher = ObjectFactoryRegister<AbstractCommandDispatcher>::getInstance()->getNewInstanceByName("SharedCommandDispatcher"); - break; - - default: - MB_LAPP << "Use DefaultCommandDispatcher for RPC"; - rpc_dispatcher = ObjectFactoryRegister<AbstractCommandDispatcher>::getInstance()->getNewInstanceByName("DefaultCommandDispatcher"); - break; - } - - if(!rpc_dispatcher) - throw CException(-6, "Command dispatcher implementation not found", __PRETTY_FUNCTION__); - - if(!StartableService::initImplementation(rpc_dispatcher, static_cast<void*>(globalConfiguration), "DefaultCommandDispatcher", __PRETTY_FUNCTION__)) - throw CException(-7, "Command dispatcher has not been initialized due an error", __PRETTY_FUNCTION__); - - - // get the rpc type to instantiate - string rpc_impl = globalConfiguration->getStringValue(InitOption::OPT_RPC_IMPLEMENTATION); - //construct the rpc server and client name - string rpc_server_name = rpc_impl+"Server"; - string rpc_client_name = rpc_impl+"Client"; - - MB_LAPP << "Trying to initilize RPC Server: " << rpc_server_name; - rpc_server = ObjectFactoryRegister<RpcServer>::getInstance()->getNewInstanceByName(rpc_server_name); - if(!rpc_server) throw CException(-8, "Error allocating rpc server implementation", __PRETTY_FUNCTION__); + if (globalConfiguration->hasKey(InitOption::OPT_RPC_IMPLEMENTATION)) { + //get the dispatcher + MB_LAPP << "Setup RPC sublayer"; + uint32_t dispatcher_type = globalConfiguration->getUInt32Value(InitOption::OPT_RPC_DOMAIN_SCHEDULER_TYPE); + switch (dispatcher_type) { + case 1: + MB_LAPP << "Use SharedCommandDispatcher for RPC"; + rpc_dispatcher = ObjectFactoryRegister<AbstractCommandDispatcher>::getInstance()->getNewInstanceByName("SharedCommandDispatcher"); + break; + + default: + MB_LAPP << "Use DefaultCommandDispatcher for RPC"; + rpc_dispatcher = ObjectFactoryRegister<AbstractCommandDispatcher>::getInstance()->getNewInstanceByName("DefaultCommandDispatcher"); + break; + } + + if (!rpc_dispatcher) + throw CException(-6, "Command dispatcher implementation not found", __PRETTY_FUNCTION__); + + if (!StartableService::initImplementation(rpc_dispatcher, static_cast<void *>(globalConfiguration), "DefaultCommandDispatcher", __PRETTY_FUNCTION__)) + throw CException(-7, "Command dispatcher has not been initialized due an error", __PRETTY_FUNCTION__); + + // get the rpc type to instantiate + string rpc_impl = globalConfiguration->getStringValue(InitOption::OPT_RPC_IMPLEMENTATION); + //construct the rpc server and client name + string rpc_server_name = rpc_impl + "Server"; + string rpc_client_name = rpc_impl + "Client"; + + MB_LAPP << "Trying to initilize RPC Server: " << rpc_server_name; + rpc_server = ObjectFactoryRegister<RpcServer>::getInstance()->getNewInstanceByName(rpc_server_name); + if (!rpc_server) throw CException(-8, "Error allocating rpc server implementation", __PRETTY_FUNCTION__); #if CHAOS_PROMETHEUS - rpc_server = new rpc::RpcServerMetricCollector(rpc_server->getName(), rpc_server); + rpc_server = new rpc::RpcServerMetricCollector(rpc_server->getName(), rpc_server); #endif - if(StartableService::initImplementation(rpc_server, static_cast<void*>(globalConfiguration), rpc_server->getName(), __PRETTY_FUNCTION__)) { - //set the handler on the rpc server - rpc_server->setCommandDispatcher(rpc_dispatcher); - } - - MB_LAPP << "Trying to initilize RPC Client: " << rpc_client_name; - rpc_client = ObjectFactoryRegister<RpcClient>::getInstance()->getNewInstanceByName(rpc_client_name); - if(!rpc_client) throw CException(-9, "Error allocating rpc client implementation", __PRETTY_FUNCTION__); + if (StartableService::initImplementation(rpc_server, static_cast<void *>(globalConfiguration), rpc_server->getName(), __PRETTY_FUNCTION__)) { + //set the handler on the rpc server + rpc_server->setCommandDispatcher(rpc_dispatcher); + } + + MB_LAPP << "Trying to initilize RPC Client: " << rpc_client_name; + rpc_client = ObjectFactoryRegister<RpcClient>::getInstance()->getNewInstanceByName(rpc_client_name); + if (!rpc_client) throw CException(-9, "Error allocating rpc client implementation", __PRETTY_FUNCTION__); #if CHAOS_PROMETHEUS - rpc_client = new rpc::RpcClientMetricCollector(rpc_client->getName(), rpc_client); + rpc_client = new rpc::RpcClientMetricCollector(rpc_client->getName(), rpc_client); #endif - //! connect dispatch to manage error durign request forwarding - rpc_client->setServerHandler(rpc_dispatcher); - if(StartableService::initImplementation(rpc_client, static_cast<void*>(globalConfiguration), rpc_client->getName(), __PRETTY_FUNCTION__)) { - //set the forwarder into dispatcher for answere - rpc_dispatcher->setRpcForwarder(rpc_client); - } + //! connect dispatch to manage error durign request forwarding + rpc_client->setServerHandler(rpc_dispatcher); + if (StartableService::initImplementation(rpc_client, static_cast<void *>(globalConfiguration), rpc_client->getName(), __PRETTY_FUNCTION__)) { + //set the forwarder into dispatcher for answere + rpc_dispatcher->setRpcForwarder(rpc_client); + } } else { - throw CException(-10, "No RPC Adapter type found in configuration", __PRETTY_FUNCTION__); + throw CException(-10, "No RPC Adapter type found in configuration", __PRETTY_FUNCTION__); } - MB_LAPP << "Initialize performance session manager"; -// StartableService::initImplementation(performance_session_managment, static_cast<void*>(globalConfiguration), "PerformanceManagment", __PRETTY_FUNCTION__); - + MB_LAPP << "Initialize performance session manager"; + // StartableService::initImplementation(performance_session_managment, static_cast<void*>(globalConfiguration), "PerformanceManagment", __PRETTY_FUNCTION__); + //get host and port for fastly set it into the requests published_host_and_port.clear(); getPublishedHostAndPort(published_host_and_port); - + //create the shared message request domain global_request_domain.reset(new MessageRequestDomain()); + } /*! * All rpc adapter and command siaptcer are deinitilized */ void NetworkBroker::deinit() { - //remove global request domain before delete all, it use internally NetworBroker singleton - global_request_domain.reset(); - + //remove global request domain before delete all, it use internally NetworBroker singleton + global_request_domain.reset(); //---------------------------- D I R E C T I/O ---------------------------- CHAOS_NOT_THROW(InizializableService::deinitImplementation(direct_io_client, direct_io_client->getName(), __PRETTY_FUNCTION__);) DELETE_OBJ_POINTER(direct_io_client); - + CHAOS_NOT_THROW(StartableService::deinitImplementation(direct_io_server, direct_io_server->getName(), "NetworkBroker::deinit");) - + DELETE_OBJ_POINTER(direct_io_server); //---------------------------- D I R E C T I/O ---------------------------- - + //---------------------------- E V E N T ---------------------------- - if(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { - MB_LAPP << "Deallocate all event channel"; - for (map<string, event::channel::EventChannel*>::iterator channnelIter = active_event_channel.begin(); - channnelIter != active_event_channel.end(); - channnelIter++) { - - event::channel::EventChannel *eventChannelToDispose = channnelIter->second; - - if(eventChannelToDispose){ - - //deinit channel - eventChannelToDispose->deinit(); - - //dispose it - delete(eventChannelToDispose); - } + if (!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { + MB_LAPP << "Deallocate all event channel"; + for (map<string, event::channel::EventChannel *>::iterator channnelIter = active_event_channel.begin(); + channnelIter != active_event_channel.end(); + channnelIter++) { + event::channel::EventChannel *eventChannelToDispose = channnelIter->second; + + if (eventChannelToDispose) { + //deinit channel + eventChannelToDispose->deinit(); + + //dispose it + delete (eventChannelToDispose); } - MB_LAPP << "Clear event channel map"; - active_event_channel.clear(); - - CHAOS_NOT_THROW(StartableService::deinitImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__);) - DELETE_OBJ_POINTER(event_client); - - CHAOS_NOT_THROW(StartableService::deinitImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__);) - DELETE_OBJ_POINTER(event_server); - - MB_LAPP << "Deinit Event dispatcher"; - CHAOS_NOT_THROW(StartableService::deinitImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__);) - DELETE_OBJ_POINTER(event_dispatcher); + } + MB_LAPP << "Clear event channel map"; + active_event_channel.clear(); + + CHAOS_NOT_THROW(StartableService::deinitImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__);) + DELETE_OBJ_POINTER(event_client); + + CHAOS_NOT_THROW(StartableService::deinitImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__);) + DELETE_OBJ_POINTER(event_server); + + MB_LAPP << "Deinit Event dispatcher"; + CHAOS_NOT_THROW(StartableService::deinitImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__);) + DELETE_OBJ_POINTER(event_dispatcher); } //---------------------------- E V E N T ---------------------------- - + //---------------------------- R P C ---------------------------- - MB_LAPP << "Deallocate all rpc channel"; - for (map<string, MessageChannel*>::iterator it = active_rpc_channel.begin(); + MB_LAPP << "Deallocate all rpc channel"; + for (map<string, MessageChannel *>::iterator it = active_rpc_channel.begin(); it != active_rpc_channel.end(); it++) { - - MessageChannel *messageChannelToDispose = it->second; - - //deinit channel - messageChannelToDispose->deinit(); - - //dispose it - delete(messageChannelToDispose); + MessageChannel *messageChannelToDispose = it->second; + + //deinit channel + messageChannelToDispose->deinit(); + + //dispose it + delete (messageChannelToDispose); } - MB_LAPP << "Clear rpc channel map"; + MB_LAPP << "Clear rpc channel map"; active_rpc_channel.clear(); - + CHAOS_NOT_THROW(StartableService::deinitImplementation(rpc_client, rpc_client->getName(), __PRETTY_FUNCTION__);) DELETE_OBJ_POINTER(rpc_client); - + CHAOS_NOT_THROW(StartableService::deinitImplementation(rpc_server, rpc_server->getName(), __PRETTY_FUNCTION__);) DELETE_OBJ_POINTER(rpc_server); - - MB_LAPP << "Deinit Command Dispatcher"; + + MB_LAPP << "Deinit Command Dispatcher"; CHAOS_NOT_THROW(StartableService::deinitImplementation(rpc_dispatcher, "DefaultCommandDispatcher", __PRETTY_FUNCTION__);) DELETE_OBJ_POINTER(rpc_dispatcher); //---------------------------- R P C ---------------------------- -} + } + /*! * all part are started */ -void NetworkBroker::start(){ +void NetworkBroker::start() { StartableService::startImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__); - if(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { - StartableService::startImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__); - StartableService::startImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__); - StartableService::startImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__); + if (!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { + StartableService::startImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__); + StartableService::startImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__); + StartableService::startImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__); } StartableService::startImplementation(rpc_dispatcher, "DefaultCommandDispatcher", __PRETTY_FUNCTION__); StartableService::startImplementation(rpc_server, rpc_server->getName(), __PRETTY_FUNCTION__); StartableService::startImplementation(rpc_client, rpc_client->getName(), __PRETTY_FUNCTION__); -// StartableService::startImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__); + + // StartableService::startImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__); } /*! * all part are started */ void NetworkBroker::stop() { -// CHAOS_NOT_THROW(StartableService::stopImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__);) + // CHAOS_NOT_THROW(StartableService::stopImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__);) CHAOS_NOT_THROW(StartableService::stopImplementation(rpc_client, rpc_client->getName(), __PRETTY_FUNCTION__);) CHAOS_NOT_THROW(StartableService::stopImplementation(rpc_server, rpc_server->getName(), __PRETTY_FUNCTION__);) CHAOS_NOT_THROW(StartableService::stopImplementation(rpc_dispatcher, "DefaultCommandDispatcher", __PRETTY_FUNCTION__);) - if(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { - CHAOS_NOT_THROW(StartableService::stopImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__);) - CHAOS_NOT_THROW(StartableService::stopImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__);) - CHAOS_NOT_THROW(StartableService::stopImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__);) + if (!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { + CHAOS_NOT_THROW(StartableService::stopImplementation(event_client, event_client->getName(), __PRETTY_FUNCTION__);) + CHAOS_NOT_THROW(StartableService::stopImplementation(event_server, event_server->getName(), __PRETTY_FUNCTION__);) + CHAOS_NOT_THROW(StartableService::stopImplementation(event_dispatcher, "DefaultEventDispatcher", __PRETTY_FUNCTION__);) } CHAOS_NOT_THROW(StartableService::stopImplementation(direct_io_server, direct_io_server->getName(), __PRETTY_FUNCTION__);) + } /*! Return the port where the rpc server has been published */ int NetworkBroker::getPublishedPort() { - CHAOS_ASSERT(rpc_server); - return rpc_server->getPublishedPort(); + CHAOS_ASSERT(rpc_server); + return rpc_server->getPublishedPort(); } /*! Fill the parameter withe rigth value of host and port for the internale rpc server of message broker */ -void NetworkBroker::getPublishedHostAndPort(string& hostAndPort) { - CHAOS_ASSERT(rpc_server); - hostAndPort = GlobalConfiguration::getInstance()->getLocalServerAddress(); - hostAndPort.append(":"); - hostAndPort.append(lexical_cast<string>(rpc_server->getPublishedPort())); +void NetworkBroker::getPublishedHostAndPort(string &hostAndPort) { + CHAOS_ASSERT(rpc_server); + hostAndPort = GlobalConfiguration::getInstance()->getLocalServerAddress(); + hostAndPort.append(":"); + hostAndPort.append(lexical_cast<string>(rpc_server->getPublishedPort())); } std::string NetworkBroker::getRPCUrl() { - std::string rpc_endpoint; - getPublishedHostAndPort(rpc_endpoint); - return rpc_endpoint; + std::string rpc_endpoint; + getPublishedHostAndPort(rpc_endpoint); + return rpc_endpoint; } std::string NetworkBroker::getDirectIOUrl() { - CHAOS_ASSERT(rpc_server); - return direct_io_server->getUrl(); + CHAOS_ASSERT(rpc_server); + return direct_io_server->getUrl(); } #pragma mark Event Registration and forwarding @@ -391,9 +370,9 @@ std::string NetworkBroker::getDirectIOUrl() { \param eventAction the actio to register \param eventType a type for the event for which the user want to register */ -void NetworkBroker::registerEventAction(EventAction *eventAction, common::event::EventType eventType, const char * const identification) { - CHAOS_ASSERT(event_dispatcher && eventAction && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - event_dispatcher->registerEventAction(eventAction, eventType, identification); +void NetworkBroker::registerEventAction(EventAction *eventAction, common::event::EventType eventType, const char *const identification) { + CHAOS_ASSERT(event_dispatcher && eventAction && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + event_dispatcher->registerEventAction(eventAction, eventType, identification); } //!Event Action deregistration @@ -401,8 +380,8 @@ void NetworkBroker::registerEventAction(EventAction *eventAction, common::event: Deregister an event action */ void NetworkBroker::deregisterEventAction(EventAction *eventAction) { - CHAOS_ASSERT(event_dispatcher && eventAction && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - event_dispatcher->deregisterEventAction(eventAction); + CHAOS_ASSERT(event_dispatcher && eventAction && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + event_dispatcher->deregisterEventAction(eventAction); } //!Event channel creation @@ -411,27 +390,27 @@ void NetworkBroker::deregisterEventAction(EventAction *eventAction) { \param eventType is one of the value listent in EventType enum that specify the type of the eventfor wich we want a channel */ -channel::EventChannel *NetworkBroker::getNewEventChannelFromType(common::event::EventType event_type) { - CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - common::event::channel::EventChannel *new_event_channel = NULL; - switch (event_type) { - case common::event::EventTypeAlert: - new_event_channel = new event::channel::AlertEventChannel(this); - break; - case common::event::EventTypeInstrument: - new_event_channel = new event::channel::InstrumentEventChannel(this); - break; - default: - break; - } - //check if the channel has been created - if(new_event_channel){ - new_event_channel->init(); - boost::mutex::scoped_lock lock(muext_map_event_channel_access); - active_event_channel.insert(make_pair(new_event_channel->channelID, new_event_channel)); - } - - return new_event_channel; +channel::EventChannel *NetworkBroker::getNewEventChannelFromType(common::event::EventType event_type) { + CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + common::event::channel::EventChannel *new_event_channel = NULL; + switch (event_type) { + case common::event::EventTypeAlert: + new_event_channel = new event::channel::AlertEventChannel(this); + break; + case common::event::EventTypeInstrument: + new_event_channel = new event::channel::InstrumentEventChannel(this); + break; + default: + break; + } + //check if the channel has been created + if (new_event_channel) { + new_event_channel->init(); + boost::mutex::scoped_lock lock(muext_map_event_channel_access); + active_event_channel.insert(make_pair(new_event_channel->channelID, new_event_channel)); + } + + return new_event_channel; } //!Device channel creation /*! @@ -439,13 +418,12 @@ channel::EventChannel *NetworkBroker::getNewEventChannelFromType(common::event:: \param deviceNetworkAddress device node address */ AlertEventChannel *NetworkBroker::getNewAlertEventChannel() { - - if(GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)){ - MB_LAPP<<"No Event Alert Instanced, EVENTS are DISABLED"; - return NULL; - } - - return static_cast<event::channel::AlertEventChannel*>(NetworkBroker::getNewEventChannelFromType(event::EventTypeAlert)); + if (GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)) { + MB_LAPP << "No Event Alert Instanced, EVENTS are DISABLED"; + return NULL; + } + + return static_cast<event::channel::AlertEventChannel *>(NetworkBroker::getNewEventChannelFromType(event::EventTypeAlert)); } //!Device channel creation @@ -454,8 +432,8 @@ AlertEventChannel *NetworkBroker::getNewAlertEventChannel() { \param deviceNetworkAddress device node address */ InstrumentEventChannel *NetworkBroker::getNewInstrumentEventChannel() { - CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - return static_cast<event::channel::InstrumentEventChannel*>(NetworkBroker::getNewEventChannelFromType(event::EventTypeInstrument)); + CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + return static_cast<event::channel::InstrumentEventChannel *>(NetworkBroker::getNewEventChannelFromType(event::EventTypeInstrument)); } //!Event channel deallocation @@ -463,22 +441,22 @@ InstrumentEventChannel *NetworkBroker::getNewInstrumentEventChannel() { Perform the event channel deallocation */ void NetworkBroker::disposeEventChannel(common::event::channel::EventChannel *event_channel_to_dispose) { - CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - if(!event_channel_to_dispose) return; - - boost::mutex::scoped_lock lock(muext_map_event_channel_access); - - //check if the channel is active - if(active_event_channel.count(event_channel_to_dispose->channelID) == 0) return; - - //remove the channel as active - active_event_channel.erase(event_channel_to_dispose->channelID); - - //deallocate it - event_channel_to_dispose->deinit(); - - //dispose it - delete(event_channel_to_dispose); + CHAOS_ASSERT(!GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + if (!event_channel_to_dispose) return; + + boost::mutex::scoped_lock lock(muext_map_event_channel_access); + + //check if the channel is active + if (active_event_channel.count(event_channel_to_dispose->channelID) == 0) return; + + //remove the channel as active + active_event_channel.erase(event_channel_to_dispose->channelID); + + //deallocate it + event_channel_to_dispose->deinit(); + + //dispose it + delete (event_channel_to_dispose); } //!message event @@ -487,35 +465,35 @@ void NetworkBroker::disposeEventChannel(common::event::channel::EventChannel *ev \param event the new evento to submit */ bool NetworkBroker::submitEvent(common::event::EventDescriptor *event) { - CHAOS_ASSERT(event_client && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); - bool result = true; - try{ - event_client->submitEvent(MOVE(EventDescriptorSPtr(event))); - } catch(CException& ex) { - result = false; - DECODE_CHAOS_EXCEPTION(ex); - } - return result; + CHAOS_ASSERT(event_client && !GlobalConfiguration::getInstance()->getOption<bool>(InitOption::OPT_EVENT_DISABLE)); + bool result = true; + try { + event_client->submitEvent(MOVE(EventDescriptorSPtr(event))); + } catch (CException &ex) { + result = false; + DECODE_CHAOS_EXCEPTION(ex); + } + return result; } #pragma mark Action Registration /* Register actions defined by AbstractActionDescriptor instance contained in the array */ -void NetworkBroker::registerAction(DeclareAction* declare_action_class) { - CHAOS_ASSERT(declare_action_class) - rpc_dispatcher->registerAction(declare_action_class); +void NetworkBroker::registerAction(DeclareAction *declare_action_class) { + CHAOS_ASSERT(declare_action_class) + rpc_dispatcher->registerAction(declare_action_class); } /* Deregister actions for a determianted domain */ -void NetworkBroker::deregisterAction(DeclareAction* declare_action_class) { - if(rpc_dispatcher&&declare_action_class){ - rpc_dispatcher->deregisterAction(declare_action_class); - } else { - LERR_<<"declare_action_class null"; - } +void NetworkBroker::deregisterAction(DeclareAction *declare_action_class) { + if (rpc_dispatcher && declare_action_class) { + rpc_dispatcher->deregisterAction(declare_action_class); + } else { + LERR_ << "declare_action_class null"; + } } #pragma mark Message Submission @@ -523,14 +501,14 @@ void NetworkBroker::deregisterAction(DeclareAction* declare_action_class) { /*! Submit a message specifing the destination */ -bool NetworkBroker::submitMessage(const string& host, - CDWUniquePtr message) { - CHAOS_ASSERT(rpc_client) - ChaosSharedPtr<NetworkForwardInfo> nfi = ChaosMakeSharedPtr<NetworkForwardInfo>(true); - nfi->destinationAddr = host; - nfi->setMessage(MOVE(message)); - //add answer id to datawrapper - return rpc_client->submitMessage(MOVE(nfi), false); +bool NetworkBroker::submitMessage(const string &host, + CDWUniquePtr message) { + CHAOS_ASSERT(rpc_client) + ChaosSharedPtr<NetworkForwardInfo> nfi = ChaosMakeSharedPtr<NetworkForwardInfo>(true); + nfi->destinationAddr = host; + nfi->setMessage(MOVE(message)); + //add answer id to datawrapper + return rpc_client->submitMessage(MOVE(nfi), false); } //!send interparocess message @@ -539,93 +517,89 @@ bool NetworkBroker::submitMessage(const string& host, to the registered rpc domain */ chaos::common::data::CDWUniquePtr NetworkBroker::submitInterProcessMessage(chaos::common::data::CDWUniquePtr message, - bool onThisThread) { - CHAOS_ASSERT(rpc_dispatcher) - if(onThisThread) { - return rpc_dispatcher->executeCommandSync(MOVE(message)); - }else{ - return rpc_dispatcher->dispatchCommand(MOVE(message)); - } + bool onThisThread) { + CHAOS_ASSERT(rpc_dispatcher) + if (onThisThread) { + return rpc_dispatcher->executeCommandSync(MOVE(message)); + } else { + return rpc_dispatcher->dispatchCommand(MOVE(message)); + } } /*! Submite a new request to send to the remote host */ -bool NetworkBroker::submiteRequest(const string& host, - CDWUniquePtr request, - std::string sender_node_id, - uint32_t sender_request_id) { - CHAOS_ASSERT(rpc_client) - request->addStringValue(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP, published_host_and_port); - ChaosSharedPtr<NetworkForwardInfo> nfi = ChaosMakeSharedPtr<NetworkForwardInfo>(true); - if(nfi.get()){ - nfi->destinationAddr = host; - nfi->sender_node_id = sender_node_id; - nfi->sender_request_id = sender_request_id; - nfi->setMessage(MOVE(request)); - return rpc_client->submitMessage(MOVE(nfi), false); - } else { - LERR_<<sender_request_id<<"] to '"<<host<<"' invalid NetworkForwardInfo for:'"<<published_host_and_port<<"' sender node id:"<<sender_node_id; - - } - return false; +bool NetworkBroker::submiteRequest(const string &host, + CDWUniquePtr request, + std::string sender_node_id, + uint32_t sender_request_id) { + CHAOS_ASSERT(rpc_client) + request->addStringValue(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP, published_host_and_port); + ChaosSharedPtr<NetworkForwardInfo> nfi = ChaosMakeSharedPtr<NetworkForwardInfo>(true); + if (nfi.get()) { + nfi->destinationAddr = host; + nfi->sender_node_id = sender_node_id; + nfi->sender_request_id = sender_request_id; + nfi->setMessage(MOVE(request)); + return rpc_client->submitMessage(MOVE(nfi), false); + } else { + LERR_ << sender_request_id << "] to '" << host << "' invalid NetworkForwardInfo for:'" << published_host_and_port << "' sender node id:" << sender_node_id; + } + return false; } /* */ MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNetworkAddress *node_network_address, - EntityType type, - bool use_shared_request_domain) { - MessageChannel *channel = NULL; - CHAOS_ASSERT(getServiceState() == 2) - if(getServiceState() != 2) return NULL; - switch (type) { - case RAW:{ - CHAOS_ASSERT(!node_network_address) - channel = use_shared_request_domain? - new MessageChannel(this, - global_request_domain): - new MessageChannel(this); - break; - } - case RAW_MULTI_ADDRESS:{ - CHAOS_ASSERT(!node_network_address) - channel = use_shared_request_domain? - new MultiAddressMessageChannel(this, - global_request_domain): - new MultiAddressMessageChannel(this); - break; - } - case DEVICE:{ - if(!node_network_address) return NULL; - channel = use_shared_request_domain? - new DeviceMessageChannel(this, - static_cast<CDeviceNetworkAddress*>(node_network_address), - false, - global_request_domain): - new DeviceMessageChannel(this, static_cast<CDeviceNetworkAddress*>(node_network_address)); - break; - } -// case PERFORMANCE:{ -// if(!node_network_address) return NULL; -// channel = use_shared_request_domain? -// new common::message::PerformanceNodeChannel(this, -// node_network_address, -// performance_session_managment.getLocalDirectIOClientInstance(), -// global_request_domain): -// new common::message::PerformanceNodeChannel(this, -// node_network_address, -// performance_session_managment.getLocalDirectIOClientInstance()); -// break; -// } + EntityType type, + bool use_shared_request_domain) { + MessageChannel *channel = NULL; + CHAOS_ASSERT(getServiceState() == 2) + if (getServiceState() != 2) return NULL; + switch (type) { + case RAW: { + CHAOS_ASSERT(!node_network_address) + channel = use_shared_request_domain ? new MessageChannel(this, + global_request_domain) + : new MessageChannel(this); + break; } - //check if the channel has been created - if(channel){ - channel->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(channel->getChannelUUID(), channel)); + case RAW_MULTI_ADDRESS: { + CHAOS_ASSERT(!node_network_address) + channel = use_shared_request_domain ? new MultiAddressMessageChannel(this, + global_request_domain) + : new MultiAddressMessageChannel(this); + break; } - return channel; + case DEVICE: { + if (!node_network_address) return NULL; + channel = use_shared_request_domain ? new DeviceMessageChannel(this, + static_cast<CDeviceNetworkAddress *>(node_network_address), + false, + global_request_domain) + : new DeviceMessageChannel(this, static_cast<CDeviceNetworkAddress *>(node_network_address)); + break; + } + // case PERFORMANCE:{ + // if(!node_network_address) return NULL; + // channel = use_shared_request_domain? + // new common::message::PerformanceNodeChannel(this, + // node_network_address, + // performance_session_managment.getLocalDirectIOClientInstance(), + // global_request_domain): + // new common::message::PerformanceNodeChannel(this, + // node_network_address, + // performance_session_managment.getLocalDirectIOClientInstance()); + // break; + // } + } + //check if the channel has been created + if (channel) { + channel->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(channel->getChannelUUID(), channel)); + } + return channel; } //!Metadata server channel creation @@ -633,47 +607,42 @@ MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNetworkAddress Performe the creation of metadata server */ MDSMessageChannel *NetworkBroker::getMetadataserverMessageChannel(MessageRequestDomainSHRDPtr shared_request_domain) { - MDSMessageChannel *channel = (shared_request_domain.get() == NULL)? - (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), global_request_domain)): - (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), shared_request_domain)); - if(channel){ - channel->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel*>(channel))); - } - return channel; + MDSMessageChannel *channel = (shared_request_domain.get() == NULL) ? (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), global_request_domain)) : (new MDSMessageChannel(this, GlobalConfiguration::getInstance()->getMetadataServerAddressList(), shared_request_domain)); + if (channel) { + channel->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel *>(channel))); + } + return channel; } /*! Performe the creation of metadata server */ -MDSMessageChannel *NetworkBroker::getMetadataserverMessageChannel(const VectorNetworkAddress& endpoints, MessageRequestDomainSHRDPtr shared_request_domain) { - MDSMessageChannel *channel = (shared_request_domain.get() == NULL)? - (new MDSMessageChannel(this, endpoints, global_request_domain)): - (new MDSMessageChannel(this, endpoints, shared_request_domain)); - if(channel){ - channel->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel*>(channel))); - } - return channel; +MDSMessageChannel *NetworkBroker::getMetadataserverMessageChannel(const VectorNetworkAddress &endpoints, MessageRequestDomainSHRDPtr shared_request_domain) { + MDSMessageChannel *channel = (shared_request_domain.get() == NULL) ? (new MDSMessageChannel(this, endpoints, global_request_domain)) : (new MDSMessageChannel(this, endpoints, shared_request_domain)); + if (channel) { + channel->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel *>(channel))); + } + return channel; } - //!Metadata server channel creation /*! Performe the creation of metadata server */ MultiAddressMessageChannel *NetworkBroker::getMultiMetadataServiceRawMessageChannel() { - MultiAddressMessageChannel *mc = getRawMultiAddressMessageChannel(); - if(mc){ - mc->addNode(GlobalConfiguration::getInstance()->getMetadataServerAddress()); - } - if(mc){ - mc->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(mc->getChannelUUID(), mc)); - } - return mc; + MultiAddressMessageChannel *mc = getRawMultiAddressMessageChannel(); + if (mc) { + mc->addNode(GlobalConfiguration::getInstance()->getMetadataServerAddress()); + } + if (mc) { + mc->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(mc->getChannelUUID(), mc)); + } + return mc; } //!Device channel creation @@ -682,53 +651,51 @@ MultiAddressMessageChannel *NetworkBroker::getMultiMetadataServiceRawMessageChan \param deviceNetworkAddress device node address */ DeviceMessageChannel *NetworkBroker::getDeviceMessageChannelFromAddress(CDeviceNetworkAddress *node_network_address, - bool self_managed, - bool use_shared_request_domain) { - DeviceMessageChannel *channel = (use_shared_request_domain? - new DeviceMessageChannel(this, - node_network_address, - self_managed, - global_request_domain): - new DeviceMessageChannel(this, - node_network_address, - self_managed)); - - - if(channel){ - channel->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel*>(channel))); - } - return channel; + bool self_managed, + bool use_shared_request_domain) { + DeviceMessageChannel *channel = (use_shared_request_domain ? new DeviceMessageChannel(this, + node_network_address, + self_managed, + global_request_domain) + : new DeviceMessageChannel(this, + node_network_address, + self_managed)); + + if (channel) { + channel->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(channel->getChannelUUID(), static_cast<MessageChannel *>(channel))); + } + return channel; } //!performance channel creation -PerformanceNodeChannel *NetworkBroker::getPerformanceChannelFromAddress(CNetworkAddress *node_network_address) { - return static_cast<chaos::common::message::PerformanceNodeChannel*>(getNewMessageChannelForRemoteHost(node_network_address, PERFORMANCE)); +PerformanceNodeChannel *NetworkBroker::getPerformanceChannelFromAddress(CNetworkAddress *node_network_address) { + return static_cast<chaos::common::message::PerformanceNodeChannel *>(getNewMessageChannelForRemoteHost(node_network_address, PERFORMANCE)); } //! Return a raw message channel MessageChannel *NetworkBroker::getRawMessageChannel() { - return getNewMessageChannelForRemoteHost(NULL, RAW); + return getNewMessageChannelForRemoteHost(NULL, RAW); } //! Return a raw multinode message channel MultiAddressMessageChannel *NetworkBroker::getRawMultiAddressMessageChannel() { - return static_cast<MultiAddressMessageChannel*>(getNewMessageChannelForRemoteHost(NULL, RAW_MULTI_ADDRESS)); + return static_cast<MultiAddressMessageChannel *>(getNewMessageChannelForRemoteHost(NULL, RAW_MULTI_ADDRESS)); } //! Return a raw multinode message channel /*! Performe the creation of a raw multinode message channel */ -chaos::common::message::MultiAddressMessageChannel *NetworkBroker::getRawMultiAddressMessageChannel(const std::vector<chaos::common::network::CNetworkAddress>& node_address) { - MultiAddressMessageChannel *mc = new MultiAddressMessageChannel(this, node_address); - if(mc){ - mc->init(); - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - active_rpc_channel.insert(make_pair(mc->getChannelUUID(), mc)); - } - return mc; +chaos::common::message::MultiAddressMessageChannel *NetworkBroker::getRawMultiAddressMessageChannel(const std::vector<chaos::common::network::CNetworkAddress> &node_address) { + MultiAddressMessageChannel *mc = new MultiAddressMessageChannel(this, node_address); + if (mc) { + mc->init(); + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + active_rpc_channel.insert(make_pair(mc->getChannelUUID(), mc)); + } + return mc; } //!Channel deallocation @@ -736,56 +703,55 @@ chaos::common::message::MultiAddressMessageChannel *NetworkBroker::getRawMultiAd Perform the message channel deallocation */ void NetworkBroker::disposeMessageChannel(MessageChannel *message_channel_to_dispose) { - if(!message_channel_to_dispose) return; - - std::string uid=message_channel_to_dispose->getChannelUUID(); - //deallocate it - CHAOS_NOT_THROW(message_channel_to_dispose->deinit();); - //check if the channel is active + if (!message_channel_to_dispose) return; - if(active_rpc_channel.count(uid) == 0) return; + std::string uid = message_channel_to_dispose->getChannelUUID(); + //deallocate it + CHAOS_NOT_THROW(message_channel_to_dispose->deinit();); + //check if the channel is active - boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); - - //remove the channel as active - active_rpc_channel.erase(uid); - - //dispose it - delete(message_channel_to_dispose); + if (active_rpc_channel.count(uid) == 0) return; + + boost::mutex::scoped_lock lock(mutex_map_rpc_channel_acces); + + //remove the channel as active + active_rpc_channel.erase(uid); + + //dispose it + delete (message_channel_to_dispose); } //!Channel deallocation void NetworkBroker::disposeMessageChannel(NodeMessageChannel *messageChannelToDispose) { - NetworkBroker::disposeMessageChannel((MessageChannel*)messageChannelToDispose); + NetworkBroker::disposeMessageChannel((MessageChannel *)messageChannelToDispose); } //!Rpc Channel deallocation void NetworkBroker::disposeMessageChannel(chaos::common::message::MultiAddressMessageChannel *messageChannelToDispose) { - NetworkBroker::disposeMessageChannel((MessageChannel*)messageChannelToDispose); + NetworkBroker::disposeMessageChannel((MessageChannel *)messageChannelToDispose); } //!Rpc Channel deallocation void NetworkBroker::disposeMessageChannel(chaos::common::message::MDSMessageChannel *messageChannelToDispose) { - NetworkBroker::disposeMessageChannel(static_cast<MessageChannel*>(messageChannelToDispose)); + NetworkBroker::disposeMessageChannel(static_cast<MessageChannel *>(messageChannelToDispose)); } - //Allocate a new endpoint in the direct io server chaos_direct_io::DirectIOServerEndpoint *NetworkBroker::getDirectIOServerEndpoint() { - CHAOS_ASSERT(direct_io_dispatcher) - chaos_direct_io::DirectIOServerEndpoint *result_endpoint = direct_io_dispatcher->getNewEndpoint(); - return result_endpoint; + CHAOS_ASSERT(direct_io_dispatcher) + chaos_direct_io::DirectIOServerEndpoint *result_endpoint = direct_io_dispatcher->getNewEndpoint(); + return result_endpoint; } //Dispose an endpoint of the direct io server void NetworkBroker::releaseDirectIOServerEndpoint(chaos_direct_io::DirectIOServerEndpoint *end_point) { - direct_io_dispatcher->releaseEndpoint(end_point); + direct_io_dispatcher->releaseEndpoint(end_point); } //Return a new direct io client instance chaos_direct_io::DirectIOClient *NetworkBroker::getSharedDirectIOClientInstance() { - return direct_io_client; + return direct_io_client; } //Return a new direct io client instance chaos_direct_io::DirectIOClient *NetworkBroker::getNewDirectIOClientInstance() { - MB_LAPP << "Allocate a new DirectIOClient of type " << direct_io_client_impl; - return ObjectFactoryRegister<common::direct_io::DirectIOClient>::getInstance()->getNewInstanceByName(direct_io_client_impl); + MB_LAPP << "Allocate a new DirectIOClient of type " << direct_io_client_impl; + return ObjectFactoryRegister<common::direct_io::DirectIOClient>::getInstance()->getNewInstanceByName(direct_io_client_impl); } diff --git a/chaos/common/network/NetworkBroker.h b/chaos/common/network/NetworkBroker.h index dfbfa3467d683105a1635a50957097156a72490c..03ffc6e431d844af9ddbba1212ec2f80f04bcde3 100644 --- a/chaos/common/network/NetworkBroker.h +++ b/chaos/common/network/NetworkBroker.h @@ -121,10 +121,6 @@ namespace chaos { //! Rpc server for message listening chaos::RpcServer *rpc_server; - // publish subscribe - chaos::common::message::producer_uptr_t prod; - chaos::common::message::consumer_uptr_t cons; - //rpc action dispatcher AbstractCommandDispatcher *rpc_dispatcher; @@ -160,7 +156,7 @@ namespace chaos { public: - + bool usepsbroker; //! Basic Destructor virtual ~NetworkBroker(); diff --git a/chaos/common/network/NetworkForwardInfo.h b/chaos/common/network/NetworkForwardInfo.h index 90dc94a650608b91e24f57d9a2313984529ba52b..63483ed2d0e4639c0b1ff84f740bc20436403f0e 100644 --- a/chaos/common/network/NetworkForwardInfo.h +++ b/chaos/common/network/NetworkForwardInfo.h @@ -37,6 +37,7 @@ namespace chaos { typedef struct NetworkForwardInfo { bool is_request; bool is_synchronous_request; + bool is_psm;//is publish subscribe //!Define the information ip:port used to reach a remote chaos network broker std::string destinationAddr; //! the message data @@ -55,6 +56,7 @@ namespace chaos { destinationAddr(), message(), tag(0), + is_psm(false), sender_node_id(), sender_request_id(0){} diff --git a/chaos/common/rpc/ChaosRpc.h b/chaos/common/rpc/ChaosRpc.h index eb79e62bf9b41d015436f86a3b6a1d2687ec153c..ea1239ed52d6bc12e0259079d6744c192f0bf5d2 100644 --- a/chaos/common/rpc/ChaosRpc.h +++ b/chaos/common/rpc/ChaosRpc.h @@ -22,4 +22,5 @@ #define ChaosFramework_ChaosRpc_h #define RPC_SYNC_KEY "sync" #define RPC_SEQ_KEY "seq_id" +#define RPC_SRC_UID "src_uid" #endif diff --git a/chaos/common/rpc/psm/PSMClient.cpp b/chaos/common/rpc/psm/PSMClient.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6f0b3b53f5f3d4bb5bf12161af75e8b1a3bf5c0e --- /dev/null +++ b/chaos/common/rpc/psm/PSMClient.cpp @@ -0,0 +1,160 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#include <chaos/common/global.h> +#include <chaos/common/rpc/ChaosRpc.h> +#include <chaos/common/async_central/AsyncCentralManager.h> +#include <chaos/common/chaos_constants.h> +#include <chaos/common/configuration/GlobalConfiguration.h> +#include <chaos/common/async_central/AsyncCentralManager.h> +#include <string> +#include <boost/lexical_cast.hpp> +#include <boost/algorithm/string.hpp> +#include <chaos/common/chaos_errors.h> + +using namespace chaos; +using namespace chaos::common::data; +using namespace std; +using namespace boost; +using namespace boost::algorithm; +#include "PSMClient.h" +#define PSMC_LAPP INFO_LOG(PSMClient) +#define PSMC_LDBG DBG_LOG(PSMClient) +#define PSMC_LERR ERR_LOG(PSMClient) + +#define PSM_DO_AGAIN(x) do{x}while(err == EAGAIN); +#define PSM_SOCKET_MAINTENANCE_TIMEOUT (5000 * 30) +#define PSM_SOCKET_LIFETIME_TIMEOUT (5000 * 60) +//------------------------------------------------------- +DEFINE_CLASS_FACTORY(PSMClient, RpcClient); + +PSMClient::PSMClient(const string& alias): +RpcClient(alias){ + seq_id=0; + +} + +PSMClient::~PSMClient(){ +// #ifndef CHAOS_PROMETHEUS +// delete counter_zmqerror_uptr; +// #endif +} + +/* + Initialization method for output buffer + */ +void PSMClient::init(void *init_data) { + CDataWrapper *cfg = reinterpret_cast<CDataWrapper*>(init_data); + seq_id=0; + PSMC_LAPP << "initialization"; + if(!cfg->hasKey(InitOption::OPT_MSG_BROKER_SERVER)){ + throw chaos::CException(-1, "a not empty broker must be given", __PRETTY_FUNCTION__); + } + if(!cfg->hasKey(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)){ + throw chaos::CException(-1, "a not empty and unique id must be given", __PRETTY_FUNCTION__); + } + nodeuid = cfg->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + std::string msgbrokerdrv = "kafka-rdk"; + if(cfg->hasKey(InitOption::OPT_MSG_BROKER_DRIVER)){ + msgbrokerdrv = cfg->getStringValue(chaos::InitOption::OPT_MSG_BROKER_DRIVER); + + } + + std::string msgbroker = cfg->getStringValue(InitOption::OPT_MSG_BROKER_SERVER); + + prod = chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv); + PSMC_LAPP << "Initializing producer based on " << msgbroker<<" ("+msgbrokerdrv+")"; + + prod->addServer(msgbroker); + + if (prod->applyConfiguration() != 0) { + throw chaos::CException(-2, "cannot initialize Publish Subscribe Producer:" + prod->getLastError(), __PRETTY_FUNCTION__); + } + +} + +/* + start the rpc adapter + */ +void PSMClient::start() { + int err = 0; + prod->start(); + + //start timere after and repeat every one minut + // if((err = chaos::common::async_central::AsyncCentralManager::getInstance()->addTimer(this, PSM_SOCKET_MAINTENANCE_TIMEOUT, PSM_SOCKET_MAINTENANCE_TIMEOUT))) {LOG_AND_TROW(PSMC_LERR, err, "Error adding timer")} +} + +/* + start the rpc adapter + */ +void PSMClient::stop() { + int err = 0; + prod->stop(); + // if((err = chaos::common::async_central::AsyncCentralManager::getInstance()->removeTimer(this))) {LOG_AND_TROW(PSMC_LERR, err, "Error removing timer")} +} + +/* + Deinitialization method for output buffer + */ +void PSMClient::deinit() { + + PSMC_LAPP << "PSM Destroyed"; +} + +/* + + */ +bool PSMClient::submitMessage(NFISharedPtr forwardInfo, + bool onThisThread) { + CHAOS_ASSERT(forwardInfo); + ElementManagingPolicy ePolicy; + try{ + forwardInfo->is_psm=true; + if(!forwardInfo->destinationAddr.size()) + throw CException(0, "No destination in message description", __PRETTY_FUNCTION__); + if(!forwardInfo->hasMessage()) + throw CException(0, "No message in description", __PRETTY_FUNCTION__); + + forwardInfo->message->addBoolValue(RPC_SYNC_KEY, RpcClient::syncrhonous_call); + forwardInfo->message->addInt64Value(RPC_SEQ_KEY, (++seq_id)); + forwardInfo->message->addStringValue(RPC_SRC_UID, nodeuid); + std::string key=forwardInfo->destinationAddr+ chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; + PSMC_LDBG<<"Sending message to:"<<key<<" msg:"<<forwardInfo->message->getJSONString(); + prod->pushMsg(*forwardInfo->message.get(),key); + + } catch(CException& ex){ + //in this case i need to delete the memory + //in this case i need to delete te memory allocated by message + DECODE_CHAOS_EXCEPTION(ex) + } + return true; +} + +//-----timer handler------ +void PSMClient::timeout() { + +} + + + +uint64_t PSMClient::getMessageQueueSize() { + return 0; +} diff --git a/chaos/common/rpc/psm/PSMClient.h b/chaos/common/rpc/psm/PSMClient.h new file mode 100644 index 0000000000000000000000000000000000000000..ce9bfed4d3905771703f72ff313d48760679e50c --- /dev/null +++ b/chaos/common/rpc/psm/PSMClient.h @@ -0,0 +1,92 @@ +// +// PSMClient.h +// CHAOSFramework +// +// Created by Bisegni Claudio on 11/03/12. +// Copyright (c) 2012 INFN. All rights reserved. +// + +#ifndef CHAOSFramework_PSMClient_h +#define CHAOSFramework_PSMClient_h + +//#pragma GCC diagnostic ignored "-Woverloaded-virtual" + +#include <chaos/common/rpc/RpcClient.h> +#include <chaos/common/pqueue/ChaosProcessingQueue.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> +#include <chaos/common/utility/TimingUtil.h> +#include <chaos/common/pool/ResourcePool.h> + +#include <boost/thread.hpp> +#include <boost/shared_ptr.hpp> +#include <chaos/common/message/MessagePSDriver.h> +#include <map> +#include <deque> +namespace chaos { + + class PSMClient; + class SocketEndpointPool; + + struct PSMSocketPoolDef{ + void * socket; + std::string identity; + }; + + typedef chaos::common::pool::ResourcePool<PSMSocketPoolDef> PSMSocketPool; + + //define the pool my for every endpoint + CHAOS_DEFINE_MAP_FOR_TYPE(std::string, ChaosSharedPtr< PSMSocketPool >, SocketMap) + + /* + Class that implemnt !CHAOS RPC messaggin gusing PSM + + driver parameter: + key:zmq_timeout value is a stirng that represent the integer used as timeout + */ + DECLARE_CLASS_FACTORY(PSMClient, RpcClient), + public chaos::common::async_central::TimerHandler { + REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(PSMClient) + PSMClient(const std::string& alias); + virtual ~PSMClient(); + boost::shared_mutex map_socket_mutex; + ChaosAtomic<uint64_t> seq_id; + std::string nodeuid; + protected: + //timer handler + void timeout(); + + // publish subscribe + chaos::common::message::producer_uptr_t prod; + public: + + /* + init the rpc adapter + */ + void init(void *init_data); + + /* + start the rpc adapter + */ + void start(); + + /* + start the rpc adapter + */ + void stop(); + + /* + deinit the rpc adapter + */ + void deinit(); + + /* + Submit the message to be send to a certain ip, the datawrapper must contains + the key CS_CMDM_REMOTE_HOST_IP + */ + bool submitMessage(NFISharedPtr forwardInfo, bool onThisThread=false); + + //inherited method + virtual uint64_t getMessageQueueSize(); + }; +} +#endif diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..baa8f74f66946c0023f5c1b36e230a37b831fffb --- /dev/null +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -0,0 +1,130 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#include <chaos/common/global.h> +#include <chaos/common/rpc/ChaosRpc.h> +#include <chaos/common/chaos_constants.h> +#include <chaos/common/exception/exception.h> +#include <chaos/common/data/CDataWrapper.h> +#include <chaos/common/rpc/RpcServerHandler.h> +#define PSMS_LAPP INFO_LOG(PSMServer) +#define PSMS_LDBG DBG_LOG(PSMServer) +#define PSMS_LERR ERR_LOG(PSMServer) + +using namespace std; +using namespace chaos; +using namespace boost; +using namespace chaos::common::data; +#include "PSMServer.h" + +DEFINE_CLASS_FACTORY(PSMServer, RpcServer); +PSMServer::PSMServer(const string& alias): +RpcServer(alias){ + +} + +PSMServer::~PSMServer() { + +} + +//init the server getting the configuration value +void PSMServer::init(void *init_data) { + CDataWrapper *cfg = reinterpret_cast<CDataWrapper*>(init_data); + PSMS_LAPP << "initialization"; + try{ + if(!cfg->hasKey(InitOption::OPT_MSG_BROKER_SERVER)){ + throw chaos::CException(-1, "a not empty broker must be given", __PRETTY_FUNCTION__); + } + if(!cfg->hasKey(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)){ + throw chaos::CException(-1, "a not empty and unique id must be given", __PRETTY_FUNCTION__); + } + nodeuid = cfg->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + std::string msgbrokerdrv = "kafka-rdk"; + if(cfg->hasKey(InitOption::OPT_MSG_BROKER_DRIVER)){ + msgbrokerdrv = cfg->getStringValue(chaos::InitOption::OPT_MSG_BROKER_DRIVER); + + } + + std::string msgbroker = cfg->getStringValue(InitOption::OPT_MSG_BROKER_SERVER); + + cons = chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv, ""); + if (cons.get() == 0) { + throw chaos::CException(-3, "cannot initialize Publish Subscribe Consumer of topic:" + nodeuid + "_cmd", __PRETTY_FUNCTION__); + } + cons->addServer(msgbroker); + // subscribe to the queue of commands + cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&PSMServer::messageHandler, this, _1)); + cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&PSMServer::messageError, this, _1)); + + } catch (std::exception& e) { + throw CException(-2, e.what(), "PSMServer::init"); + } catch (...) { + throw CException(-3, "generic error", "PSMServer::init"); + } +} +void PSMServer::messageHandler( chaos::common::message::ele_t& data) { + int64_t seq_id=-1; + std::string src; + //chaos::common::data::CDWUniquePtr data(d.cd.release()); + if(data.cd->hasKey(RPC_SEQ_KEY)){ + seq_id=data.cd->getInt64Value(RPC_SEQ_KEY); + } + if(data.cd->hasKey(RPC_SRC_UID)){ + src=data.cd->getInt64Value(RPC_SRC_UID); + } + PSMS_LDBG << "Message Received from node:"<<src<<" seq_id:"<<seq_id << " desc:"<<data.cd->getJSONString(); + CDWShrdPtr result_data_pack; + + if(data.cd->hasKey(RPC_SYNC_KEY) && + data.cd->getBoolValue(RPC_SYNC_KEY)) { + + result_data_pack = command_handler->executeCommandSync(MOVE(data.cd)); + } else { + result_data_pack = command_handler->dispatchCommand(MOVE(data.cd)); + } + if(result_data_pack.get()){ + PSMS_LDBG << "Something to send back:"<<seq_id << "to node:"<<src<<" desc:"<<result_data_pack->getJSONString(); + + } + +} +void PSMServer::messageError( chaos::common::message::ele_t& data) { + PSMS_LERR << "ERROR:"; + +} + +//start the rpc adapter +void PSMServer::start() { + PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; + cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); +} + +//start the rpc adapter +void PSMServer::stop() { + cons->stop(); + +} + +//deinit the rpc adapter +void PSMServer::deinit() { + + PSMS_LAPP << "PSMServer deinit"; +} diff --git a/chaos/common/rpc/psm/PSMServer.h b/chaos/common/rpc/psm/PSMServer.h new file mode 100644 index 0000000000000000000000000000000000000000..64fc6ed36fb884c6f2ddc485781f2617ca2dbff8 --- /dev/null +++ b/chaos/common/rpc/psm/PSMServer.h @@ -0,0 +1,85 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#ifndef CHAOSFramework_PSMServer_h +#define CHAOSFramework_PSMServer_h +//#pragma GCC diagnostic ignored "-Woverloaded-virtual" + +#include <vector> +#include <boost/thread.hpp> +#include <chaos/common/message/MessagePSDriver.h> + +#include <chaos/common/rpc/RpcServer.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> + + +namespace chaos { + /* + Class that implement the Chaos RPC adapter for 0mq protocoll + */ + DECLARE_CLASS_FACTORY(PSMServer, RpcServer) { + REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(PSMServer) + // publish subscribe + chaos::common::message::consumer_uptr_t cons; + std::string nodeuid; + PSMServer(const std::string& alias); + virtual ~PSMServer(); + //worker that process request in a separate thread + void messageHandler( chaos::common::message::ele_t& data); + void messageError( chaos::common::message::ele_t& data); + + int sendMessage(void *socket, + void *message_data, + size_t message_size, + bool more_to_send); + + int readMessage(void *socket, + std::string& buffer, + bool& has_next, + std::string *peer_ip = NULL); + public: + + /* + init the rpc adapter + */ + void init(void *init_data); + /* + start the rpc adapter + */ + void start(); + /* + start the rpc adapter + */ + void stop(); + /* + deinit the rpc adapter + */ + void deinit(); + + //server worker thread + /*! + Thread where data is received and managed + */ + void executeOnThread(); + }; + +} +#endif diff --git a/chaos/cu_toolkit/ChaosCUToolkit.cpp b/chaos/cu_toolkit/ChaosCUToolkit.cpp index e052e1397f7492673e777e7fab55ebec8a8e78f7..0d9ff069afb22ebe8eb25a6dcfbc9212c7c97cbe 100644 --- a/chaos/cu_toolkit/ChaosCUToolkit.cpp +++ b/chaos/cu_toolkit/ChaosCUToolkit.cpp @@ -133,8 +133,8 @@ void ChaosCUToolkit::init(void* init_data) { if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_LOG_ON_MDS) && GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)) { - chaos::common::log::LogManager::getInstance()->addMDSLoggingBackend(GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)); - nodeuid=GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + nodeuid=GlobalConfiguration::getInstance()->getNodeUID(); + chaos::common::log::LogManager::getInstance()->addMDSLoggingBackend(nodeuid); } //force first allocation of metadata logging diff --git a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp index 695b55e0b9306aa5b0be59c54d5a777db04e7483..ba39fce54e2013faac321d8d8c635842256fb24b 100644 --- a/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp +++ b/chaos/cu_toolkit/control_manager/AbstractControlUnit.cpp @@ -2331,7 +2331,7 @@ void AbstractControlUnit::completeInputAttribute() { AbstractSharedDomainCache* AbstractControlUnit::_getAttributeCache() { return attribute_value_shared_cache; } -int AbstractControlUnit::incomingMessage(const std::string& key, const chaos::common::data::CDWShrdPtr& data) { +int AbstractControlUnit::incomingMessage(const std::string& key, chaos::common::data::CDWUniquePtr& data) { ACULDBG_ << "message from :" << key << " data:" << (data.get() ? data->getJSONString() : "NONE"); return 0; } @@ -3447,7 +3447,7 @@ void AbstractControlUnit::metadataLogging(const StandardLoggingChannel::LogLevel message); } -void AbstractControlUnit::consumerHandler(const chaos::common::message::ele_t& data) { +void AbstractControlUnit::consumerHandler( chaos::common::message::ele_t& data) { incomingMessage(data.key, data.cd); } void AbstractControlUnit::updateDataSet(chaos::common::data::CDataWrapper& cd, chaos::DataType::DataSetAttributeIOAttribute io) { diff --git a/chaos/cu_toolkit/control_manager/AbstractControlUnit.h b/chaos/cu_toolkit/control_manager/AbstractControlUnit.h index aebacd5166b256a929ed72d5cd44d60559f7c037..77b319dd0cc3268ebc469a2c15f81437857502ac 100644 --- a/chaos/cu_toolkit/control_manager/AbstractControlUnit.h +++ b/chaos/cu_toolkit/control_manager/AbstractControlUnit.h @@ -562,7 +562,7 @@ class AbstractControlUnit : public DeclareAction, void _setBypassState(bool bypass_stage, bool high_priority = false); - virtual void consumerHandler(const chaos::common::message::ele_t& data); + virtual void consumerHandler( chaos::common::message::ele_t& data); template <typename T> bool _setDrvProp(const std::string& name, const T& value, uint32_t size,bool bi) { @@ -728,7 +728,7 @@ class AbstractControlUnit : public DeclareAction, * @param data pack * @return int return 0 if succefully handled */ - virtual int incomingMessage(const std::string& key, const chaos::common::data::CDWShrdPtr& data); + virtual int incomingMessage(const std::string& key, chaos::common::data::CDWUniquePtr& data); //!callback for put a veto on property value change request virtual bool propertyChangeHandler(const std::string& group_name, diff --git a/chaos/cu_toolkit/control_manager/ControlManager.cpp b/chaos/cu_toolkit/control_manager/ControlManager.cpp index b131d7158f29fe8b1bb3795869572347ec117345..e5ce262f37aadcc8c8e85d238da2f4a86d94e3e2 100644 --- a/chaos/cu_toolkit/control_manager/ControlManager.cpp +++ b/chaos/cu_toolkit/control_manager/ControlManager.cpp @@ -58,7 +58,7 @@ using namespace std; Constructor */ ControlManager::ControlManager() - : publishing_counter_delay(0), use_unit_server(false), use_execution_pools(false), thread_run(false), mds_channel(NULL) { + : publishing_counter_delay(0), use_execution_pools(false), thread_run(false), mds_channel(NULL) { //! register default control unit registerControlUnit<chaos::cu::control_manager::ProxyControlUnit>(); registerControlUnit<chaos::cu::control_manager::script::ScriptableExecutionUnit>(); @@ -77,9 +77,6 @@ void ControlManager::init(void* initParameter) { //control manager action initialization AbstActionDescShrPtr actionDescription; - //check if we need to start the unit server - use_unit_server = GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); - //check for execution pools use_execution_pools = GlobalConfiguration::getInstance()->hasOption(CONTROL_MANAGER_EXECUTION_POOLS); @@ -90,11 +87,11 @@ void ControlManager::init(void* initParameter) { else throw CException(-2, "Error allcoating metadata server channel", __PRETTY_FUNCTION__); - if (use_unit_server) { + { LCMAPP_ << "Enable unit server"; if (!GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)) { - throw CException(-1, "No server alias param found", __PRETTY_FUNCTION__); + throw CException(-1, "No required "+std::string(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)+" option given", __PRETTY_FUNCTION__); } if (GlobalConfiguration::getInstance()->hasOption(CONTROL_MANAGER_UNIT_SERVER_KEY)) { @@ -115,7 +112,7 @@ void ControlManager::init(void* initParameter) { } } - unit_server_alias = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + unit_server_alias = GlobalConfiguration::getInstance()->getNodeUID(); //init CU action actionDescription = DeclareAction::addActionDescritionInstance<ControlManager>(this, @@ -179,14 +176,11 @@ void ControlManager::init(void* initParameter) { void ControlManager::start() { LCMAPP_ << "Start cu scan timer"; int err = 0; - if (use_unit_server) { //add unit server registration managment timer - if ((err = chaos_async::AsyncCentralManager::getInstance()->addTimer(this, 0, GlobalConfiguration::getInstance()->getOption<uint64_t>(CONTROL_MANAGER_UNIT_SERVER_REGISTRATION_RETRY_MSEC)))) { - throw chaos::CException(-1, "Error registering the Control managet timer", __PRETTY_FUNCTION__); - } - } else { - startControlUnitSMThread(); + if ((err = chaos_async::AsyncCentralManager::getInstance()->addTimer(this, 0, GlobalConfiguration::getInstance()->getOption<uint64_t>(CONTROL_MANAGER_UNIT_SERVER_REGISTRATION_RETRY_MSEC)))) { + throw chaos::CException(-1, "Error registering the Control managet timer", __PRETTY_FUNCTION__); } + } // start control units state machine thread @@ -688,17 +682,13 @@ void ControlManager::timeout() { //Unpublished case 0: LCMAPP_ << "[Unpublished] Send first registration pack to mds"; - if (use_unit_server) { - if (unit_server_sm.process_event(unit_server_state_machine::UnitServerEventType::UnitServerEventTypePublishing()) == boost::msm::back::HANDLED_TRUE) { - //gone to publishing - sendUnitServerRegistration(); - } else { - LCMERR_ << "[Unpublished] i can't be here"; - } + if (unit_server_sm.process_event(unit_server_state_machine::UnitServerEventType::UnitServerEventTypePublishing()) == boost::msm::back::HANDLED_TRUE) { + //gone to publishing + sendUnitServerRegistration(); } else { - LCMDBG_ << "[Publishing] Unit server registration not sucessfull, turn off the timer"; - TimerHandler::stopMe(); + LCMERR_ << "[Unpublished] i can't be here"; } + break; //Publishing case 1: @@ -720,7 +710,6 @@ void ControlManager::timeout() { case 3: LCMAPP_ << "[Published failed] Perform Unpublishing state"; TimerHandler::stopMe(); - use_unit_server = false; break; } } diff --git a/chaos/cu_toolkit/control_manager/ControlManager.h b/chaos/cu_toolkit/control_manager/ControlManager.h index d1bf27221ae9bbca51eee0de84599d9497c42d80..5de740c07c46a4f6ddaee7c65dfff79865529075 100644 --- a/chaos/cu_toolkit/control_manager/ControlManager.h +++ b/chaos/cu_toolkit/control_manager/ControlManager.h @@ -133,7 +133,6 @@ namespace chaos { //mutable boost::shared_mutex mutex_registration; //unit server state machine - bool use_unit_server; bool use_execution_pools; unsigned int publishing_counter_delay; diff --git a/chaos/cu_toolkit/control_manager/execution_pool/ExecutionPool.cpp b/chaos/cu_toolkit/control_manager/execution_pool/ExecutionPool.cpp index d18cc4cafea318166df1a85bf47dfb00502674dd..680ca1b34b256511058b1b19dc7851d021f8a5f0 100644 --- a/chaos/cu_toolkit/control_manager/execution_pool/ExecutionPool.cpp +++ b/chaos/cu_toolkit/control_manager/execution_pool/ExecutionPool.cpp @@ -49,7 +49,7 @@ ExecutionPoolManager::~ExecutionPoolManager() {} void ExecutionPoolManager::init(void *init_data) { int err = 0; if(GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)){ - unit_server_alias = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + unit_server_alias = GlobalConfiguration::getInstance()->getNodeUID(); } execution_pool_list = GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(CONTROL_MANAGER_EXECUTION_POOLS);