From ab9fa4dadd6e1073bb814db70356f6ea54db3382 Mon Sep 17 00:00:00 2001 From: amichelo <andrea.michelotti@lnf.infn.it> Date: Mon, 18 Oct 2021 11:44:33 +0200 Subject: [PATCH] reduced mongo accesses posix storage dont use timeout but thread --- ChaosMetadataService/ChaosMetadataService.cpp | 31 ++++++---- ChaosMetadataService/ChaosMetadataService.h | 2 +- .../common/CUCommonUtility.cpp | 2 +- .../object_storage/posixFile/PosixFile.cpp | 57 +++++++------------ .../object_storage/posixFile/PosixFile.h | 9 +-- 5 files changed, 48 insertions(+), 53 deletions(-) diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp index e5eb277f9..72e97f06d 100644 --- a/ChaosMetadataService/ChaosMetadataService.cpp +++ b/ChaosMetadataService/ChaosMetadataService.cpp @@ -63,6 +63,7 @@ uint64_t ChaosMetadataService::timePrecisionMask=0xFFFFFFFFFFFFFFF0ULL; #define LCND_LERR ERR_LOG(ChaosMetadataService) #define log(x) LCND_LDBG<<x ChaosMetadataService::ChaosMetadataService(){ingore_unreg_po = true; +is_present=false; }; ChaosMetadataService::~ChaosMetadataService(){} @@ -71,6 +72,7 @@ ChaosMetadataService::~ChaosMetadataService(){} Specialized option for startup c and cpp program main options parameter */ void ChaosMetadataService::init(int argc, const char* argv[]) { + is_present=false; ChaosCommon<ChaosMetadataService>::init(argc, argv); } //!stringbuffer parser @@ -78,6 +80,7 @@ void ChaosMetadataService::init(int argc, const char* argv[]) { specialized option for string stream buffer with boost semantics */ void ChaosMetadataService::init(istringstream &initStringStream) { + is_present=false; ChaosCommon<ChaosMetadataService>::init(initStringStream); } @@ -135,6 +138,7 @@ void ChaosMetadataService::init(void *init_data) { } uint64_t tmp=pow(2,(uint32_t)log2(timeError_opt)); timePrecisionMask=~(tmp-1); + is_present=false; if(getGlobalConfigurationInstance()->hasOption(OPT_CACHE_DRIVER_KVP)) { GlobalConfiguration::fillKVParameter(setting.cache_driver_setting.key_value_custom_param, getGlobalConfigurationInstance()->getOption< std::vector<std::string> >(OPT_CACHE_DRIVER_KVP), ""); @@ -277,24 +281,27 @@ void ChaosMetadataService::start() { void ChaosMetadataService::timeout() { int err = 0; - bool presence = false; HealthStat service_proc_stat; const std::string ds_uid = NetworkBroker::getInstance()->getRPCUrl(); persistence::data_access::DataServiceDataAccess *ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); persistence::data_access::NodeDataAccess *n_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::NodeDataAccess>(); service_proc_stat.mds_received_timestamp = TimingUtil::getTimeStamp(); - if(n_da->checkNodePresence(presence, ds_uid) != 0) { - LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", %NetworkBroker::getInstance()->getRPCUrl()); - return; - } + if(is_present==false){ + bool presence=false; + if(n_da->checkNodePresence(presence, ds_uid) != 0) { + LCND_LERR << CHAOS_FORMAT("Error check if this mds [%1%] description is registered", %NetworkBroker::getInstance()->getRPCUrl()); + //return; + } - if(presence == false) { - //reinsert mds - ds_da->registerNode(setting.ha_zone_name, - NetworkBroker::getInstance()->getRPCUrl(), - NetworkBroker::getInstance()->getDirectIOUrl(), - 0, - getBuildInfo(chaos::common::data::CDWUniquePtr())); + if(presence == false) { + //reinsert mds + ds_da->registerNode(setting.ha_zone_name, + NetworkBroker::getInstance()->getRPCUrl(), + NetworkBroker::getInstance()->getDirectIOUrl(), + 0, + getBuildInfo(chaos::common::data::CDWUniquePtr())); + } + is_present=presence; } //update proc stat diff --git a/ChaosMetadataService/ChaosMetadataService.h b/ChaosMetadataService/ChaosMetadataService.h index 6fdcbba7f..c2689320f 100644 --- a/ChaosMetadataService/ChaosMetadataService.h +++ b/ChaosMetadataService/ChaosMetadataService.h @@ -65,7 +65,7 @@ namespace chaos { const std::vector<std::string>& multitoken_param); //inherited by chaos::common::async_central::TimerHandler void timeout(); - + bool is_present;// check if exists the entry public: static uint64_t timePrecisionMask; static std::string mdsName; diff --git a/ChaosMetadataService/common/CUCommonUtility.cpp b/ChaosMetadataService/common/CUCommonUtility.cpp index 0be11bf25..9562994f2 100644 --- a/ChaosMetadataService/common/CUCommonUtility.cpp +++ b/ChaosMetadataService/common/CUCommonUtility.cpp @@ -219,6 +219,7 @@ void CUCommonUtility::addDataServicePack(ChaosUniquePtr<chaos::common::data::CDa if (now >= nu_cache_ts || data_services.size() == 0) { data_services.clear(); + nu_cache_ts = now + chaos::common::constants::RefreshEndpointMSec; if ((err = ds_da->getBestNDataService(ha_zone_name, data_services, @@ -226,7 +227,6 @@ void CUCommonUtility::addDataServicePack(ChaosUniquePtr<chaos::common::data::CDa throw CException(err, "Error fetching best available data service", __PRETTY_FUNCTION__); } //update cache on first call after ten seconds - nu_cache_ts = now + chaos::common::constants::RefreshEndpointMSec; } std::string msgbroker = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_SERVER); if (msgbroker.size()) { diff --git a/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp b/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp index c9ef6b3cd..cae3bcee0 100644 --- a/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp +++ b/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp @@ -56,7 +56,7 @@ chaos::common::metric::CounterUniquePtr PosixFile::counter_read_data_uptr; chaos::common::metric::GaugeUniquePtr PosixFile::gauge_insert_time_uptr; chaos::common::metric::GaugeUniquePtr PosixFile::gauge_query_time_uptr; #endif -boost::lockfree::queue<PosixFile::dirpath_t*, boost::lockfree::fixed_sized<true> > PosixFile::file_to_finalize(MAXDIROPENED); +//boost::lockfree::queue<PosixFile::dirpath_t*, boost::lockfree::fixed_sized<true> > PosixFile::file_to_finalize(MAXDIROPENED); #ifdef CERN_ROOT GenerateRootJob PosixFile::rootGenJob; #endif @@ -555,7 +555,7 @@ PosixFile::PosixFile(const std::string& name) #endif DBG << " BASED DIR:" << name; - AsyncCentralManager::getInstance()->addTimer(this, 2000, 2000); + // AsyncCentralManager::getInstance()->addTimer(this, 2000, 2000); finalize_th = boost::thread(&PosixFile::finalizeJob, this); #ifdef CERN_ROOT rootGenJob.init(1); @@ -566,25 +566,9 @@ void PosixFile::finalizeJob() { exitFinalizeJob = false; do { - boost::mutex::scoped_lock lock(mutex_io); - - wait_data.wait(lock); - dirpath_t* ele; - - while (file_to_finalize.pop(ele)) { - DBG << "processing dir :" << ele->dir << " name:" << ele->name; - if (createFinal(ele->dir, ele->name, true) >= 0) { - // DBG << " CREATE FINAL: " << fpath; -#ifdef CERN_ROOT - if (PosixFile::generateRoot) { - std::string fpath = ele->dir + "/" + ele->name + ((PosixFile::compress) ? ".lz4" : ""); - chaos::CObjectProcessingQueue<std::string>::QueueElementShrdPtr a(new std::string(fpath)); - rootGenJob.push(a); - } -#endif - - delete ele; - } + + if(!process_dirs()){ + sleep(PROCESS_DIR_FREQ_SEC); } } while (!exitFinalizeJob); @@ -594,7 +578,7 @@ void PosixFile::finalizeJob() { PosixFile::~PosixFile() { exitFinalizeJob = true; - AsyncCentralManager::getInstance()->removeTimer(this); + // AsyncCentralManager::getInstance()->removeTimer(this); #ifdef CERN_ROOT rootGenJob.deinit(); #endif @@ -1419,8 +1403,8 @@ int PosixFile::getObjectByIndex(const chaos::common::data::CDWShrdPtr& index, return 0; } -void PosixFile::timeout() { - +bool PosixFile::process_dirs() { + bool work_done=false; // remove directory write cache for (write_path_t::iterator id = s_lastWriteDir.begin(); id != s_lastWriteDir.end(); ) { uint64_t ts = chaos::common::utility::TimingUtil::getTimeStamp(); @@ -1450,19 +1434,21 @@ void PosixFile::timeout() { } int ret; if ((ret = makeOrdered(id->second)) > 0) { - dirpath_t* ele = new dirpath_t(); - ele->dir = dstdir; - ele->name = POSIX_FINAL_DATA_NAME; - DBG << "TO Process" << dstdir; - - file_to_finalize.push(ele); - wait_data.notify_all(); + + DBG << "processing dir :" << dstdir << " name:" << POSIX_FINAL_DATA_NAME; + if (createFinal(dstdir, POSIX_FINAL_DATA_NAME, true) >= 0) { + // DBG << " CREATE FINAL: " << fpath; + #ifdef CERN_ROOT + if (PosixFile::generateRoot) { + std::string fpath = ele->dir + "/" + ele->name + ((PosixFile::compress) ? ".lz4" : ""); + chaos::CObjectProcessingQueue<std::string>::QueueElementShrdPtr a(new std::string(fpath)); + rootGenJob.push(a); + } + #endif - /*if (createFinal(dstdir, POSIX_FINAL_DATA_NAME) >= 0) { + } + work_done=true; - DBG << " CREATE FINAL: " <<dstdir + "/"+POSIX_FINAL_DATA_NAME; - - }*/ } else if (ret < 0) { DBG << "remove resource:" << dstdir; @@ -1504,6 +1490,7 @@ void PosixFile::timeout() { id++; } }*/ + return work_done; } //!return the number of object for a determinated key that are store for a time range diff --git a/ChaosMetadataService/object_storage/posixFile/PosixFile.h b/ChaosMetadataService/object_storage/posixFile/PosixFile.h index 546ae867f..96947f732 100644 --- a/ChaosMetadataService/object_storage/posixFile/PosixFile.h +++ b/ChaosMetadataService/object_storage/posixFile/PosixFile.h @@ -38,6 +38,7 @@ #endif //CHAOS_PROMETHEUS // 3Khz #define MAX_NUM_OF_FILE_PER_MINUTE 60*3000 +#define PROCESS_DIR_FREQ_SEC 2 #include "FileLock.h" namespace chaos { @@ -165,7 +166,7 @@ namespace chaos { int getData(abstraction::VectorObject& data,int maxData,const uint64_t timestamp_from,const uint64_t timestamp_to,chaos::common::direct_io::channel::opcode_headers::SearchSequence&,int timeout=5000); ~SearchWorker(); }; - class PosixFile:public metadata_service::object_storage::abstraction::ObjectStorageDataAccess,public chaos::common::async_central::TimerHandler { + class PosixFile:public metadata_service::object_storage::abstraction::ObjectStorageDataAccess{ friend SearchWorker; @@ -235,8 +236,8 @@ public: typedef std::map<std::string,read_path_t> cacheRead_t; static boost::mutex last_access_mutex,cache_mutex; static cacheRead_t s_lastAccessedDir; - // return number of items, or negative if error - void timeout(); + // something to process + bool process_dirs(); #ifdef CERN_ROOT @@ -256,7 +257,7 @@ public: boost::thread finalize_th; boost::condition_variable wait_data; boost::mutex mutex_io; - static boost::lockfree::queue<dirpath_t*, boost::lockfree::fixed_sized<true> > file_to_finalize; + // static boost::lockfree::queue<dirpath_t*, boost::lockfree::fixed_sized<true> > file_to_finalize; bool exitFinalizeJob; -- GitLab