diff --git a/chaos/common/ChaosCommon.cpp b/chaos/common/ChaosCommon.cpp index 1d35309aec47ed318e02f04ecda3bfcbf8fc1409..8f050a3928bc0fcefb9972a699dac0645d9c3a08 100644 --- a/chaos/common/ChaosCommon.cpp +++ b/chaos/common/ChaosCommon.cpp @@ -287,8 +287,8 @@ void ChaosAbstractCommon::init(void *init_data) { //initialize the plugin manager chaos::common::utility::InizializableService::initImplementation(chaos::common::plugin::PluginManager::getInstance(), NULL, "PluginManager", __PRETTY_FUNCTION__); } - if (GlobalConfiguration::getInstance()->hasOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)) { - nodeuid=GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_NODEUID)) { + nodeuid=GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::OPT_NODEUID); } else { nodeuid=NetworkBroker::getInstance()->getRPCUrl(); } diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index 8d955840e66e1ab2c98f312eaf0632a1e4c7cdb2..76ae31473a890e2e90f3549dbc35f03e3c15dd51 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -113,11 +113,10 @@ static const char* const OPT_SCRIPT_VM_KV_PARAM = "script-vm-kvp"; static const char* const OPT_REST_POLL_TIME_US = "rest-poll-us"; //!data directory for storage and checkpoint of nodes static const char* const OPT_DATA_DIR = "data-dir"; -#if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) + static const char* const OPT_MSG_PRODUCER_KVP = "msgopt-producer-kvp"; static const char* const OPT_MSG_CONSUMER_KVP = "msgopt-consumer-kvp"; -#endif #if CHAOS_PROMETHEUS //! config file parameter @@ -132,7 +131,7 @@ static const char* const OPT_MSG_BROKER_SERVER = "msg-broker-server"; static const char* const OPT_MSG_BROKER_DRIVER = "msg-broker-driver"; static const char* const OPT_HA_ZONE_NAME = "ha-zone-name"; -static const char* const CONTROL_MANAGER_UNIT_SERVER_ALIAS = "unit-server-alias"; +static const char* const OPT_NODEUID = "node-uid"; static const char* OPT_GROUP_NAME ="group-name"; #if ENABLE_ZMQ_MONITOR @@ -247,6 +246,8 @@ static const char* const NODE_SUB_TYPE = "ndk_sub_type"; that is given by the network broker where the node si attacched. */ static const char* const NODE_RPC_ADDR = "ndk_rpc_addr"; +static const char* const NODE_IP_ADDR = "ndk_ip_addr"; + //! identify the node rest port if any /*! diff --git a/chaos/common/configuration/GlobalConfiguration.cpp b/chaos/common/configuration/GlobalConfiguration.cpp index fc4028f629b84155a58babb4135ed7aff49cf0f1..46fb7c681056ba2a60a03c42ed128e823e50729f 100644 --- a/chaos/common/configuration/GlobalConfiguration.cpp +++ b/chaos/common/configuration/GlobalConfiguration.cpp @@ -120,7 +120,7 @@ void GlobalConfiguration::preParseStartupParameters() { addOption(InitOption::OPT_MSG_BROKER_DRIVER, po::value< std::string >()->default_value(std::string("kafka-rdk")), "Message broker driver"); addOption(InitOption::OPT_GROUP_NAME, po::value< std::string >()->default_value(std::string("")), "Group Name"); - addOption(InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS, po::value< std::string >()/*->default_value(std::string("NONAME"))*/,"UID of the node"); + addOption(InitOption::OPT_NODEUID, po::value< std::string >()/*->default_value(std::string("NONAME"))*/,"UID of the node"); # @@ -291,8 +291,12 @@ void GlobalConfiguration::checkDefaultOption() { CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_SERVER); CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_NODE_DESC); - CHECK_AND_DEFINE_CONFIG_OPTION(std::string,chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + CHECK_AND_DEFINE_CONFIG_OPTION(std::string,chaos::InitOption::OPT_NODEUID); CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_DRIVER); + #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) + CHECK_AND_DEFINE_CONFIG_OPTION(std::vector<std::string>,InitOption::OPT_MSG_PRODUCER_KVP); + CHECK_AND_DEFINE_CONFIG_OPTION(std::vector<std::string>,InitOption::OPT_MSG_CONSUMER_KVP); + #endif CHECK_AND_DEFINE_OPTION(bool, OPT_RPC_SYNC_ENABLE, InitOption::OPT_RPC_SYNC_ENABLE) else{ OPT_RPC_SYNC_ENABLE = false; @@ -550,7 +554,7 @@ std::string GlobalConfiguration::getDesc(){ } std::string GlobalConfiguration::getNodeUID(){ - return configuration->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + return configuration->getStringValue(chaos::InitOption::OPT_NODEUID); } diff --git a/chaos/common/direct_io/impl/PSMDirectIOClient.cpp b/chaos/common/direct_io/impl/PSMDirectIOClient.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5655fc40bd4885fad9d82e506093b83768cd3dac --- /dev/null +++ b/chaos/common/direct_io/impl/PSMDirectIOClient.cpp @@ -0,0 +1,140 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ +#include <chaos/common/utility/UUIDUtil.h> +#include <chaos/common/utility/InetUtility.h> +#include <chaos/common/direct_io/impl/PSMDirectIOClient.h> +#include <chaos/common/data/cache/FastHash.h> + +#include <boost/algorithm/string.hpp> +#include <boost/format.hpp> + +#include <string.h> +#include <assert.h> /* assert */ + +#define PSMDIO_LOG_HEAD "["<<getName()<<"] - " + +#define PSMDIOLAPP_ LAPP_ << PSMDIO_LOG_HEAD +#define PSMDIOLDBG_ LDBG_ << PSMDIO_LOG_HEAD << __FUNCTION__ << " - " +#define PSMDIOLERR_ LERR_ << PSMDIO_LOG_HEAD + +using namespace chaos::common::utility; + +using namespace chaos::common::direct_io; +using namespace chaos::common::direct_io::impl; + +typedef boost::unique_lock<boost::shared_mutex> PSMDirectIOClientWriteLock; +typedef boost::shared_lock<boost::shared_mutex> PSMDirectIOClientReadLock; + +DEFINE_CLASS_FACTORY(PSMDirectIOClient, DirectIOClient); + +//------------------------------STATIC METHOD--------------------------------- + + +PSMDirectIOClient::PSMDirectIOClient(std::string alias): +DirectIOClient(alias), +priority_port(0), +service_port(0), +thread_run(false), +zmq_context(NULL){}; + +PSMDirectIOClient::~PSMDirectIOClient(){}; + +//! Initialize instance +void PSMDirectIOClient::init(void *init_data) { + int err = 0; + MapPSMConfiguration default_configuration; + default_configuration["PSM_IO_THREADS"] = "1"; + + DirectIOClient::init(init_data); + PSMDIOLDBG_ << "Allocating zmq context"; + thread_run= true; + zmq_context = zmq_ctx_new(); + if(zmq_context == NULL) throw chaos::CException(0, "Error creating zmq context", __FUNCTION__); + if((err = PSMBaseClass::configureContextWithStartupParameter(zmq_context, + default_configuration, + chaos::GlobalConfiguration::getInstance()->getDirectIOClientImplKVParam(), + "PSM DirectIO Client"))) { + throw chaos::CException(2, "Error configuring service socket", __FUNCTION__); + } + + + PSMDIOLDBG_ << "Inizilizing zmq implementation with zmq lib version = " << PSM_VERSION; + PSMDIOLDBG_ << "Set number of thread for the contex"; + zmq_ctx_set(zmq_context, PSM_IO_THREADS, 2); + + PSMDIOLDBG_ << "Initialized"; +} + +//! Deinit the implementation +void PSMDirectIOClient::deinit() { + int err = 0; + //remove all active connection (never need to be exists at this step) + map_connections.clearElement(); + //destroy the zmq context + PSMDIOLDBG_ << "Destroing zmq context"; + thread_run = false; + err = zmq_ctx_destroy(zmq_context); + if(err) PSMDIOLERR_ << "Error closing context"; + //monitor_thread_group.join_all(); + + zmq_context = NULL; + PSMDIOLDBG_ << "PSM context destroyed"; + DirectIOClient::deinit(); +} + +DirectIOClientConnection *PSMDirectIOClient::_getNewConnectionImpl(std::string server_description, + uint16_t endpoint) { + //allocate client + PSMDirectIOClientConnection *connection = new PSMDirectIOClientConnection(zmq_context, + server_description, + endpoint); + if(connection == NULL) return NULL; + try{ + InizializableService::initImplementation(connection, NULL, "PSMDirectIOClientConnection", __PRETTY_FUNCTION__); + //register client with the hash of the xzmq decoded endpoint address (tcp://ip:port) + DEBUG_CODE(PSMDIOLDBG_ << "Register client for " << server_description << " with zmq decoded hash " << connection->getUniqueUUID();) + map_connections.registerElement(connection->getUniqueUUID(), connection); + } catch (...) { + PSMDIOLERR_ << CHAOS_FORMAT("We got error initilizing connection to %1%:%2% so we goning to deinitilize it an return NULL channel", %server_description%endpoint); + //in case of error + CHAOS_NOT_THROW(InizializableService::deinitImplementation(connection, "PSMDirectIOClientConnection", __PRETTY_FUNCTION__);); + connection = NULL; + } + return connection; +} + +void PSMDirectIOClient::_releaseConnectionImpl(DirectIOClientConnection *connection_to_release) { + PSMDirectIOClientConnection *conn=reinterpret_cast<PSMDirectIOClientConnection*>(connection_to_release); + if(!conn) return; + CHAOS_NOT_THROW(InizializableService::deinitImplementation(conn, "PSMDirectIOClientConnection", __PRETTY_FUNCTION__);); + //CHAOS_ASSERT(conn->monitor_info) + //stop the monitor + DEBUG_CODE(PSMDIOLDBG_ << "Release the connection for: " << connection_to_release->getServerDescription() <<" ptr:"<<std::hex<<(uint64_t)connection_to_release;) + map_connections.deregisterElementKey(conn->getUniqueUUID()); + delete(connection_to_release); +} + +void PSMDirectIOClient::freeObject(const DCKeyObjectContainer::TKOCElement& element) { + if(!element.element) return; + DirectIOClientConnection *connection = element.element; + DEBUG_CODE(PSMDIOLDBG_ << "Autorelease connection for " << connection->getServerDescription();) + releaseConnection(connection); +} diff --git a/chaos/common/direct_io/impl/PSMDirectIOClient.h b/chaos/common/direct_io/impl/PSMDirectIOClient.h new file mode 100644 index 0000000000000000000000000000000000000000..963614abc12256b5aeadc444c27e145a79796ba3 --- /dev/null +++ b/chaos/common/direct_io/impl/PSMDirectIOClient.h @@ -0,0 +1,90 @@ + +/* + * PSMDirectIOClient.h + * !CHAOS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CHAOSFramework__PSMDirectIOClient__ +#define __CHAOSFramework__PSMDirectIOClient__ + +#include <map> + +#include <chaos/common/direct_io/DirectIOClient.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> +#include <chaos/common/direct_io/impl/PSMDirectIOClientConnection.h> + +#include <boost/thread.hpp> +#include <boost/shared_ptr.hpp> + +#include <zmq.h> + +namespace chaos { + namespace common { + namespace direct_io { + namespace channel { + class DirectIOVirtualClientChannel; + } + namespace impl { + + // PSM Direct IO Implementation + /*! + */ + DECLARE_CLASS_FACTORY(PSMDirectIOClient, DirectIOClient), + private PSMBaseClass { + REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(PSMDirectIOClient) + + friend class DirectIOVirtualClientChannel; + + int32_t priority_port; + + int32_t service_port; + + void *zmq_context; + + bool thread_run; + boost::thread_group monitor_thread_group; + + std::map<uint32_t, ConnectionMonitorInfo*> map_connection_socket_monitor; + + PSMDirectIOClient(std::string alias); + + ~PSMDirectIOClient(); + + protected: + //overriding ofr free object fuunction for the tempalted key object container superclass + void freeObject(const DCKeyObjectContainer::TKOCElement& element); + + //! get new connection + DirectIOClientConnection *_getNewConnectionImpl(std::string server_description, uint16_t endpoint); + + //! release an instantiated connection + void _releaseConnectionImpl(DirectIOClientConnection *connection_to_release); + public: + + //! Initialize instance + void init(void *init_data); + + + //! Deinit the implementation + void deinit(); + }; + } + } + } +} + +#endif /* defined(__CHAOSFramework__DirectIOPSMClient__) */ diff --git a/chaos/common/direct_io/impl/PSMDirectIOServer.cpp b/chaos/common/direct_io/impl/PSMDirectIOServer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..49f79d8d322e3d4e3ffbc2519be43b1353245dcf --- /dev/null +++ b/chaos/common/direct_io/impl/PSMDirectIOServer.cpp @@ -0,0 +1,340 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ + +#include <chaos/common/configuration/GlobalConfiguration.h> +#include <chaos/common/utility/UUIDUtil.h> +#include <chaos/common/data/CDataWrapper.h> +#include <chaos/common/direct_io/DirectIODataPack.h> +#include <chaos/common/direct_io/impl/PSMDirectIOServer.h> +#include <boost/format.hpp> + + +#define PSMDIO_SRV_LOG_HEAD "["<<getName()<<"] - " + +#define PSMDIO_SRV_LAPP_ LAPP_ << PSMDIO_SRV_LOG_HEAD +#define PSMDIO_SRV_LDBG_ LDBG_ << PSMDIO_SRV_LOG_HEAD +#define PSMDIO_SRV_LERR_ LERR_ << PSMDIO_SRV_LOG_HEAD + + +#define DIRECTIO_FREE_ANSWER_DATA(x)\ +if(x && x->answer_data) free(x->answer_data);\ +if(x) free(x);\ +x = NULL; + +#define INPROC_PRIORITY "inproc://priority" +#define INPROC_SERVICE "inproc://service" + +namespace chaos_data = chaos::common::data; + +using namespace chaos::common::direct_io::impl; +using namespace chaos::common::direct_io; + +DEFINE_CLASS_FACTORY(PSMDirectIOServer, DirectIOServer); + +PSMDirectIOServer::PSMDirectIOServer(std::string alias): +DirectIOServer(alias), +zmq_context(NULL), +run_server(false), +direct_io_thread_number(2){}; + +PSMDirectIOServer::~PSMDirectIOServer(){}; + +//! Initialize instance +void PSMDirectIOServer::init(void *init_data) { + + chaos_data::CDataWrapper *init_cw = static_cast<chaos_data::CDataWrapper*>(init_data); + if(!init_cw) throw chaos::CException(0, "No configration has been provided", __PRETTY_FUNCTION__); + LDBG_<<"configuration:"<<init_cw->getCompliantJSONString(); + //get the port from configuration + priority_port = init_cw->getInt32Value(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_PRIORITY_PORT); + if(priority_port <= 0) throw chaos::CException(0, "Bad priority port configured", __PRETTY_FUNCTION__); + + service_port = init_cw->getInt32Value(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_SERVICE_PORT); + if(service_port <= 0) throw chaos::CException(0, "Bad service port configured", __PRETTY_FUNCTION__); + DirectIOServer::init(init_data); + + //create the endpoint strings + priority_socket_bind_str = boost::str( boost::format("tcp://*:%1%") % priority_port); + PSMDIO_SRV_LDBG_ << "priority socket bind url: " << priority_socket_bind_str; + + service_socket_bind_str = boost::str( boost::format("tcp://*:%1%") % service_port); + PSMDIO_SRV_LDBG_ << "service socket bind url: " << service_socket_bind_str; +} + +//! Start the implementation +void PSMDirectIOServer::start() { + int err = 0; + MapPSMConfiguration default_context_configuration; + default_context_configuration["PSM_IO_THREADS"] = "1"; + + direct_io_thread_number = 1; + DirectIOServer::start(); + run_server = true; + + //get custm configuration for direct io server + if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER)) { + direct_io_thread_number = GlobalConfiguration::getInstance()->getOption<uint32_t>(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER); + } + + //create the PSMContext + zmq_context = zmq_ctx_new(); + if(zmq_context == NULL) throw chaos::CException(0, "Error creating zmq context", __PRETTY_FUNCTION__); + if((err = PSMBaseClass::configureContextWithStartupParameter(zmq_context, + default_context_configuration, + chaos::GlobalConfiguration::getInstance()->getDirectIOServerImplKVParam(), + "PSM DirectIO Server"))) { + throw chaos::CException(err, "Error configuring zmq context", __PRETTY_FUNCTION__); + } + + //queue thread + PSMDIO_SRV_LDBG_ << CHAOS_FORMAT("Allocating and binding socket to %1%/%2%",%priority_socket_bind_str%service_socket_bind_str); + try{ + //start the treads for the proxies + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::poller, + this, + priority_socket_bind_str, + INPROC_PRIORITY))); + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::poller, + this, + service_socket_bind_str, + INPROC_SERVICE))); + //thread for service worker + direct_io_thread_number--;//remove one thread because it is the default one + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::worker, + this, + WorkerTypePriority, + &DirectIOHandler::priorityDataReceived))); + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::worker, + this, + WorkerTypeService, + &DirectIOHandler::serviceDataReceived))); + //threads for priority worker + for(int idx_thrd = 0; + idx_thrd < direct_io_thread_number; + idx_thrd++) { + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::worker, + this, + WorkerTypePriority, + &DirectIOHandler::priorityDataReceived))); + server_threads_group.add_thread(new boost::thread(boost::bind(&PSMDirectIOServer::worker, + this, + WorkerTypeService, + &DirectIOHandler::serviceDataReceived))); + } + PSMDIO_SRV_LDBG_ << CHAOS_FORMAT("PSM high priority socket managed by %1% threads", %direct_io_thread_number); + } catch(boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::lock_error> >& lock_error_exception) { + PSMDIO_SRV_LERR_ << lock_error_exception.what(); + throw chaos::CException(0, std::string(lock_error_exception.what()), __PRETTY_FUNCTION__); + } + PSMDIO_SRV_LDBG_ << "Threads allocated and started"; +} + +//! Stop the implementation +void PSMDirectIOServer::stop() { + run_server = false; + DirectIOServer::stop(); + PSMDIO_SRV_LDBG_ << "Deallocating zmq context"; + zmq_ctx_shutdown(zmq_context); + zmq_ctx_term(zmq_context); + PSMDIO_SRV_LDBG_ << "PSM Context deallocated"; + + //wiath all thread + PSMDIO_SRV_LDBG_ << "Join on all thread"; + server_threads_group.join_all(); + PSMDIO_SRV_LDBG_ << "All thread stopped"; +} + +//! Deinit the implementation +void PSMDirectIOServer::deinit() { + //serverThreadGroup.stopGroup(true); + + DirectIOServer::deinit(); +} + +void PSMDirectIOServer::poller(const std::string& public_url, + const std::string& inproc_url) { + int err = 0; + void *public_socket = NULL; + void *inrpoc_socket = NULL; + MapPSMConfiguration default_socket_configuration; + + MapPSMConfiguration proxy_empty_default_configuration; + MapPSMConfiguration proxy_socket_configuration; + + default_socket_configuration["PSM_LINGER"] = "500"; + default_socket_configuration["PSM_RCVHWM"] = "1000"; + default_socket_configuration["PSM_SNDHWM"] = "1000"; + default_socket_configuration["PSM_RCVTIMEO"] = "-1"; + default_socket_configuration["PSM_SNDTIMEO"] = "1000"; + + proxy_socket_configuration["PSM_LINGER"] = "500"; + //keep space for 2 compelte direct io message(3 message part) for every working thread + proxy_socket_configuration["PSM_RCVHWM"] = "1000";//boost::lexical_cast<std::string>((direct_io_thread_number*3)*2); + proxy_socket_configuration["PSM_SNDHWM"] = "1000"; + proxy_socket_configuration["PSM_RCVTIMEO"] = "-1"; + proxy_socket_configuration["PSM_SNDTIMEO"] = "1000"; + + PSMDIO_SRV_LDBG_ << CHAOS_FORMAT("Enter pooler for %1%", %public_url); + //start creating two socker for service and priority + PSMDIO_SRV_LDBG_ << "Allocating and binding priority socket to "<< priority_socket_bind_str; + + public_socket = zmq_socket (zmq_context, PSM_ROUTER); + if(public_socket == NULL){ + return; + } + if((err = PSMBaseClass::configureSocketWithStartupParameter(public_socket, + default_socket_configuration, + chaos::GlobalConfiguration::getInstance()->getDirectIOServerImplKVParam(), + CHAOS_FORMAT("PSM DirectIO Server socket bind %1%", %public_url)))){ + return; + } + + if((err = zmq_bind(public_socket, public_url.c_str()))){ + return; + } + //create proxy for priority + inrpoc_socket = zmq_socket (zmq_context, PSM_DEALER); + if(inrpoc_socket == NULL) { + return; + } + if((err = PSMBaseClass::configureSocketWithStartupParameter(inrpoc_socket, + proxy_socket_configuration, + proxy_empty_default_configuration, + CHAOS_FORMAT("PSM DirectIO Server proxy bind %1%", %inproc_url)))){ + return; + } + + if((err = zmq_bind(inrpoc_socket, inproc_url.c_str()))) { + return; + } + + try { + zmq_proxy(public_socket, inrpoc_socket, NULL); + }catch (std::exception &e) {} + if(public_socket) { + if((err = zmq_unbind(public_socket, public_url.c_str()))){ + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error %1% unbindind socket for %2%", %err%public_url); + } + if((err = zmq_close(public_socket))){ + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error %1% closing socket for %2%", %err%public_url); + } + if(inrpoc_socket) { + if((err = zmq_close(inrpoc_socket))){ + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error %1% closing proxy for %2%", %err%public_url); + } + inrpoc_socket = NULL; + } + public_socket = NULL; + } + PSMDIO_SRV_LDBG_ << CHAOS_FORMAT("Leaving pooler for %1%", %public_url); +} + +void PSMDirectIOServer::worker(unsigned int w_type, + DirectIOHandlerPtr delegate) { + int err = 0; + + std::string identity; + void *worker_socket = NULL; + bool send_synchronous_answer = false; + + DirectIODataPackSPtr data_pack_received; + DirectIODataPackSPtr data_pack_answer; + + MapPSMConfiguration worker_empty_default_configuration; + MapPSMConfiguration worker_socket_configuration; + worker_socket_configuration["PSM_LINGER"] = "500"; + worker_socket_configuration["PSM_RCVHWM"] = "1000"; + worker_socket_configuration["PSM_SNDHWM"] = "1000"; + worker_socket_configuration["PSM_RCVTIMEO"] = "-1"; + worker_socket_configuration["PSM_SNDTIMEO"] = "1000"; + + if((worker_socket = zmq_socket(zmq_context, + PSM_DEALER)) == NULL) { + PSMDIO_SRV_LERR_ << "Error creating worker socket"; + return; + } + + if((err = PSMBaseClass::configureSocketWithStartupParameter(worker_socket, + worker_socket_configuration, + worker_empty_default_configuration, + "PSM DirectIO Server worker"))){ + return; + } + + if((err = PSMBaseClass::configureSocketWithStartupParameter(worker_socket, + worker_socket_configuration, + worker_empty_default_configuration, + "PSM DirectIO Server worker"))){ + return; + } + + if(w_type == WorkerTypePriority) { + if((err = PSMBaseClass::connectSocket(worker_socket, + INPROC_PRIORITY, + "PSM Server Worker"))) { + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error connecting worker socket with error %1%",%err); + return; + } + } else if(w_type == WorkerTypeService) { + if((err = PSMBaseClass::connectSocket(worker_socket, + INPROC_SERVICE, + "PSM Server Worker"))) { + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error connecting worker socket with error %1%",%err); + return; + } + } + + PSMDIO_SRV_LDBG_ << "Entering in the thread loop for worker socket"; + while (run_server) { + try { + if((err = reveiceDatapack(worker_socket, + identity, + data_pack_received))) { + data_pack_received.reset(); + continue; + } else { + //keep track if the cleint want the answer + send_synchronous_answer = data_pack_received->header.dispatcher_header.fields.synchronous_answer; + //call handler + if((err = DirectIOHandlerPtrCaller(handler_impl, delegate)(MOVE(data_pack_received), + data_pack_answer)) == 0) { + if(send_synchronous_answer && + data_pack_answer) { + + if((err = sendDatapack(worker_socket, + identity, + MOVE(data_pack_answer)))){ + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error sending answer with code %1%", %err); + } + } + } else { + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error dispatching received message with code %1%", %err); + } + } + } catch (CException& ex) { + DECODE_CHAOS_EXCEPTION(ex) + } + } + PSMDIO_SRV_LDBG_ << "Leaving the thread loop for worker socket"; + if((err = zmq_close(worker_socket))) { + PSMDIO_SRV_LERR_ << CHAOS_FORMAT("Error closing worker socket with error %1%",%err); + } +} diff --git a/chaos/common/direct_io/impl/PSMDirectIOServer.h b/chaos/common/direct_io/impl/PSMDirectIOServer.h new file mode 100644 index 0000000000000000000000000000000000000000..4b3cf11747019dfc8209a3a7017fc31a4fdddd36 --- /dev/null +++ b/chaos/common/direct_io/impl/PSMDirectIOServer.h @@ -0,0 +1,84 @@ +/* + * Copyright 2012, 2017 INFN + * + * Licensed under the EUPL, Version 1.2 or – as soon they + * will be approved by the European Commission - subsequent + * versions of the EUPL (the "Licence"); + * You may not use this work except in compliance with the + * Licence. + * You may obtain a copy of the Licence at: + * + * https://joinup.ec.europa.eu/software/page/eupl + * + * Unless required by applicable law or agreed to in + * writing, software distributed under the Licence is + * distributed on an "AS IS" basis, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the Licence for the specific language governing + * permissions and limitations under the Licence. + */ +#ifndef __CHAOSFramework__PSMDirectIOServer__ +#define __CHAOSFramework__PSMDirectIOServer__ + +#include <string> + +#include <chaos/common/direct_io/DirectIOServer.h> +#include <chaos/common/utility/ObjectFactoryRegister.h> + +#include <boost/thread.hpp> + +#include <zmq.h> + +namespace chaos { + namespace common { + namespace direct_io { + namespace impl { + + typedef enum WorkerType { + WorkerTypePriority = 1, + WorkerTypeService = 2 + } WorkerType; + + DECLARE_CLASS_FACTORY(PSMDirectIOServer, DirectIOServer), + private PSMBaseClass { + REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(PSMDirectIOServer) + void *zmq_context; + unsigned int direct_io_thread_number; + boost::thread_group server_threads_group; + + bool run_server; + + std::string priority_socket_bind_str; + + std::string service_socket_bind_str; + + void poller(const std::string& public_url, + const std::string& inproc_url); + void worker(unsigned int w_type, + DirectIOHandlerPtr delegate); + + + PSMDirectIOServer(std::string alias); + ~PSMDirectIOServer(); + public: + + //! Initialize instance + void init(void *init_data); + + //! Start the implementation + void start(); + + //! Stop the implementation + void stop(); + + //! Deinit the implementation + void deinit(); + }; + } + } + } +} + + +#endif /* defined(__CHAOSFramework__PSMDirectIOServer__) */ diff --git a/chaos/common/rpc/psm/PSMClient.cpp b/chaos/common/rpc/psm/PSMClient.cpp index 78cbf162d9d8d3e497b883a11cb3ac339a787971..a3638ae849c2f8a7efbb68e0931ce1f3991a3b4f 100644 --- a/chaos/common/rpc/psm/PSMClient.cpp +++ b/chaos/common/rpc/psm/PSMClient.cpp @@ -69,10 +69,10 @@ void PSMClient::init(void *init_data) { if(!cfg->hasKey(InitOption::OPT_MSG_BROKER_SERVER)){ throw chaos::CException(-1, "a not empty broker must be given", __PRETTY_FUNCTION__); } - if(!cfg->hasKey(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)){ + if(!cfg->hasKey(chaos::InitOption::OPT_NODEUID)){ throw chaos::CException(-1, "a not empty and unique id must be given", __PRETTY_FUNCTION__); } - nodeuid = cfg->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + nodeuid = cfg->getStringValue(chaos::InitOption::OPT_NODEUID); std::string msgbrokerdrv = "kafka-rdk"; if(cfg->hasKey(InitOption::OPT_MSG_BROKER_DRIVER)){ msgbrokerdrv = cfg->getStringValue(chaos::InitOption::OPT_MSG_BROKER_DRIVER); @@ -135,12 +135,12 @@ bool PSMClient::submitMessage(NFISharedPtr forwardInfo, throw CException(0, "No message in description", __PRETTY_FUNCTION__); if(forwardInfo->message->hasKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP)){ forwardInfo->message->removeKey(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP); - forwardInfo->message->addStringValue(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP,nodeuid); + forwardInfo->message->addStringValue(RpcActionDefinitionKey::CS_CMDM_ANSWER_HOST_IP,nodeuid+chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); } forwardInfo->message->addBoolValue(RPC_SYNC_KEY, RpcClient::syncrhonous_call); forwardInfo->message->addInt64Value(RPC_SEQ_KEY, (++seq_id)); - std::string key=forwardInfo->destinationAddr+ chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; + std::string key=forwardInfo->destinationAddr; PSMC_LDBG<<"Sending message to:"<<forwardInfo->destinationAddr<<" ("<<key<<") msg:"<<forwardInfo->message->getJSONString(); prod->pushMsgAsync(*forwardInfo->message.get(),key); diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index c37b59e981e368af5cd42b7090c0703aee967285..2bb42d2b4421f4b8b2e95dc5e7664a65b7dc4796 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -54,10 +54,10 @@ void PSMServer::init(void *init_data) { if(!cfg->hasKey(InitOption::OPT_MSG_BROKER_SERVER)){ throw chaos::CException(-1, "a not empty broker must be given", __PRETTY_FUNCTION__); } - if(!cfg->hasKey(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS)){ + if(!cfg->hasKey(chaos::InitOption::OPT_NODEUID)){ throw chaos::CException(-1, "a not empty and unique id must be given", __PRETTY_FUNCTION__); } - nodeuid = cfg->getStringValue(chaos::InitOption::CONTROL_MANAGER_UNIT_SERVER_ALIAS); + nodeuid = cfg->getStringValue(chaos::InitOption::OPT_NODEUID); std::string msgbrokerdrv = "kafka-rdk"; if(cfg->hasKey(InitOption::OPT_MSG_BROKER_DRIVER)){ msgbrokerdrv = cfg->getStringValue(chaos::InitOption::OPT_MSG_BROKER_DRIVER); @@ -89,8 +89,8 @@ void PSMServer::init(void *init_data) { throw chaos::CException(-1, "cannot initialize Publish Subscribe:" + cons->getLastError(), __PRETTY_FUNCTION__); } if(cfg->hasKey("ismds")){ - PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC+ std::string(chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); - cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC+ std::string(chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX)); + PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC; + cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC); } } catch (std::exception& e) { throw CException(-2, e.what(), "PSMServer::init"); @@ -121,7 +121,7 @@ void PSMServer::messageHandler( chaos::common::message::ele_t& data) { if(result_data_pack.get() && src.size()){ PSMS_LDBG << "Something to send back:"<<seq_id << "to node:"<<src<<" desc:"<<result_data_pack->getJSONString(); - prod->pushMsgAsync(*result_data_pack.get(),src+chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); + prod->pushMsgAsync(*result_data_pack.get(),src); } }