diff --git a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp index 035c270b80224e1745d31ae0a1e77580e42fee69..a76a33c3519fe15067d41ad678cbf58b8029dcde 100644 --- a/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp +++ b/ChaosMetadataService/object_storage/influxDB/InfluxDB.cpp @@ -404,12 +404,39 @@ int InfluxDB::findObject(const std::string& uint64_t seqid = last_record_found_seq.datapack_counter; uint64_t runid = last_record_found_seq.run_id; + std::stringstream ss; + ss<<"SELECT "; + if(projection_keys.size()==0){ + ss<<"*"; + } else { + for(ChaosStringSet::iterator i = projection_keys.begin();i!=projection_keys.end();i++){ + ss<<*i; + if((++i)!=projection_keys.end()){ + ss<<","; + } + --i; + } + } + + ss<<" FROM \""<<key<<"\" WHERE time>="<<timestamp_from*1000<<" AND time<"<<timestamp_to*1000; + if(meta_tags.size()){ + ss<<" AND \"tag\"='"<<*meta_tags.begin()<<"'"; + } + if(page_len>0){ + ss<<" LIMIT "<<page_len; + } + + std::string resp; + int ret=influxdb_cpp::query(resp,ss.str(),si); + + DBG<<ss.str()<<" returned "<<ret<<" ->"<<resp; + #if CHAOS_PROMETHEUS // (*gauge_query_time_uptr) = (chaos::common::utility::TimingUtil::getTimeStamp() - ts); #endif - return err; + return ret; } //! fast search object into object persistence layer @@ -462,7 +489,7 @@ void InfluxDB::push_process() { measurements.str(""); } - usleep(1000*si.max_time_ms); + usleep(1000*si.poll_time_ms); } } diff --git a/ChaosMetadataService/object_storage/influxDB/InfluxDBLogStorageDriver.cpp b/ChaosMetadataService/object_storage/influxDB/InfluxDBLogStorageDriver.cpp index ae2748fc74fcc6fe40c53e35671ce261fa83d79e..05c3b77984267a55d70f30aaead5cb6a5f603595 100644 --- a/ChaosMetadataService/object_storage/influxDB/InfluxDBLogStorageDriver.cpp +++ b/ChaosMetadataService/object_storage/influxDB/InfluxDBLogStorageDriver.cpp @@ -59,6 +59,8 @@ void InfluxDBLogStorageDriver::init(void *init_data) { const std::string retention = DriverPoolManager::logSetting.persistence_kv_param_map["retention"]; const std::string max_measure_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_mesure"]; const std::string max_measure_ms_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_time_ms"]; + const std::string poll_time_ms_opt = DriverPoolManager::logSetting.persistence_kv_param_map["poll_time_ms"]; + const std::string max_array_size_opt = DriverPoolManager::logSetting.persistence_kv_param_map["max_array_size"]; @@ -98,6 +100,9 @@ void InfluxDBLogStorageDriver::init(void *init_data) { if(max_measure_ms_opt.size()){ si.max_time_ms=atoi(max_measure_ms_opt.c_str()); } + if(poll_time_ms_opt.size()){ + si.poll_time_ms=atoi(max_measure_ms_opt.c_str()); + } if(max_measure_opt.size()){ si.max_mesurements=atoi(max_measure_opt.c_str()); } diff --git a/ChaosMetadataService/object_storage/influxDB/influxdb.hpp b/ChaosMetadataService/object_storage/influxDB/influxdb.hpp index 923107d5f9319b2726ad2c90cc254ae9dc4a7284..ac38332da380912efc30e3bd650d1e7db7037219 100644 --- a/ChaosMetadataService/object_storage/influxDB/influxdb.hpp +++ b/ChaosMetadataService/object_storage/influxDB/influxdb.hpp @@ -31,6 +31,8 @@ #define closesocket close #endif #define MAX_MEASURES 2000 +#define MAX_TIME 1000 // max time after that push + #define MAX_POLL_TIME 300 // default poll 300 ms #define MAX_ARRAY_SIZE 512 // default max array size @@ -46,9 +48,11 @@ namespace influxdb_cpp { std::string funcprefix; int max_mesurements; //commit when n mesures reach max_measurement int max_time_ms; // commit when expired max_time_ms + int poll_time_ms; // poll time + int max_array_size; // push maximum array of this size server_info(const std::string& host, int port, const std::string& db = "", const std::string& usr = "", const std::string& pwd = "", const std::string& precision="ms", const std::string& retention="1095d", const std::string& prefix="") - : host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision), max_array_size(MAX_ARRAY_SIZE),retention_(retention),funcprefix(prefix),max_mesurements(MAX_MEASURES),max_time_ms(MAX_POLL_TIME) {} + : host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision), poll_time_ms(MAX_POLL_TIME),max_array_size(MAX_ARRAY_SIZE),retention_(retention),funcprefix(prefix),max_mesurements(MAX_MEASURES),max_time_ms(MAX_TIME) {} }; namespace detail { struct meas_caller;