Skip to content
Snippets Groups Projects
Commit 8a72bddc authored by Andrea Michelotti's avatar Andrea Michelotti
Browse files

added influxDB and the log storage channel

parent 20d93264
No related branches found
No related tags found
No related merge requests found
/*
* Copyright 2012, 2017 INFN
*
* Licensed under the EUPL, Version 1.2 or – as soon they
* will be approved by the European Commission - subsequent
* versions of the EUPL (the "Licence");
* You may not use this work except in compliance with the
* Licence.
* You may obtain a copy of the Licence at:
*
* https://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in
* writing, software distributed under the Licence is
* distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
* See the Licence for the specific language governing
* permissions and limitations under the Licence.
*/
#include "InfluxDB.h"
#include <chaos/common/configuration/GlobalConfiguration.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/filesystem.hpp>
#include <boost/regex.hpp>
#define INFO INFO_LOG(InfluxDB)
#define DBG DBG_LOG(InfluxDB)
#define ERR ERR_LOG(InfluxDB)
#include <chaos/common/utility/TimingUtil.h>
using namespace chaos::metadata_service::object_storage;
#if CHAOS_PROMETHEUS
using namespace chaos::common::metric;
#endif
using namespace chaos::common::async_central;
namespace chaos {
namespace metadata_service {
namespace object_storage {
#if CHAOS_PROMETHEUS
/*static global*/
chaos::common::metric::CounterUniquePtr InfluxDB::counter_write_data_uptr;
chaos::common::metric::CounterUniquePtr InfluxDB::counter_read_data_uptr;
chaos::common::metric::GaugeUniquePtr InfluxDB::gauge_insert_time_uptr;
chaos::common::metric::GaugeUniquePtr InfluxDB::gauge_query_time_uptr;
#endif
std::stringstream InfluxDB::measurements;
uint32_t InfluxDB::nmeas;
/**************/
InfluxDB::InfluxDB(const influxdb_cpp::server_info& serverinfo)
: si(serverinfo) {
nmeas = 0;
#if CHAOS_PROMETHEUS
MetricManager::getInstance()->createCounterFamily("mds_log_io_data", "Measure the data rate for the data sent and read to log storage [byte]");
counter_write_data_uptr = MetricManager::getInstance()->getNewCounterFromFamily("mds_log_io_data", {{"type", "write_byte"}});
counter_read_data_uptr = MetricManager::getInstance()->getNewCounterFromFamily("mds_log_io_data", {{"type", "read_byte"}});
MetricManager::getInstance()->createGaugeFamily("mds_log_op_time", "Measure the time spent by object storageto complete operation [milliseconds]");
gauge_insert_time_uptr = MetricManager::getInstance()->getNewGaugeFromFamily("mds_log_op_time", {{"type", "insert_time"}});
gauge_query_time_uptr = MetricManager::getInstance()->getNewGaugeFromFamily("mds_log_op_time", {{"type", "query_time"}});
DBG << " CREATED METRICS";
#endif
AsyncCentralManager::getInstance()->addTimer(this, 1000, 1000);
}
InfluxDB::~InfluxDB() {
}
int InfluxDB::pushObject(const std::string& key,
const ChaosStringSetConstSPtr meta_tags,
const chaos::common::data::CDataWrapper& stored_object) {
if (!stored_object.hasKey(chaos::DataPackCommonKey::DPCK_DEVICE_ID) ||
!stored_object.hasKey(chaos::ControlUnitDatapackCommonKey::RUN_ID) ||
!stored_object.hasKey(chaos::DataPackCommonKey::DPCK_SEQ_ID)) {
ERR << CHAOS_FORMAT("Object to store doesn't has the default key!\n %1%", % stored_object.getJSONString());
return -1;
}
const uint64_t now = chaos::common::utility::TimingUtil::getTimeStamp();
const uint64_t ts = stored_object.getInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP);
uint8_t* buf;
size_t buflen;
int64_t seq, runid;
std::string tag;
std::string meas = stored_object.getStringValue(chaos::DataPackCommonKey::DPCK_DEVICE_ID);
ChaosStringVector contained_key;
stored_object.getAllKey(contained_key);
boost::mutex::scoped_lock ll(iolock);
if (nmeas >= MAX_MEASURES) {
ERR<<" reached max number of measurements sending "<<nmeas<< " measurements";
return -1;
}
measurements << stored_object.getStringValue(chaos::DataPackCommonKey::DPCK_DEVICE_ID);
if (meta_tags->size() > 0) {
//tag=std::accumulate(meta_tags->begin(),meta_tags->end(),std::string("_"));
measurements << ",tag=" << *(meta_tags->begin());
}
int first=0;
for (std::vector<std::string>::iterator i = contained_key.begin(); i != contained_key.end(); i++) {
if (*i != chaos::DataPackCommonKey::DPCK_DEVICE_ID) {
char c=(first==0)?' ':',';
switch (stored_object.getValueType(*i)) {
case DataType::TYPE_BOOLEAN:
measurements << c << *i << "=" << (stored_object.getBoolValue(*i)) ? 't' : 'f';
nmeas++;
first++;
break;
case DataType::TYPE_INT32:
case DataType::TYPE_INT64:
measurements << c << *i << "=" << stored_object.getStringValue(*i)<<'i';
nmeas++;
first++;
break;
case DataType::TYPE_DOUBLE:
measurements << c << *i << "=" << stored_object.getStringValue(*i);
nmeas++;
first++;
break;
case DataType::TYPE_STRING:
measurements << c << *i << "=\"" << stored_object.getStringValue(*i) << "\"";
first++;
nmeas++;
// l(meas,influxdb::api::key_value_pairs(*i,stored_object.getStringValue(*i)));
// break;
}
}
if ((i + 1) == contained_key.end()) {
measurements << " " << ts << "\n";
}
}
#if CHAOS_PROMETHEUS
(*counter_write_data_uptr) += stored_object.getBSONRawSize();
#endif
#if CHAOS_PROMETHEUS
(*gauge_insert_time_uptr) = (chaos::common::utility::TimingUtil::getTimeStamp() - ts);
#endif
if (nmeas >= MAX_MEASURES) {
DBG<<" reached max number of measurements sending "<<nmeas<< " measurements";
nmeas = 0;
influxdb_cpp::detail::inner::http_request("POST", "write", "", measurements.str(), si, NULL);
measurements.clear();
measurements.str("");
}
return 0;
}
//!Retrieve an object from the object persistence layer
int InfluxDB::getObject(const std::string& key,
const uint64_t& timestamp,
chaos::common::data::CDWShrdPtr& object_ptr_ref) {
ERR << " NOT IMPLEMENTED";
return -1;
}
//!Retrieve the last inserted object from the object persistence layer
int InfluxDB::getLastObject(const std::string& key,
chaos::common::data::CDWShrdPtr& object_ptr_ref) {
ERR << " NOT IMPLEMENTED";
return -1;
}
//!delete objects that are contained between intervall (exstreme included)
int InfluxDB::deleteObject(const std::string& key,
uint64_t start_timestamp,
uint64_t end_timestamp) {
return 0;
}
//!search object into object persistence layer
int InfluxDB::findObject(const std::string& key,
const ChaosStringSet& meta_tags,
const ChaosStringSet& projection_keys,
const uint64_t timestamp_from,
const uint64_t timestamp_to,
const uint32_t page_len,
abstraction::VectorObject& found_object_page,
chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq) {
int err = 0;
uint64_t seqid = last_record_found_seq.datapack_counter;
uint64_t runid = last_record_found_seq.run_id;
#if CHAOS_PROMETHEUS
// (*gauge_query_time_uptr) = (chaos::common::utility::TimingUtil::getTimeStamp() - ts);
#endif
return err;
}
//!fast search object into object persistence layer
/*!
Fast search return only data index to the client, in this csae client ned to use api to return the single
or grouped data
*/
int InfluxDB::findObjectIndex(const abstraction::DataSearch& search,
abstraction::VectorObject& found_object_page,
chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq) {
ERR << " NOT IMPLEMENTED";
return 0;
}
//! return the object asosciated with the index array
/*!
For every index object witl be returned the associated data object, if no data is received will be
insert an empty object
*/
int InfluxDB::getObjectByIndex(const chaos::common::data::CDWShrdPtr& index,
chaos::common::data::CDWShrdPtr& found_object) {
ERR << " NOT IMPLEMENTED";
return 0;
}
void InfluxDB::timeout() {
boost::mutex::scoped_lock ll(iolock);
if (nmeas >0) {
DBG<<" sending "<<nmeas<< " measurements";
nmeas = 0;
std::string ret;
int res=influxdb_cpp::push_db( ret, measurements.str(), si);
if(res!=0){
ERR<<" result:"<<res<<" database:"<<ret<<" sent:\""<<measurements.str()<<"\"";
}
measurements.clear();
measurements.str("");
}
}
//!return the number of object for a determinated key that are store for a time range
int InfluxDB::countObject(const std::string& key,
const uint64_t timestamp_from,
const uint64_t timestamp_to,
uint64_t& object_count) {
return 0;
}
} // namespace object_storage
} // namespace metadata_service
} // namespace chaos
/*
* Copyright 2012, 2017 INFN
*
* Licensed under the EUPL, Version 1.2 or – as soon they
* will be approved by the European Commission - subsequent
* versions of the EUPL (the "Licence");
* You may not use this work except in compliance with the
* Licence.
* You may obtain a copy of the Licence at:
*
* https://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in
* writing, software distributed under the Licence is
* distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
* See the Licence for the specific language governing
* permissions and limitations under the Licence.
*/
#ifndef __CHAOSFramework_C9E19CC7_5691_4873_9DBC_39596C17E8C2_InfluxDB_h
#define __CHAOSFramework_C9E19CC7_5691_4873_9DBC_39596C17E8C2_InfluxDB_h
#include <chaos/common/chaos_types.h>
#include <chaos/common/utility/ObjectInstancer.h>
#include <chaos/common/utility/LockableObject.h>
#include <chaos/common/async_central/async_central.h>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/lockfree/queue.hpp>
#include "influxdb.hpp"
#include <chaos/common/data/CDataWrapper.h>
#include <chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h>
#include <chaos/common/pqueue/CObjectProcessingQueue.h>
#include "../abstraction/ObjectStorageDataAccess.h"
#if CHAOS_PROMETHEUS
#include <chaos/common/metric/metric.h>
#endif //CHAOS_PROMETHEUS
//#include "influxdb-cpp-rest/influxdb_raw_db_utf8.h"
//#include "influxdb-cpp-rest/influxdb_simple_async_api.h"
#include "influxdb.hpp"
#define MAX_MEASURES 5000
namespace chaos {
namespace metadata_service {
namespace object_storage {
//static const char * POSIX_FINAL_EXT=".bson.final";
static const uint64_t POSIX_MINUTES_MS=60*1000;
static const uint64_t POSIX_HOURS_MS=60*(60*1000);
static const uint64_t POSIX_DAY_MS=24*60*(60*1000);
static const uint64_t POSIX_YEAR_MS=365*24*60*(60*1000ULL);
static const uint64_t POSIX_YEARB_MS=366*24*60*(60*1000ULL);
// typedef ChaosSharedPtr<influxdb::async_api::simple_db> influxdb_t;
class InfluxDB:public metadata_service::object_storage::abstraction::ObjectStorageDataAccess,public chaos::common::async_central::TimerHandler {
protected:
const influxdb_cpp::server_info si;
public:
static std::stringstream measurements;
static uint32_t nmeas;
boost::mutex iolock;
#if CHAOS_PROMETHEUS
static chaos::common::metric::CounterUniquePtr counter_write_data_uptr;
static chaos::common::metric::CounterUniquePtr counter_read_data_uptr;
static chaos::common::metric::GaugeUniquePtr gauge_insert_time_uptr;
static chaos::common::metric::GaugeUniquePtr gauge_query_time_uptr;
#endif
// return number of items, or negative if error
void timeout();
public:
//! Construct the driver
InfluxDB(const influxdb_cpp::server_info&);
//! defautl destructor
~InfluxDB();
//!dispose the driver
//!Put an object within the object persistence layer
virtual int pushObject(const std::string& key,
const ChaosStringSetConstSPtr meta_tags,
const chaos::common::data::CDataWrapper& stored_object) ;
//!Retrieve an object from the object persistence layer
virtual int getObject(const std::string& key,
const uint64_t& timestamp,
chaos::common::data::CDWShrdPtr& object_ptr_ref);
//!Retrieve the last inserted object from the object persistence layer
virtual int getLastObject(const std::string& key,
chaos::common::data::CDWShrdPtr& object_ptr_ref);
//!delete objects that are contained between intervall (exstreme included)
virtual int deleteObject(const std::string& key,
uint64_t start_timestamp,
uint64_t end_timestamp);
//!search object into object persistence layer
virtual int findObject(const std::string& key,
const ChaosStringSet& meta_tags,
const ChaosStringSet& projection_keys,
const uint64_t timestamp_from,
const uint64_t timestamp_to,
const uint32_t page_len,
abstraction::VectorObject& found_object_page,
chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq);
//!fast search object into object persistence layer
/*!
Fast search return only data index to the client, in this csae client ned to use api to return the single
or grouped data
*/
virtual int findObjectIndex(const abstraction::DataSearch& search,
abstraction::VectorObject& found_object_page,
chaos::common::direct_io::channel::opcode_headers::SearchSequence& last_record_found_seq);
//! return the object asosciated with the index array
/*!
For every index object witl be returned the associated data object, if no data is received will be
insert an empty object
*/
virtual int getObjectByIndex(const chaos::common::data::CDWShrdPtr& index,
chaos::common::data::CDWShrdPtr& found_object);
//!return the number of object for a determinated key that are store for a time range
virtual int countObject(const std::string& key,
const uint64_t timestamp_from,
const uint64_t timestamp_to,
uint64_t& object_count);
};
}
}
}
#endif /* __CHAOSFramework_C9E19CC7_5691_4873_9DBC_39596C17E8C2_InfluxDB_h */
/*
* Copyright 2012, 18/06/2018 INFN
*
* Licensed under the EUPL, Version 1.2 or – as soon they
* will be approved by the European Commission - subsequent
* versions of the EUPL (the "Licence");
* You may not use this work except in compliance with the
* Licence.
* You may obtain a copy of the Licence at:
*
* https://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in
* writing, software distributed under the Licence is
* distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
* See the Licence for the specific language governing
* permissions and limitations under the Licence.
*/
//#include <chaos/common/global.h>
#include "../../ChaosMetadataService.h"
#include <boost/algorithm/string.hpp>
#include "InfluxDBLogStorageDriver.h"
#include "InfluxDB.h"
using namespace chaos;
using namespace chaos::service_common::persistence::data_access;
using namespace chaos::metadata_service;
using namespace chaos::metadata_service::object_storage;
using namespace chaos::metadata_service::object_storage::abstraction;
#define INFO INFO_LOG(InfluxDBLogStorageDriver)
#define DBG DBG_LOG(InfluxDBLogStorageDriver)
#define ERR ERR_LOG(InfluxDBLogStorageDriver)
DEFINE_CLASS_FACTORY(InfluxDBLogStorageDriver,
chaos::service_common::persistence::data_access::AbstractPersistenceDriver);
InfluxDBLogStorageDriver::InfluxDBLogStorageDriver(const std::string& name):
AbstractPersistenceDriver(name){}
InfluxDBLogStorageDriver::~InfluxDBLogStorageDriver() {}
void InfluxDBLogStorageDriver::init(void *init_data) throw (chaos::CException) {
AbstractPersistenceDriver::init(init_data);
const ChaosStringVector url_list = ChaosMetadataService::getInstance()->setting.log_storage_setting.url_list;
const std::string user = ChaosMetadataService::getInstance()->setting.log_storage_setting.key_value_custom_param["user"];
const std::string password = ChaosMetadataService::getInstance()->setting.log_storage_setting.key_value_custom_param["pwd"];
const std::string database = ChaosMetadataService::getInstance()->setting.log_storage_setting.key_value_custom_param["db"];
std::string dir;
MapKVP& obj_storage_kvp = metadata_service::ChaosMetadataService::getInstance()->setting.object_storage_setting.key_value_custom_param;
std::string basedatapath;
if(dir.size()){
basedatapath=dir;
} else if(metadata_service::ChaosMetadataService::getInstance()->getGlobalConfigurationInstance()->hasOption(InitOption::OPT_DATA_DIR)){
basedatapath=metadata_service::ChaosMetadataService::getInstance()->getGlobalConfigurationInstance()->getOption< std::string>(InitOption::OPT_DATA_DIR);
} else {
basedatapath=boost::filesystem::current_path().string();
}
std::string servername="localhost";
int port=8086;
if(url_list.size()>0){
std::vector<std::string> ele;
boost::split(ele,url_list[0],boost::is_any_of(":"));
if(ele.size()>0){
servername=ele[0];
}
if(ele.size()>1){
port=atoi(ele[1].c_str());
}
}
if(database.size()==0){
ERR<<"You must specify a valid database name";
throw chaos::CException(-1,"You must specify a valid database name",__FUNCTION__);
}
//influxdb_t asyncdb = influxdb_t( new influxdb::async_api::simple_db(url_list[0], database));
// asyncdb->with_authentication(user,password);
DBG<<"server:"<<servername<<"\nport:"<<port<<"\ndatabase:"<<database<<"\nuser:"<<user<<"\npassw:"<<password;
influxdb_cpp::server_info si(servername,port,database,user,password,"ms","365d");
//register the data access implementations
std::string resp;
int ret;
if((ret=influxdb_cpp::show_db(resp,si))<0){
ERR<<"cannot show DB:"<<database<< " on:"<<servername<<" port:"<<port;
throw chaos::CException(ret,"cannot connect or create DB:"+database+" on server:"+servername,__FUNCTION__);
}
CDataWrapper r;
r.setSerializedJsonData(resp.c_str());
DBG<<" DB returned:"<<ret<<" answer:\""<<resp<<"\"";
if((ret=influxdb_cpp::create_db(resp,database,si))<0){
ERR<<"cannot connect or create DB:"<<database<< " on:"<<servername<<" port:"<<port;
throw chaos::CException(ret,"cannot connect or create DB:"+database+" on server:"+servername,__FUNCTION__);
}
r.setSerializedJsonData(resp.c_str());
DBG<<" DB returned:"<<ret<<" answer:\""<<resp<<"\"";
registerDataAccess<ObjectStorageDataAccess>(new InfluxDB(si));
}
void InfluxDBLogStorageDriver::deinit() throw (chaos::CException) {
//call sublcass
AbstractPersistenceDriver::deinit();
}
void InfluxDBLogStorageDriver::deleteDataAccess(void *instance) {
AbstractDataAccess *da_instance = static_cast<AbstractDataAccess*>(instance);
if(da_instance != NULL)delete(da_instance);
}
/*
* Copyright 2012, 18/06/2018 INFN
*
* Licensed under the EUPL, Version 1.2 or – as soon they
* will be approved by the European Commission - subsequent
* versions of the EUPL (the "Licence");
* You may not use this work except in compliance with the
* Licence.
* You may obtain a copy of the Licence at:
*
* https://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in
* writing, software distributed under the Licence is
* distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
* See the Licence for the specific language governing
* permissions and limitations under the Licence.
*/
#ifndef __CHAOSFramework_E927A5B7_1CA0_802F_AA20_DD63646EA30A_InfluxDBLogStorageDriver_h
#define __CHAOSFramework_E927A5B7_1CA0_802F_AA20_DD63646EA30A_InfluxDBLogStorageDriver_h
#include <chaos/common/utility/ObjectFactoryRegister.h>
#include <chaos_service_common/persistence/data_access/AbstractPersistenceDriver.h>
namespace chaos {
namespace metadata_service {
namespace object_storage {
//! new mongodb implementation of persistence driver
/*!
The driver is define as class in the object factor
*/
DECLARE_CLASS_FACTORY(InfluxDBLogStorageDriver,
chaos::service_common::persistence::data_access::AbstractPersistenceDriver){
REGISTER_AND_DEFINE_DERIVED_CLASS_FACTORY_HELPER(InfluxDBLogStorageDriver)
//! Construct the driver
InfluxDBLogStorageDriver(const std::string& name);
//!dispose the driver
~InfluxDBLogStorageDriver();
//!inherited by AbstractPersistenceDriver
void deleteDataAccess(void *instance);
public:
//! Initialize the driver
void init(void *init_data) throw (chaos::CException);
//!deinitialize the driver
void deinit() throw (chaos::CException);
};
}
}
}
#endif /* __CHAOSFramework_E927A5B7_1CA0_802F_AA20_DD63646EA30A_InfluxDBLogStorageDriver_h */
/*
influxdb-cpp -- 💜 C++ client for InfluxDB.
Copyright (c) 2010-2018 <http://ez8.co> <orca.zhang@yahoo.com>
This library is released under the MIT License.
Please see LICENSE file or visit https://github.com/orca-zhang/influxdb-cpp for details.
*/
#include <sstream>
#include <cstring>
#include <cstdio>
#ifndef __INFLUXDBCPP__
#define __INFLUXDBCPP__
#ifdef _WIN32
#define NOMINMAX
#include <windows.h>
#include <algorithm>
#pragma comment(lib, "ws2_32")
typedef struct iovec { void* iov_base; size_t iov_len; } iovec;
inline __int64 writev(int sock, struct iovec* iov, int cnt) {
__int64 r = send(sock, (const char*)iov->iov_base, iov->iov_len, 0);
return (r < 0 || cnt == 1) ? r : r + writev(sock, iov + 1, cnt - 1);
}
#else
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define closesocket close
#endif
namespace influxdb_cpp {
struct server_info {
std::string host_;
int port_;
std::string db_;
std::string usr_;
std::string pwd_;
std::string precision_;
std::string retention_;
std::string funcprefix;
server_info(const std::string& host, int port, const std::string& db = "", const std::string& usr = "", const std::string& pwd = "", const std::string& precision="ms", const std::string& retention="365d", const std::string& prefix="")
: host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision), retention_(retention),funcprefix(prefix) {}
};
namespace detail {
struct meas_caller;
struct tag_caller;
struct field_caller;
struct ts_caller;
struct inner {
static int http_request(const char*, const char*, const std::string&, const std::string&, const server_info&, std::string*);
static inline unsigned char to_hex(unsigned char x) { return x > 9 ? x + 55 : x + 48; }
static void url_encode(std::string& out, const std::string& src);
};
}
inline int query(std::string& resp, const std::string& query, const server_info& si) {
std::string qs("&q=");
detail::inner::url_encode(qs, query);
std::string path=(si.funcprefix.size()>0)?si.funcprefix+"/query":"query";
return detail::inner::http_request("GET", path.c_str(), qs, "", si, &resp);
}
inline int create_db(std::string& resp, const std::string& db_name, const server_info& si) {
std::string qs("&q=create+database+");
detail::inner::url_encode(qs, db_name);
std::string path=(si.funcprefix.size()>0)?si.funcprefix+"/query":"query";
return detail::inner::http_request("POST", path.c_str(), qs, "", si, &resp);
}
inline int show_db(std::string& resp, const server_info& si) {
std::string qs("&q=show+databases");
std::string path=(si.funcprefix.size()>0)?si.funcprefix+"/query":"query";
return detail::inner::http_request("POST", path.c_str(), qs, "", si, &resp);
}
inline int push_db(std::string& resp, const std::string& lines,const server_info& si) {
std::string path=(si.funcprefix.size()>0)?si.funcprefix+"/write":"write";
return detail::inner::http_request("POST", path.c_str(), "", lines, si, &resp);
}
struct builder {
detail::tag_caller& meas(const std::string& m) {
lines_.imbue(std::locale("C"));
lines_.clear();
return _m(m);
}
detail::tag_caller& _m(const std::string& m) {
_escape(m, ", ");
return (detail::tag_caller&)*this;
}
detail::tag_caller& _t(const std::string& k, const std::string& v) {
lines_ << ',';
_escape(k, ",= ");
lines_ << '=';
_escape(v, ",= ");
return (detail::tag_caller&)*this;
}
detail::field_caller& _f_s(char delim, const std::string& k, const std::string& v) {
lines_ << delim;
_escape(k, ",= ");
lines_ << "=\"";
_escape(v, "\"");
lines_ << '\"';
return (detail::field_caller&)*this;
}
detail::field_caller& _f_i(char delim, const std::string& k, long long v) {
lines_ << delim;
_escape(k, ",= ");
lines_ << '=';
lines_ << v << 'i';
return (detail::field_caller&)*this;
}
detail::field_caller& _f_f(char delim, const std::string& k, double v, int prec) {
lines_ << delim;
_escape(k, ",= ");
lines_.precision(prec);
lines_ << '=' << v;
return (detail::field_caller&)*this;
}
detail::field_caller& _f_b(char delim, const std::string& k, bool v) {
lines_ << delim;
_escape(k, ",= ");
lines_ << '=' << (v ? 't' : 'f');
return (detail::field_caller&)*this;
}
detail::ts_caller& _ts(long long ts) {
lines_ << ' ' << ts;
return (detail::ts_caller&)*this;
}
int _post_http(const server_info& si, std::string* resp) {
std::string path=(si.funcprefix.size()>0)?si.funcprefix+"/write":"write";
return detail::inner::http_request("POST", path.c_str(), "", lines_.str(), si, resp);
}
int _send_udp(const std::string& host, int port) {
int sock, ret = 0;
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
if((addr.sin_addr.s_addr = inet_addr(host.c_str())) == INADDR_NONE) return -1;
if((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) return -2;
lines_ << '\n';
if(sendto(sock, &lines_.str()[0], lines_.str().length(), 0, (struct sockaddr *)&addr, sizeof(addr)) < (int)lines_.str().length())
ret = -3;
closesocket(sock);
return ret;
}
void _escape(const std::string& src, const char* escape_seq) {
size_t pos = 0, start = 0;
while((pos = src.find_first_of(escape_seq, start)) != std::string::npos) {
lines_.write(src.c_str() + start, pos - start);
lines_ << '\\' << src[pos];
start = ++pos;
}
lines_.write(src.c_str() + start, src.length() - start);
}
std::stringstream lines_;
};
namespace detail {
struct tag_caller : public builder {
detail::tag_caller& tag(const std::string& k, const std::string& v) { return _t(k, v); }
detail::field_caller& field(const std::string& k, const std::string& v) { return _f_s(' ', k, v); }
detail::field_caller& field(const std::string& k, bool v) { return _f_b(' ', k, v); }
detail::field_caller& field(const std::string& k, short v) { return _f_i(' ', k, v); }
detail::field_caller& field(const std::string& k, int v) { return _f_i(' ', k, v); }
detail::field_caller& field(const std::string& k, long v) { return _f_i(' ', k, v); }
detail::field_caller& field(const std::string& k, long long v) { return _f_i(' ', k, v); }
detail::field_caller& field(const std::string& k, double v, int prec = 2) { return _f_f(' ', k, v, prec); }
private:
detail::tag_caller& meas(const std::string& m);
};
struct ts_caller : public builder {
detail::tag_caller& meas(const std::string& m) { lines_ << '\n'; return _m(m); }
int post_http(const server_info& si, std::string* resp = NULL) { return _post_http(si, resp); }
int send_udp(const std::string& host, int port) { return _send_udp(host, port); }
};
struct field_caller : public ts_caller {
detail::field_caller& field(const std::string& k, const std::string& v) { return _f_s(',', k, v); }
detail::field_caller& field(const std::string& k, bool v) { return _f_b(',', k, v); }
detail::field_caller& field(const std::string& k, short v) { return _f_i(',', k, v); }
detail::field_caller& field(const std::string& k, int v) { return _f_i(',', k, v); }
detail::field_caller& field(const std::string& k, long v) { return _f_i(',', k, v); }
detail::field_caller& field(const std::string& k, long long v) { return _f_i(',', k, v); }
detail::field_caller& field(const std::string& k, double v, int prec = 2) { return _f_f(',', k, v, prec); }
detail::ts_caller& timestamp(unsigned long long ts) { return _ts(ts); }
};
inline void inner::url_encode(std::string& out, const std::string& src) {
size_t pos = 0, start = 0;
while((pos = src.find_first_not_of("abcdefghijklmnopqrstuvwxyqABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~", start)) != std::string::npos) {
out.append(src.c_str() + start, pos - start);
if(src[pos] == ' ')
out += "+";
else {
out += '%';
out += to_hex((unsigned char)src[pos] >> 4);
out += to_hex((unsigned char)src[pos] & 0xF);
}
start = ++pos;
}
out.append(src.c_str() + start, src.length() - start);
}
inline int inner::http_request(const char* method, const char* uri,
const std::string& querystring, const std::string& body, const server_info& si, std::string* resp) {
std::string header;
struct iovec iv[2];
struct sockaddr_in addr;
int sock, ret_code = 0, content_length = 0, len = 0;
char ch;
unsigned char chunked = 0;
addr.sin_family = AF_INET;
addr.sin_port = htons(si.port_);
struct hostent * record = gethostbyname(si.host_.c_str());
if(record==NULL){
return -1;
}
bcopy(record->h_addr,&addr.sin_addr,record->h_length);
// if((addr.sin_addr.s_addr = inet_addr(si.host_.c_str())) == INADDR_NONE) return -1;
if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) return -2;
if(connect(sock, (struct sockaddr*)(&addr), sizeof(addr)) < 0) {
closesocket(sock);
return -3;
}
header.resize(len = 0x100);
for(;;) {
iv[0].iov_len = snprintf(&header[0], len,
"%s /%s?db=%s&u=%s&p=%s&precision=%s%s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n",
method, uri, si.db_.c_str(), si.usr_.c_str(), si.pwd_.c_str(), si.precision_.c_str(),
querystring.c_str(), si.host_.c_str(), (int)body.length());
if((int)iv[0].iov_len >= len)
header.resize(len *= 2);
else
break;
}
iv[0].iov_base = &header[0];
iv[1].iov_base = (void*)&body[0];
iv[1].iov_len = body.length();
if(writev(sock, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) {
ret_code = -6;
goto END;
}
iv[0].iov_len = len;
#define _NO_MORE() (len >= (int)iv[0].iov_len && \
(iv[0].iov_len = recv(sock, &header[0], header.length(), len = 0)) == size_t(-1))
#define _GET_NEXT_CHAR() (ch = _NO_MORE() ? 0 : header[len++])
#define _LOOP_NEXT(statement) for(;;) { if(!(_GET_NEXT_CHAR())) { ret_code = -7; goto END; } statement }
#define _UNTIL(c) _LOOP_NEXT( if(ch == c) break; )
#define _GET_NUMBER(n) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 10 + (ch - '0'); else break; )
#define _GET_CHUNKED_LEN(n, c) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 16 + (ch - '0'); \
else if(ch >= 'A' && ch <= 'F') n = n * 16 + (ch - 'A') + 10; \
else if(ch >= 'a' && ch <= 'f') n = n * 16 + (ch - 'a') + 10; else {if(ch != c) { ret_code = -8; goto END; } break;} )
#define _(c) if((_GET_NEXT_CHAR()) != c) break;
#define __(c) if((_GET_NEXT_CHAR()) != c) { ret_code = -9; goto END; }
if(resp) resp->clear();
_UNTIL(' ')_GET_NUMBER(ret_code)
for(;;) {
_UNTIL('\n')
switch(_GET_NEXT_CHAR()) {
case 'C':_('o')_('n')_('t')_('e')_('n')_('t')_('-')
_('L')_('e')_('n')_('g')_('t')_('h')_(':')_(' ')
_GET_NUMBER(content_length)
break;
case 'T':_('r')_('a')_('n')_('s')_('f')_('e')_('r')_('-')
_('E')_('n')_('c')_('o')_('d')_('i')_('n')_('g')_(':')
_(' ')_('c')_('h')_('u')_('n')_('k')_('e')_('d')
chunked = 1;
break;
case '\r':__('\n')
switch(chunked) {
do {__('\r')__('\n')
case 1:
_GET_CHUNKED_LEN(content_length, '\r')__('\n')
if(!content_length) {
__('\r')__('\n')
goto END;
}
case 0:
while(content_length > 0 && !_NO_MORE()) {
content_length -= (iv[1].iov_len = std::min(content_length, (int)iv[0].iov_len - len));
if(resp) resp->append(&header[len], iv[1].iov_len);
len += iv[1].iov_len;
}
} while(chunked);
}
goto END;
}
if(!ch) {
ret_code = -10;
goto END;
}
}
ret_code = -11;
END:
closesocket(sock);
return ret_code / 100 == 2 ? 0 : ret_code;
#undef _NO_MORE
#undef _GET_NEXT_CHAR
#undef _LOOP_NEXT
#undef _UNTIL
#undef _GET_NUMBER
#undef _GET_CHUNKED_LEN
#undef _
#undef __
}
}
}
#endif
\ No newline at end of file
...@@ -41,6 +41,9 @@ namespace chaos { ...@@ -41,6 +41,9 @@ namespace chaos {
#define OPT_OBJ_STORAGE_LOG_METRIC "obj-storage-driver-log-metric" #define OPT_OBJ_STORAGE_LOG_METRIC "obj-storage-driver-log-metric"
#define OPT_OBJ_STORAGE_LOG_METRIC_UPDATE_INTERVAL "obj-storage-driver-log-metric-update-interval" #define OPT_OBJ_STORAGE_LOG_METRIC_UPDATE_INTERVAL "obj-storage-driver-log-metric-update-interval"
#define OPT_OBJ_STORAGE_DRIVER_KVP "obj-storage-kvp" #define OPT_OBJ_STORAGE_DRIVER_KVP "obj-storage-kvp"
#define OPT_LOG_STORAGE_SERVER_URL "log-storage-driver-server_url"
#define OPT_LOG_STORAGE_DRIVER "log-storage-driver"
#define OPT_LOG_STORAGE_DRIVER_KVP "log-storage-kvp" #define OPT_LOG_STORAGE_DRIVER_KVP "log-storage-kvp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment