Skip to content
Snippets Groups Projects
InfluxDBLogStorageDriver.cpp 5.89 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
     * Copyright 2012, 18/06/2018 INFN
     *
     * Licensed under the EUPL, Version 1.2 or – as soon they
     * will be approved by the European Commission - subsequent
     * versions of the EUPL (the "Licence");
     * You may not use this work except in compliance with the
     * Licence.
     * You may obtain a copy of the Licence at:
     *
     * https://joinup.ec.europa.eu/software/page/eupl
     *
     * Unless required by applicable law or agreed to in
     * writing, software distributed under the Licence is
     * distributed on an "AS IS" basis,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
     * express or implied.
     * See the Licence for the specific language governing
     * permissions and limitations under the Licence.
     */
    
    //#include <chaos/common/global.h>
    
    #include "../../ChaosMetadataService.h"
    #include <boost/algorithm/string.hpp>
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    #include <chaos_service_common/DriverPoolManager.h>
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    #include <regex>
    
    #include "InfluxDBLogStorageDriver.h"
    #include "InfluxDB.h"
    using namespace chaos;
    
    using namespace chaos::service_common::persistence::data_access;
    
    using namespace chaos::metadata_service;
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    using namespace chaos::service_common;
    
    
    using namespace chaos::metadata_service::object_storage;
    using namespace chaos::metadata_service::object_storage::abstraction;
    
    #define INFO    INFO_LOG(InfluxDBLogStorageDriver)
    #define DBG     DBG_LOG(InfluxDBLogStorageDriver)
    #define ERR     ERR_LOG(InfluxDBLogStorageDriver)
    
    DEFINE_CLASS_FACTORY(InfluxDBLogStorageDriver,
                         chaos::service_common::persistence::data_access::AbstractPersistenceDriver);
    
    InfluxDBLogStorageDriver::InfluxDBLogStorageDriver(const std::string& name):
    AbstractPersistenceDriver(name){}
    
    InfluxDBLogStorageDriver::~InfluxDBLogStorageDriver() {}
    
    
    void InfluxDBLogStorageDriver::init(void *init_data)  {
    
        AbstractPersistenceDriver::init(init_data);
    
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        const ChaosStringVector url_list = DriverPoolManager::logSetting.persistence_server_list;
        const std::string user = DriverPoolManager::logSetting.persistence_kv_param_map["user"];
        const std::string password = DriverPoolManager::logSetting.persistence_kv_param_map["pwd"];
        const std::string database = DriverPoolManager::logSetting.persistence_kv_param_map["db"];
        const std::string retention = DriverPoolManager::logSetting.persistence_kv_param_map["retention"];
    
        const std::string max_measure_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_mesure"];
        const std::string max_measure_ms_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_time_ms"];
    
        const std::string poll_time_ms_opt = DriverPoolManager::logSetting.persistence_kv_param_map["poll_time_ms"];
    
    
        const std::string max_array_size_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_array_size"];
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
    
    
        std::string servername="localhost";
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
        std::string funcpath="";
    
        std::string exptime="1095d";
    
        int port=8086;
        if(url_list.size()>0){
            std::vector<std::string> ele;
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            std::regex expr{"(.+):(\\d+)/*(.*)"};
            std::smatch what;
            if (std::regex_search(url_list[0], what, expr)){
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
                
                servername=what[1];
                port=atoi(what[2].str().c_str());
                if(what.length()>=3){
                    funcpath=what[3];
                }
            }
        /*    boost::split(ele,url_list[0],boost::is_any_of(":"));
    
            if(ele.size()>0){
                servername=ele[0];
            }
            if(ele.size()>1){
                port=atoi(ele[1].c_str());
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            }*/
    
        }
        if(database.size()==0){
            ERR<<"You must specify a valid database name";
            throw chaos::CException(-1,"You must specify a valid database name",__FUNCTION__);
        }
    
    Andrea Michelotti's avatar
    Andrea Michelotti committed
            exptime=retention;
        }
    
        influxdb_cpp::server_info si(servername,port,database,user,password,"ms",exptime,funcpath);
    
        if(max_measure_ms_opt.size()){
            si.max_time_ms=atoi(max_measure_ms_opt.c_str());
    
        }
        if(poll_time_ms_opt.size()){
            si.poll_time_ms=atoi(max_measure_ms_opt.c_str());
    
        }
        if(max_measure_opt.size()){
            si.max_mesurements=atoi(max_measure_opt.c_str());
        }
        if(max_array_size_opt.size()){
            si.max_array_size=atoi(max_array_size_opt.c_str());
        }
        
       
    
        //influxdb_t  asyncdb = influxdb_t( new influxdb::async_api::simple_db(url_list[0], database));
       // asyncdb->with_authentication(user,password);
    
        DBG<<"server:"<<servername<<"\nport:"<<port<<"\ndatabase:"<<database<<"\nuser:"<<user<<"\npassw:"<<password<<" retention:"<<exptime<<" path:"<<funcpath<<" max_measures:"<<si.max_mesurements<<" poll time(ms):"<<si.max_time_ms<<" max array size:"<<si.max_array_size;
    
        //register the data access implementations
        std::string resp;
        int ret;
        if((ret=influxdb_cpp::show_db(resp,si))<0){
           ERR<<"cannot show DB:"<<database<< " on:"<<servername<<" port:"<<port;
            throw chaos::CException(ret,"cannot connect or create DB:"+database+" on server:"+servername,__FUNCTION__);  
        }
        CDataWrapper r;
        r.setSerializedJsonData(resp.c_str());
        DBG<<" DB returned:"<<ret<<" answer:\""<<resp<<"\"";
    
        if((ret=influxdb_cpp::create_db(resp,database,si))<0){
           ERR<<"cannot connect or create DB:"<<database<< " on:"<<servername<<" port:"<<port;
        throw chaos::CException(ret,"cannot connect or create DB:"+database+" on server:"+servername,__FUNCTION__);  
        }
        r.setSerializedJsonData(resp.c_str());
    
        if(ret!=0){
            ERR<<" DB returned:"<<ret<<" answer:\""<<resp<<"\"";
            throw chaos::CException(ret,"Influx on server:"+servername+" error'"+resp+"'",__FUNCTION__);  
    
        } else{
                DBG<<" DB returned:"<<ret<<" answer:\""<<resp<<"\"";
    
        registerDataAccess<ObjectStorageDataAccess>(new InfluxDB(si));
    }
    
    
    void InfluxDBLogStorageDriver::deinit()  {
    
        //call sublcass
        AbstractPersistenceDriver::deinit();
    }
    
    void InfluxDBLogStorageDriver::deleteDataAccess(void *instance) {
        AbstractDataAccess *da_instance = static_cast<AbstractDataAccess*>(instance);
        if(da_instance != NULL)delete(da_instance);
    }