Skip to content
Snippets Groups Projects
GlobalConfiguration.cpp 29.6 KiB
Newer Older
  • Learn to ignore specific revisions
  •  * Copyright 2012, 2017 INFN
    
     * Licensed under the EUPL, Version 1.2 or – as soon they
     * will be approved by the European Commission - subsequent
     * versions of the EUPL (the "Licence");
     * You may not use this work except in compliance with the
     * Licence.
     * You may obtain a copy of the Licence at:
    
     * https://joinup.ec.europa.eu/software/page/eupl
    
     * Unless required by applicable law or agreed to in
     * writing, software distributed under the Licence is
     * distributed on an "AS IS" basis,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
     * express or implied.
     * See the Licence for the specific language governing
     * permissions and limitations under the Licence.
    
    #include <iostream>
    
    #include <chaos/common/log/LogManager.h>
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    
    
    #include <boost/filesystem.hpp>
    
    //#include <boost/algorithm/string.hpp>
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    #include <regex>
    
    #include <chaos/common/ChaosCommon.h>
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    #include "GlobalConfiguration.h"
    
    #include <chaos/common/external_unit/external_unit.h>
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    using namespace chaos;
    
    using namespace chaos::common::data;
    
    using namespace chaos::common::utility;
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    namespace po = boost::program_options;
    
    namespace ext_unt = chaos::common::external_unit;
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    
    
    // Default RPC server port (used as default value of OPT_RPC_SERVER_PORT).
    #define _RPC_PORT					8888
    
    // Default DirectIO priority server port (default of OPT_DIRECT_IO_PRIORITY_SERVER_PORT).
    #define _DIRECT_IO_PRIORITY_PORT	1672
    
    // Default DirectIO service server port (default of OPT_DIRECT_IO_SERVICE_SERVER_PORT).
    #define _DIRECT_IO_SERVICE_PORT		30175
    
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    namespace chaos{
        void fillKVParameter(std::map<std::string, std::string>& kvmap,
                                              const std::vector<std::string>& kv_vector,
                                              const std::string& regex) {
        //no cache server provided
        //clear previosly pair
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        const std::regex rx(regex);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        for(std::vector<std::string>::const_iterator it = kv_vector.begin(),
            end = kv_vector.end();
            it != end;
            it++) {
            
            const std::string& kv_param_value = *it;
            
            /*if(regex.size() && kv_param_value.size()&&
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
               !std::regex_match(kv_param_value,rx)) {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
                   std::stringstream ss;
                   ss<<"Malformed kv parameter string:"<<kv_param_value<<" regex:"<<regex;
                   LERR_<<ss.str();
                  // throw chaos::CException(-3,ss.str(), __PRETTY_FUNCTION__);
               } else*/ {
                        
                        std::vector<std::string> kv_splitted;
                        
                        //get new pair
    
                       /* boost::algorithm::split(kv_splitted,
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
                                                kv_param_value,
                                                boost::algorithm::is_any_of(":"),
    
                                                boost::algorithm::token_compress_on);*/
    
                        kv_splitted=chaos::split(kv_param_value,":",true);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
                        // add key/value pair
                        if((kv_splitted.size()>1)&&(kv_splitted[0].size())){
                         kvmap.insert(make_pair(kv_splitted[0], kv_splitted[1]));
                        }
            }
        }
    }
    
    };
    
    /**
     * Build the configuration object and allocate the boost::program_options
     * description on which the options are registered.
     */
    GlobalConfiguration::GlobalConfiguration()
        : desc(new po::options_description("!CHAOS Framework Allowed options")) {
    }
    /**
     * Release the options description allocated by the constructor.
     */
    GlobalConfiguration::~GlobalConfiguration() {
        // deleting a null pointer is a no-op, so no explicit check is required
        delete desc;
    }
    
    void GlobalConfiguration::preParseStartupParameters()  {
    
            addOption(InitOption::OPT_HELP, "Produce help message");
    
            addOption<std::string>(InitOption::OPT_CONF_FILE,"File configuration path");
    
            addOption(InitOption::OPT_VERSION, "Printout version");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_NODE_DESC, po::value< std::string >()->default_value(""), "A string containing a brief description of the node");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_DATA_DIR, po::value< std::string >()->default_value("/tmp"), "A data directory where the node can dump data and check points");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    
    
            addOption(InitOption::OPT_LOG_ON_CONSOLE, po::value< bool >()->zero_tokens(), "Specify when the log must be forwarded on console");
    
            addOption(InitOption::OPT_LOG_ON_SYSLOG, po::value< bool >()->zero_tokens(), "Specify when the log must be forwarded on syslog server");
            addOption(InitOption::OPT_LOG_SYSLOG_SERVER, po::value< string >()->default_value("localhost"), "Specify the logsrv hostname");
    
            addOption(InitOption::OPT_LOG_SYSLOG_SERVER_PORT, po::value< uint32_t >()->default_value(514), "Specify the logsrv port");
    
            addOption(InitOption::OPT_LOG_ON_FILE, po::value< bool >()->zero_tokens(), "Specify when the log must be forwarded on file");
    
            addOption(InitOption::OPT_LOG_FILE, po::value< string >()->default_value("chaos_framework_log_%Y-%m-%d_%H-%M-%S.%N.log"), "Specify when the file path of the log");
    
            addOption(InitOption::OPT_LOG_LEVEL, po::value< string >()->default_value("info"), "Specify the level of the log using the value [debug, info, notice, warning, fatal]");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_LOG_MAX_SIZE_MB, po::value< uint32_t >()->default_value(100), "Specify the max size in megabytes fo the file log");
    
            addOption(InitOption::OPT_METADATASERVER_ADDRESS, po::value< std::vector< std::string > >(), "Metadataserver server:port address");
    
            addOption(InitOption::OPT_METADATASERVER_AUTO_CONF, po::value< bool >()->zero_tokens(), "Enable auto configuration for metadataserver endpoints");
    
            #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE)
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_DATA_IO_IMPL, po::value< string >()->default_value(std::string("IODirectIOPSMsgDriver")), "Specify the data io implementation");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_MSG_OPT, po::value< int >()->default_value(0), "0=copy,1=zero,2=synchronous");
    
            addOption(InitOption::OPT_MSG_PRODUCER_KVP, po::value< std::vector<std::string> >(), "K:V message producer options");
            addOption(InitOption::OPT_MSG_CONSUMER_KVP, po::value< std::vector<std::string> >(), "K:V message consumer options");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            //disable directio
            addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value(std::string("")), "Specify the direct io implementation");
            addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("PSM"), "Specify the rpc implementation");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_DATA_IO_IMPL, po::value< string >()->default_value(std::string("IODirectIODriver")), "Specify the data io implementation");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value(std::string("ZMQ")), "Specify the direct io implementation");
            addOption(InitOption::OPT_RPC_IMPLEMENTATION, po::value< string >()->default_value("ZMQ"), "Specify the rpc implementation");
    
    
            #endif
            #if ENABLE_ZMQ_MONITOR
            addOption(InitOption::OPT_ENABLE_ZMQ_MONITOR, po::value< bool >()->default_value(true), "Monitor zmq connections");
    
    
            #endif
    
            addOption(InitOption::OPT_DIRECT_IO_PRIORITY_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_PRIORITY_PORT), "DirectIO priority server port");
            addOption(InitOption::OPT_DIRECT_IO_SERVICE_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_SERVICE_PORT), "DirectIO service server port");
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
            addOption(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(1),"DirectIO server thread number");
    
            addOption(InitOption::OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]");
            addOption(InitOption::OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]");
    
            addOption(InitOption::OPT_RPC_SYNC_ENABLE, po::value< bool >()->default_value(false), "Enable the sync wrapper to rpc protocol");
    
            addOption(InitOption::OPT_RPC_SERVER_PORT, po::value<uint32_t>()->default_value(_RPC_PORT), "RPC server port");
            addOption(InitOption::OPT_RPC_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(2),"RPC server thread number");
    
            addOption(InitOption::OPT_RPC_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"RPC implementation key value parameter[k:v]");
    
            addOption(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD, po::value<uint32_t>()->default_value(1),"RPC domain scheduler queue's thread consumer number");
    
            addOption(InitOption::OPT_RPC_DOMAIN_SCHEDULER_TYPE, po::value<uint32_t>()->default_value(0),"RPC domain scheduler type[0-default, 1-shared]");
    
            addOption(InitOption::OPT_EVENT_DISABLE, po::value< bool >()->default_value(true), "Disable the event system [by default it is enable]");
    
            addOption(InitOption::OPT_PUBLISHING_IP, po::value< string >(), "Specify the ip address where to publish the framework rpc system");
    
            addOption(InitOption::OPT_PUBLISHING_INTERFACE, po::value< string >(), "Specify the interface where to publish the framework rpc system");
    
            addOption(InitOption::OPT_TIME_CALIBRATION, po::value< bool >()->zero_tokens(), "Enable the time calibration for chaos process");
    
            addOption(InitOption::OPT_TIME_CALIBRATION_OFFSET_BOUND, po::value< uint32_t >()->default_value(500), "The number of millisecond of difference after wich the calibration will be activated");
    
            addOption(InitOption::OPT_TIME_CALIBRATION_NTP_SERVER, po::value< string >(), "Specify the NTP server used for time calibration");
    
            
            addOption(InitOption::OPT_PLUGIN_ENABLE, po::value< bool >()->zero_tokens(), "Enable the use of the plugin");
    
            addOption(InitOption::OPT_PLUGIN_DIRECTORY_PATH, po::value< std::string >()->default_value("."), "Specify the directory where are stored the plugin");
    
            addOption(InitOption::OPT_SCRIPT_VM_KV_PARAM, po::value< std::vector<std::string> >(),"Script virtual machine key value parameter [k:v]");
    
    
            addOption(InitOption::OPT_REST_POLL_TIME_US, po::value< uint32_t >()->default_value(10),"Rest poll time in us (less means more responsive, but more cpu intensive)");
    
    
            addOption(ext_unt::InitOption::OPT_UNIT_GATEWAY_ENABLE, po::value< bool >()->zero_tokens(), ext_unt::InitOption::OPT_UNIT_GATEWAY_ENABLE_DESC);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_DIRECT_HTTP_STREAM_ENABLE, po::value< bool >()->zero_tokens(), "Enable direct streaming of images ");
    
            addOption(InitOption::OPT_DIRECT_HTTP_STREAM_PORT, po::value<uint32_t>()->default_value(STREAMER_PORT), "Default server http port for streaming");
            addOption(InitOption::OPT_DIRECT_HTTP_STREAM_WORKER, po::value<uint32_t>()->default_value(1), "Number of stream thread");
    
            addOption(InitOption::OPT_DIRECT_HTTP_STREAM_HOST, po::value<std::string>()->default_value(""), "Default server http host ip for streaming, empty autodetect");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    
    
            addOption(ext_unt::InitOption::OPT_UNIT_GATEWAY_WORKER_THREAD_NUMBER, po::value< uint32_t >()->default_value(ext_unt::InitOption::OPT_UNIT_GATEWAY_WORKER_THREAD_NUMBER_DEFAULT), ext_unt::InitOption::OPT_UNIT_GATEWAY_WORKER_THREAD_NUMBER_DESC);
            addOption(ext_unt::InitOption::OPT_UNIT_GATEWAY_ADAPTER_KV_PARAM, po::value< std::vector<std::string> >(), ext_unt::InitOption::OPT_UNIT_GATEWAY_ADAPTER_KV_PARAM_DESC);
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
            
    #if CHAOS_PROMETHEUS
            addOption(InitOption::OPT_METRIC_ENABLE, po::value< bool >()->zero_tokens(), "Enable metric");
            addOption(InitOption::OPT_METRIC_WEB_SERVER_PORT, po::value< std::string >()->default_value("10000"), "Specify the port where publish the prometheus metrics");
    #endif
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            addOption(InitOption::OPT_MSG_BROKER_SERVER, po::value< std::string >()->default_value(std::string("localhost:9092")), "Message broker");
            addOption(InitOption::OPT_MSG_BROKER_DRIVER, po::value< std::string >()->default_value(std::string("kafka-rdk")), "Message broker driver");
    
            addOption(InitOption::OPT_GROUP_NAME, po::value< std::string >()->default_value(std::string("")), "Group Name");
    
    
            addOption(InitOption::OPT_NODEUID, po::value< std::string >()/*->default_value(std::string("NONAME"))*/,"UID of the node");
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
            throw CException(0, e.what(), "GlobalConfiguration::preParseStartupParameters");
        }
    }
    
    
    void GlobalConfiguration::parseStartupParametersAllowingUnregistered(int argc, const char* argv[])  {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        parseParameter(po::command_line_parser(argc, argv).options(*desc).allow_unregistered().run());
    
    void GlobalConfiguration::parseStartupParameters(int argc, const char* argv[])  {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        parseParameter(po::parse_command_line(argc, argv, *desc));
    
    void GlobalConfiguration::parseStringStream(std::istream &sStreamOptions)  {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        parseParameter(po::parse_config_file(sStreamOptions, *desc));
    
    /**
     * @return a mutable reference to build_info
     */
    CDataWrapper& GlobalConfiguration::getBuildInfoRef() {
        CDataWrapper& build_info_ref = build_info;
        return build_info_ref;
    }
    
    /**
     * @return a mutable reference to pinfo
     */
    chaos::common::utility::process::ProcessInfo& GlobalConfiguration::getProcessInfoRef() {
        chaos::common::utility::process::ProcessInfo& process_info_ref = pinfo;
        return process_info_ref;
    }
    
    /**
     * Translate a textual log level into the numeric value of the matching
     * chaos::common::log::level::LogSeverityLevel.
     *
     * Accepted values (compared with operator==, so the match is exact):
     * "info", "debug", "notice", "warning", "fatal".
     *
     * @param levelStr textual level to translate
     * @return the severity level cast to int32_t
     * @throws chaos::CException (code 1, "Invalid log level") for any other string
     */
    int32_t GlobalConfiguration::filterLogLevel(string& levelStr)  {
    
        // initial value; every accepted branch below assigns it explicitly
        chaos::common::log::level::LogSeverityLevel level = chaos::common::log::level::LSLInfo;
    
        
        if (levelStr == "info")
    
            level = chaos::common::log::level::LSLInfo;
    
        else if (levelStr == "debug")
    
            level = chaos::common::log::level::LSLDebug;
    
        else if (levelStr == "notice")
    
            level = chaos::common::log::level::LSLNotice;
    
        else if (levelStr == "warning")
    
            level = chaos::common::log::level::LSLWarning;
    
        else if (levelStr == "fatal")
    
            level = chaos::common::log::level::LSLFatal;
    
        else
            // unknown level name: reject instead of falling back to a default
            throw chaos::CException(1, "Invalid log level", "GlobalConfiguration::filterLogLevel");
        
        return static_cast< int32_t >(level);
    
    void GlobalConfiguration::loadStartupParameter(int argc, const char* argv[])  {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            po::store(po::parse_command_line(argc, argv, *desc), vm);
    
        }catch (po::error &e) {
            //write error also on cerr
            std::cerr << e.what();
            throw CException(0, e.what(), __PRETTY_FUNCTION__);
        }
    }
    
    
    void GlobalConfiguration::loadStartupParameterFromEnv()  {
        try{
            //
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            po::store(po::parse_environment(*desc, "CHAOS-"), vm);
    
            po::notify(vm);
        }catch (po::error &e) {
            //write error also on cerr
            std::cerr << e.what();
            throw CException(0, e.what(), __PRETTY_FUNCTION__);
        }
    }
    
    
    void GlobalConfiguration::loadStreamParameter(std::istream &config_file)   {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            po::store(po::parse_config_file(config_file, *desc), vm);
    
        }catch (po::error &e) {
            //write error also on cerr
            std::cerr << e.what();
            throw CException(0, e.what(), __PRETTY_FUNCTION__);
        }
    }
    
    
    void GlobalConfiguration::scanOption()   {
    
        try {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
                std::cout << *desc;
    
            if (hasOption(InitOption::OPT_VERSION)) {
    
                std::cout <<"Version:"<< CSLIB_VERSION_MAJOR<<"."<<CSLIB_VERSION_MINOR<<"."<<CSLIB_VERSION_NUMBER<< " BuildID:"<<CSLIB_BUILD_ID<< " BuildDate:"<<__DATE__ <<" " <<__TIME__<<"\n";
    
        } catch (po::error &e) {
    
            //write error also on cerr
            std::cerr << e.what();
            throw CException(0, e.what(), __PRETTY_FUNCTION__);
    
    void GlobalConfiguration::parseParameter(const po::basic_parsed_options<char>& optionsParser) {
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        }catch (po::error &e) {
    
            //write error also on cerr
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
            throw CException(0, e.what(), "GlobalConfiguration::preParseStartupParameters");
        }
    
        //scan option
        scanOption();
        
        //check the default option
        checkDefaultOption();
    
    // Copy option `y` into `configuration` when it has been set: inside its own
    // scope the macro declares a temporary of type `t`, reads the option with
    // getOption<t>(y) and appends it under the same key `y`.
    // Nothing is appended when hasOption(y) is false.
    #define CHECK_AND_DEFINE_CONFIG_OPTION(t,y)\
    {t x;\
    if(hasOption(y)){\
    x = getOption<t>(y);\
    configuration->append(y,x);}}
    
    
    void GlobalConfiguration::checkDefaultOption()  {
    
        configuration.reset(new CDataWrapper());
    
        //now we can fill the gloabl configuration
        //start with getting log configuration
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(logOnConsole, InitOption::OPT_LOG_ON_CONSOLE);
    
        configuration->addBoolValue(InitOption::OPT_LOG_ON_CONSOLE, logOnConsole);
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(logOnSyslog, InitOption::OPT_LOG_ON_SYSLOG);
    
        configuration->addBoolValue(InitOption::OPT_LOG_ON_SYSLOG, logOnSyslog);
    
        CHECK_AND_DEFINE_OPTION(string, logSyslogSrv, InitOption::OPT_LOG_SYSLOG_SERVER);
    
        configuration->addStringValue(InitOption::OPT_LOG_SYSLOG_SERVER, logSyslogSrv);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, logSyslogSrvPort, InitOption::OPT_LOG_SYSLOG_SERVER_PORT, 0);
    
        configuration->addInt32Value(InitOption::OPT_LOG_SYSLOG_SERVER_PORT, logSyslogSrvPort);
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(logOnFile, InitOption::OPT_LOG_ON_FILE);
    
        configuration->addBoolValue(InitOption::OPT_LOG_ON_FILE, logOnFile);
    
        CHECK_AND_DEFINE_OPTION(string, logFilePath, InitOption::OPT_LOG_FILE);
    
        configuration->addStringValue(InitOption::OPT_LOG_FILE, logFilePath);
    
        
        CHECK_AND_DEFINE_OPTION(string, logLevel, InitOption::OPT_LOG_LEVEL)
    
        configuration->addInt32Value(InitOption::OPT_LOG_LEVEL, filterLogLevel(logLevel));
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, log_max_size_mb, InitOption::OPT_LOG_MAX_SIZE_MB, 10);
    
        configuration->addInt32Value(InitOption::OPT_LOG_MAX_SIZE_MB, log_max_size_mb);
    
        CHECK_AND_DEFINE_OPTION(string, publishingIp, InitOption::OPT_PUBLISHING_IP);
    
        if(publishingIp.size()&&InetUtility::checkWellFormedHostPort(publishingIp)){configuration->addStringValue(InitOption::OPT_PUBLISHING_IP, publishingIp);}
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        
    
        CHECK_AND_DEFINE_OPTION(string, publishingInterface, InitOption::OPT_PUBLISHING_INTERFACE)
    
        configuration->addStringValue(InitOption::OPT_PUBLISHING_INTERFACE, publishingInterface);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, rpcServerPort, InitOption::OPT_RPC_SERVER_PORT, 8888);
    
        int32_t freeFoundPort = InetUtility::scanForLocalFreePort(rpcServerPort);
    
        addLocalServerBasePort(freeFoundPort);
    
        configuration->addInt32Value(InitOption::OPT_RPC_SERVER_PORT, freeFoundPort);
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, rpcServerThreadNumber, InitOption::OPT_RPC_SERVER_THREAD_NUMBER, 1);
    
        configuration->addInt32Value(InitOption::OPT_RPC_SERVER_THREAD_NUMBER, rpcServerThreadNumber);
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        
    
        CHECK_AND_DEFINE_OPTION(string, rpcImpl, InitOption::OPT_RPC_IMPLEMENTATION)
    
        configuration->addStringValue(InitOption::OPT_RPC_IMPLEMENTATION, rpcImpl);
    
    
        CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_SERVER);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_DATA_DIR);
    
    
        CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_NODE_DESC);
    
        CHECK_AND_DEFINE_CONFIG_OPTION(std::string,chaos::InitOption::OPT_NODEUID);
    
        CHECK_AND_DEFINE_CONFIG_OPTION(std::string,InitOption::OPT_MSG_BROKER_DRIVER);
    
        #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE)
        CHECK_AND_DEFINE_CONFIG_OPTION(std::vector<std::string>,InitOption::OPT_MSG_PRODUCER_KVP);
        CHECK_AND_DEFINE_CONFIG_OPTION(std::vector<std::string>,InitOption::OPT_MSG_CONSUMER_KVP);
        #endif
    
        CHECK_AND_DEFINE_OPTION(bool, OPT_RPC_SYNC_ENABLE, InitOption::OPT_RPC_SYNC_ENABLE)
    
        configuration->addBoolValue(InitOption::OPT_RPC_SYNC_ENABLE, OPT_RPC_SYNC_ENABLE);
    
        CHECK_AND_DEFINE_OPTION(std::vector<std::string>, rpc_impl_kv_param, InitOption::OPT_RPC_IMPL_KV_PARAM);
    
        
        //fill the key value list
        if(rpc_impl_kv_param.size()) {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            fillKVParameter(map_kv_param_rpc_impl, rpc_impl_kv_param, "");
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, rpc_domain_queue_thread_number, InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD, 1);
    
        configuration->addInt32Value(InitOption::OPT_RPC_DOMAIN_QUEUE_THREAD, rpc_domain_queue_thread_number);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, rpc_domain_scheduler_type, InitOption::OPT_RPC_DOMAIN_SCHEDULER_TYPE, 0);
    
        configuration->addInt32Value(InitOption::OPT_RPC_DOMAIN_SCHEDULER_TYPE, rpc_domain_scheduler_type);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, direct_io_server_thread_number, InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER, 2);
    
        configuration->addInt32Value(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_SERVER_THREAD_NUMBER, direct_io_server_thread_number);
    
        CHECK_AND_DEFINE_OPTION(std::vector<std::string>, directio_srv_impl_kv_param, InitOption::OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM);
    
        if(directio_srv_impl_kv_param.size()) {
            fillKVParameter(map_kv_param_directio_srv_impl, directio_srv_impl_kv_param, "");
    
        CHECK_AND_DEFINE_OPTION(std::vector<std::string>, directio_clnt_impl_kv_param, InitOption::OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM);
    
        //fill the key value list
        if(directio_clnt_impl_kv_param.size()) {
            fillKVParameter(map_kv_param_directio_clnt_impl, directio_clnt_impl_kv_param, "");
        }
    
        CHECK_AND_DEFINE_OPTION(string, direct_io_server_impl, InitOption::OPT_DIRECT_IO_IMPLEMENTATION)
    
        if(direct_io_server_impl.size()){
            configuration->addStringValue(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE, direct_io_server_impl);
        }
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, direct_io_priority_port, InitOption::OPT_DIRECT_IO_PRIORITY_SERVER_PORT, _DIRECT_IO_PRIORITY_PORT);
    
        freeFoundPort = InetUtility::scanForLocalFreePort(direct_io_priority_port);
    
        configuration->addInt32Value(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_PRIORITY_PORT, (uint32_t)freeFoundPort);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, direct_io_service_port, InitOption::OPT_DIRECT_IO_SERVICE_SERVER_PORT, _DIRECT_IO_SERVICE_PORT);
    
        freeFoundPort = InetUtility::scanForLocalFreePort(direct_io_service_port);
    
        configuration->addInt32Value(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_SERVICE_PORT, (uint32_t)freeFoundPort);
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(event_disable, InitOption::OPT_EVENT_DISABLE);
    
        configuration->addBoolValue(InitOption::OPT_EVENT_DISABLE, event_disable);
    
        configuration->addStringValue(chaos::common::event::EventConfiguration::OPTION_KEY_EVENT_ADAPTER_IMPLEMENTATION, "AsioImpl");
    
        //configure metadataserver as single or list
    
        CHECK_AND_DEFINE_OPTION(std::vector<std::string>, metadata_server_address_list, InitOption::OPT_METADATASERVER_ADDRESS);
    
        if(metadata_server_address_list.size()) {
            for(std::vector<std::string>::iterator it = metadata_server_address_list.begin();
                it != metadata_server_address_list.end();
                it++) {
                addMetadataServerAddress(*it);
            }
    
        finalizeMetadataServerAddress();
    
    
        } /*else {
            addMetadataServerAddress("localhost:5000");
        }*/
    
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(auto_conf_mds_endpoint, InitOption::OPT_METADATASERVER_AUTO_CONF);
        configuration->addBoolValue(InitOption::OPT_METADATASERVER_AUTO_CONF, auto_conf_mds_endpoint);
        
    
        CHECK_AND_DEFINE_BOOL_ZERO_TOKEN_OPTION(enable_time_calibration, InitOption::OPT_TIME_CALIBRATION);
    
        configuration->addBoolValue(InitOption::OPT_TIME_CALIBRATION, enable_time_calibration);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(uint32_t, offset_calibration_bound, InitOption::OPT_TIME_CALIBRATION_OFFSET_BOUND, 500);
    
        configuration->addInt32Value(InitOption::OPT_TIME_CALIBRATION_OFFSET_BOUND, offset_calibration_bound);
    
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(std::string, time_calibration_ntp_server, InitOption::OPT_TIME_CALIBRATION_NTP_SERVER, "");
    
        configuration->addStringValue(InitOption::OPT_TIME_CALIBRATION_NTP_SERVER, time_calibration_ntp_server);
    
        
        CHECK_AND_DEFINE_OPTION(std::vector<std::string>, script_vm_kv_param, InitOption::OPT_SCRIPT_VM_KV_PARAM);
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        // fill the key value list
    
        if(script_vm_kv_param.size()) {
            fillKVParameter(map_kv_param_script_vm, script_vm_kv_param, "");
        }
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    
    #if CHAOS_PROMETHEUS
        // configura http metric port
        CHECK_AND_DEFINE_OPTION_WITH_DEFAULT(std::string, httpMetricPort, InitOption::OPT_METRIC_WEB_SERVER_PORT, "10000");
    
        httpMetricPort = ChaosToString(InetUtility::scanForLocalFreePort(boost::lexical_cast<int32_t>(httpMetricPort)));
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        configuration->addStringValue(InitOption::OPT_METRIC_WEB_SERVER_PORT, httpMetricPort);
        
    #endif
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    }
    
    
    /*
     Add a custom option
     */
    void GlobalConfiguration::addOption(const char* name,
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            desc->add_options()(name, description);
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
        }catch (po::error &e) {
            throw CException(0, e.what(), "GlobalConfiguration::addOption");
        }
    }
    
    /*
     Add a custom option with an explicit boost::program_options value semantic
     */
    void GlobalConfiguration::addOption(const char* name,
    
                                        const po::value_semantic* s,
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            desc->add_options()(name, s, description);
        
    
        }catch (po::error &e) {
            throw CException(0, e.what(), "GlobalConfiguration::addOption");
        }
    
    /*
     Add a custom boolean option that takes no value on the command line (zero tokens)
     */
    void GlobalConfiguration::addOptionZeroTokens(const char* name,
                                                  const char* description,
    
        try{
            addOption(name, po::value< bool >(default_variable)->zero_tokens(), description);
        }catch (po::error &e) {
            throw CException(0, e.what(), "GlobalConfiguration::addOptionZeroTokens");
        }
    }
    
    
    /**
     * Return the CDataWrapper that contains the global configuration.
     *
     * @return the raw pointer obtained from configuration.get()
     */
    chaos_data::CDataWrapper *GlobalConfiguration::getConfiguration(){
    
        return configuration.get();
    
    // Invokes configuration->copyAllTo(*conf); conf is dereferenced without a null check.
    // NOTE(review): copyAllTo presumably copies the current configuration INTO conf,
    // which is the opposite of what the name "setConfiguration" suggests - confirm
    // the intended direction against CDataWrapper::copyAllTo.
    void GlobalConfiguration::setConfiguration(chaos_data::CDataWrapper *conf){
    
        configuration->copyAllTo(*conf);
    
    void GlobalConfiguration::addMetadataServerAddress(const string& mdsAddress)  {
    
        bool isHostnameAndPort = InetUtility::checkWellFormedHostNamePort(mdsAddress);
        bool isIpAndPort  = InetUtility::checkWellFormedHostIpPort(mdsAddress);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        if(!isHostnameAndPort && !isIpAndPort){
            std::stringstream ss;
    
            ss<< "Bad server address: '"<<mdsAddress<<"' expected ip:port or hostaddress:port, check address and port!";
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            throw CException(1, ss.str(), "GlobalConfiguration::addMetadataServerAddress");
        }
    
        configuration->appendStringToArray(mdsAddress);
    
    }
    
    // Close the string array filled by addMetadataServerAddress() and bind it to
    // the InitOption::OPT_METADATASERVER_ADDRESS key of the configuration.
    void GlobalConfiguration::finalizeMetadataServerAddress() {
    
        configuration->finalizeArrayForKey(InitOption::OPT_METADATASERVER_ADDRESS);
    
    void GlobalConfiguration::addLocalServerAddress(const std::string& mdsAddress)  {
    
        bool isIp = InetUtility::checkWellFormedHostPort(mdsAddress);
    
            std::stringstream ss;
            ss<<"Bad server address:'"<<mdsAddress<<"'";
            throw CException(1, ss.str(), "GlobalConfiguration::addMetadataServerAddress");
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        configuration->addStringValue(chaos::InitOption::OPT_NODE_IP, mdsAddress);
    
    // Store localDefaultPort in the configuration under the "base_port" key
    // (the same key read back by getLocalServerBasePort()).
    void GlobalConfiguration::addLocalServerBasePort(int32_t localDefaultPort)  {
    
        configuration->addInt32Value("base_port", localDefaultPort);
    
    /**
     * @return the host name as reported by InetUtility::getHostname()
     */
    std::string GlobalConfiguration::getHostname() {
        std::string host_name = InetUtility::getHostname();
        return host_name;
    }
    
    
    /*
     Return the first metadata server address of the array stored under
     InitOption::OPT_METADATASERVER_ADDRESS
     */
    string GlobalConfiguration::getMetadataServerAddress() {
        CMultiTypeDataArrayWrapperSPtr server_array = configuration->getVectorValue(InitOption::OPT_METADATASERVER_ADDRESS);
        // the array is expected to hold at least one address
        CHAOS_ASSERT(server_array->size());

        string first_address = server_array->getStringElementAtIndex(0);
        return first_address;
    }
    
    
    VectorNetworkAddress GlobalConfiguration::getMetadataServerAddressList() {
    
        CMultiTypeDataArrayWrapperSPtr server_array = configuration->getVectorValue(InitOption::OPT_METADATASERVER_ADDRESS);
    
        for(int idx = 0;
            idx < server_array->size();
            idx++) {
    
            result.push_back(CNetworkAddress(server_array->getStringElementAtIndex(idx)));
    
    }
    
    /*
     return the local node address stored under chaos::InitOption::OPT_NODE_IP
     */
    string GlobalConfiguration::getLocalServerAddress() {
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        return configuration->getStringValue(chaos::InitOption::OPT_NODE_IP);
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    /**
     * @return the string stored in the configuration under chaos::InitOption::OPT_NODE_DESC
     */
    std::string GlobalConfiguration::getDesc() {
        std::string node_description = configuration->getStringValue(chaos::InitOption::OPT_NODE_DESC);
        return node_description;
    }
    /**
     * @return the string stored in the configuration under chaos::InitOption::OPT_NODEUID
     */
    std::string GlobalConfiguration::getNodeUID() {
        std::string node_uid = configuration->getStringValue(chaos::InitOption::OPT_NODEUID);
        return node_uid;
    }
    // Store uid in the configuration under chaos::InitOption::OPT_NODEUID.
    // An empty uid is ignored, leaving any current value in place.
    void GlobalConfiguration::setNodeUID(const std::string& uid){
        if(uid.size()==0){
            return;
        }
        // drop the previous value first, then add the new one under the same key
        if(configuration->hasKey(chaos::InitOption::OPT_NODEUID)){
            configuration->removeKey(chaos::InitOption::OPT_NODEUID);
            
        }
        configuration->addStringValue(chaos::InitOption::OPT_NODEUID,uid);
    
    
    /*
     return the local server base port stored under the "base_port" key
     */
    int32_t GlobalConfiguration::getLocalServerBasePort() {
    
        return configuration->getInt32Value("base_port");
    
    Claudio Bisegni's avatar
    Claudio Bisegni committed
    #if CHAOS_PROMETHEUS
    /**
     * @return the value stored under InitOption::OPT_METRIC_WEB_SERVER_PORT, read as int32
     *
     * NOTE(review): checkDefaultOption() writes this key with addStringValue();
     * confirm that getInt32Value() on a string entry yields the expected port.
     */
    int32_t GlobalConfiguration::getHttpMetricsPort() {
        const int32_t metrics_port = configuration->getInt32Value(InitOption::OPT_METRIC_WEB_SERVER_PORT);
        return metrics_port;
    }
    #endif
    
    
    /**
     * Build the "<address>:<base port>" string of the local server.
     *
     * The address is read from chaos::InitOption::OPT_NODE_IP and the port from
     * the "base_port" key of the configuration.
     *
     * @return address and base port joined by ':'
     */
    string GlobalConfiguration::getLocalServerAddressAnBasePort(){
        // Compose with std::string instead of sprintf() on a fixed 128 byte
        // buffer: the address length is not bounded by this function, so the
        // previous code could write past the end of the buffer.
        string addr = configuration->getStringValue(chaos::InitOption::OPT_NODE_IP);
        addr += ':';
        addr += std::to_string(static_cast<int>(configuration->getInt32Value("base_port")));
        return addr;
    }
    
    /*
     tell whether the configuration holds the InitOption::OPT_METADATASERVER_ADDRESS key
     */
    bool GlobalConfiguration::isMEtadataServerConfigured() {
    
        return configuration->hasKey(InitOption::OPT_METADATASERVER_ADDRESS);
    
    MapStrKeyStrValue& GlobalConfiguration::getRpcImplKVParam() {
    
    /**
     * @return a mutable reference to map_kv_param_directio_srv_impl, the map
     *         checkDefaultOption() fills from OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM
     */
    MapStrKeyStrValue& GlobalConfiguration::getDirectIOServerImplKVParam() {
        MapStrKeyStrValue& server_kv_param = map_kv_param_directio_srv_impl;
        return server_kv_param;
    }
    
    // Return a mutable reference to map_kv_param_directio_clnt_impl, the map that
    // checkDefaultOption() fills from OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM.
    MapStrKeyStrValue& GlobalConfiguration::getDirectIOClientImplKVParam() {
        return map_kv_param_directio_clnt_impl;
    
    /**
     * @return a mutable reference to map_kv_param_script_vm, the map
     *         checkDefaultOption() fills from OPT_SCRIPT_VM_KV_PARAM
     */
    MapStrKeyStrValue& GlobalConfiguration::getScriptVMKVParam() {
        MapStrKeyStrValue& script_vm_kv_param = map_kv_param_script_vm;
        return script_vm_kv_param;
    }