From 4fcd2de063eb35d77fe9487815fc7c2ce5a93ab7 Mon Sep 17 00:00:00 2001 From: Andrea Michelotti <amichelotti@lnf.infn.it> Date: Wed, 12 Oct 2022 12:09:26 +0200 Subject: [PATCH] macro for condition wait --- .../object_storage/posixFile/PosixFile.cpp | 2 +- ChaosMetadataService/worker/DataWorker.cpp | 2 +- chaos/common/chaos_constants.h | 2 +- chaos/common/chaos_types.h | 9 ++++++--- chaos/common/message/MessagePublishSubscribeBase.cpp | 2 +- chaos/common/pqueue/CObjectProcessingPriorityQueue.h | 4 ++-- chaos/common/pqueue/CObjectProcessingQueue.h | 5 +++-- chaos/common/thread/ObjectWaitSemaphore.h | 2 +- chaos/common/thread/TLockFreeQueue.h | 4 ++-- chaos/common/thread/WaitSemaphore.cpp | 6 +++--- 10 files changed, 21 insertions(+), 17 deletions(-) diff --git a/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp b/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp index 1e0b39418..d4e25b471 100644 --- a/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp +++ b/ChaosMetadataService/object_storage/posixFile/PosixFile.cpp @@ -816,7 +816,7 @@ bool SearchWorker::waitData(int timeo) { // boost::chrono::system_clock::time_point wakeUpTime = // boost::chrono::system_clock::now() + period; DBG << "waiting for data available.." << elements; - bool ret = CHAOS_WAIT(wait_data,lock, timeo); + bool ret = CHAOS_WAIT_MS(wait_data,lock, timeo); DBG << "data available:" << elements; return ret; diff --git a/ChaosMetadataService/worker/DataWorker.cpp b/ChaosMetadataService/worker/DataWorker.cpp index 463b54ddb..53f69553a 100644 --- a/ChaosMetadataService/worker/DataWorker.cpp +++ b/ChaosMetadataService/worker/DataWorker.cpp @@ -126,7 +126,7 @@ int DataWorker::submitJobInfo(WorkerJobPtr job_info, int64_t milliseconds_to_wai if(job_in_queue >= max_element) { DCLDBG_ << "Fifo Full queue :"<<job_in_queue<<", waiting.."; - if(CHAOS_WAIT(push_condition,lock,milliseconds_to_wait) ==false) { + if(CHAOS_WAIT_MS(push_condition,lock,milliseconds_to_wait) ==false) { DCLERR_ << "Datapack has gone in timeout waiting for queue free more space"; return -1; } diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index 643b069ab..8d8c10b23 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -199,7 +199,7 @@ static const unsigned int CacheTimeoutinMSec = 5000; static const unsigned int MetricCollectorTimeoutinMSec = 1000; static const unsigned int RefreshEndpointMSec = 60000; static const unsigned int SkipDatasetOlderThan = 5*60000; - +static const unsigned int ProcessingQueueTimeoutMSec = 500; //ms //!time to wait for queue can accept new data to push in object storage /*! Mds when receive a new dataset to store on history, it is push on hst sublayer diff --git a/chaos/common/chaos_types.h b/chaos/common/chaos_types.h index 301f5ffd8..cdb1055a6 100644 --- a/chaos/common/chaos_types.h +++ b/chaos/common/chaos_types.h @@ -23,7 +23,6 @@ #define CHAOSFramework_chaos_types_h #include <boost/ptr_container/ptr_container.hpp> - #include <boost/thread.hpp> #include <set> @@ -68,7 +67,9 @@ typedef std::chrono::microseconds ChaosCronoMicroseconds; #define ChaosUniqueLock std::unique_lock<std::mutex> #define ChaosConditionVariable std::condition_variable #define ChaosConditionVariableAny std::condition_variable_any -#define CHAOS_WAIT(condvar,lock,duration_ms) (condvar.wait_for(lock,std::chrono::milliseconds(duration_ms))==std::cv_status::no_timeout) +#define CHAOS_WAIT(condvar,lock) condvar.wait(lock) +#define CHAOS_WAIT_MS(condvar,lock,duration_ms) (condvar.wait_for(lock,std::chrono::milliseconds(duration_ms))==std::cv_status::no_timeout) + #define CHAOS_WAIT_US(condvar,lock,duration_us) (condvar.wait_for(lock,std::chrono::microseconds(duration_us))==std::cv_status::no_timeout) #define CHAOS_DEFER_LOCK std::defer_lock @@ -127,7 +128,9 @@ using ChaosFunction = std::function< R >; #define ChaosUniqueLock boost::unique_lock<boost::mutex> #define ChaosConditionVariable boost::condition_variable #define ChaosConditionVariableAny boost::condition_variable_any -#define CHAOS_WAIT(condvar,lock,duration_ms) condvar.timed_wait(lock,boost::posix_time::milliseconds(duration_ms)) +#define CHAOS_WAIT(condvar,lock) condvar.wait(lock,boost::posix_time::milliseconds(duration_ms)) +#define CHAOS_WAIT_MS(condvar,lock,duration_ms) condvar.wait(lock,boost::posix_time::milliseconds(duration_ms)) + #define CHAOS_WAIT_US(condvar,lock,duration_us) condvar.timed_wait(lock,boost::posix_time::microseconds(duration_us)) #define ChaosToString boost::lexical_cast<std::string> diff --git a/chaos/common/message/MessagePublishSubscribeBase.cpp b/chaos/common/message/MessagePublishSubscribeBase.cpp index 782bd0d7e..efbfd5ef0 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.cpp +++ b/chaos/common/message/MessagePublishSubscribeBase.cpp @@ -46,7 +46,7 @@ namespace chaos { ChaosUniqueLock guard(mutex_cond); MRDDBG_<<"wating operation"; if(data_ready) return stats.last_err; - if(!CHAOS_WAIT(cond,guard,timeout_ms)){ + if(!CHAOS_WAIT_MS(cond,guard,timeout_ms)){ MRDERR_<<"Timeout"; return -100; } diff --git a/chaos/common/pqueue/CObjectProcessingPriorityQueue.h b/chaos/common/pqueue/CObjectProcessingPriorityQueue.h index 5732769b8..67a097575 100644 --- a/chaos/common/pqueue/CObjectProcessingPriorityQueue.h +++ b/chaos/common/pqueue/CObjectProcessingPriorityQueue.h @@ -175,7 +175,7 @@ namespace chaos { if(waithForEmptyQueue){ COPPQUEUE_LAPP_ << "wait until queue is empty"; while(!bufferQueue.empty()){ - CHAOS_WAIT(emptyQueueConditionLock,lock,500); + CHAOS_WAIT_MS(emptyQueueConditionLock,lock,chaos::common::constants::ProcessingQueueTimeoutMSec); } COPPQUEUE_LAPP_ << "queue is empty"; } @@ -242,7 +242,7 @@ namespace chaos { void waitForEmpty() { ChaosUniqueLock lock(qMutex); while( bufferQueue.empty()){ - bool tim=CHAOS_WAIT(emptyQueueConditionLock,lock,500); + bool tim=CHAOS_WAIT_MS(emptyQueueConditionLock,lock,chaos::common::constants::ProcessingQueueTimeoutMSec); } } diff --git a/chaos/common/pqueue/CObjectProcessingQueue.h b/chaos/common/pqueue/CObjectProcessingQueue.h index a896f1878..3c1a833c0 100644 --- a/chaos/common/pqueue/CObjectProcessingQueue.h +++ b/chaos/common/pqueue/CObjectProcessingQueue.h @@ -22,6 +22,7 @@ #define CObjectProcessingQueue_H #include <chaos/common/global.h> +#include <chaos/common/chaos_constants.h> #include <chaos/common/utility/UUIDUtil.h> #include <chaos/common/exception/exception.h> @@ -122,7 +123,7 @@ namespace chaos { if(waithForEmptyQueue){ COPQUEUE_LDBG_ << " wait until queue is empty"; while(!buffer_queue.empty()){ - CHAOS_WAIT(emptyQueueConditionLock,lock,500); + CHAOS_WAIT_MS(emptyQueueConditionLock,lock,chaos::common::constants::ProcessingQueueTimeoutMSec); } @@ -196,7 +197,7 @@ namespace chaos { void waitForEmpty() { ChaosUniqueLock lock(qMutex); while(!buffer_queue.empty()){ - CHAOS_WAIT(emptyQueueConditionLock,lock,500); + CHAOS_WAIT_MS(emptyQueueConditionLock,lock,chaos::common::constants::ProcessingQueueTimeoutMSec); } return buffer_queue.empty(); diff --git a/chaos/common/thread/ObjectWaitSemaphore.h b/chaos/common/thread/ObjectWaitSemaphore.h index 7fb6d651d..d3fc9e383 100644 --- a/chaos/common/thread/ObjectWaitSemaphore.h +++ b/chaos/common/thread/ObjectWaitSemaphore.h @@ -111,7 +111,7 @@ namespace chaos { if(inWait) return NULL; inWait = true; answered = false; - do {} while(CHAOS_WAIT(wait_answer_condition,lock, millisecToWait) && !answered); + do {} while(CHAOS_WAIT_MS(wait_answer_condition,lock, millisecToWait) && !answered); inWait = false; answered = false; T result = objecForWait; diff --git a/chaos/common/thread/TLockFreeQueue.h b/chaos/common/thread/TLockFreeQueue.h index 768b73bcf..4c0361279 100644 --- a/chaos/common/thread/TLockFreeQueue.h +++ b/chaos/common/thread/TLockFreeQueue.h @@ -70,7 +70,7 @@ class TLockFreeQueue { if (timeout_ms > 0) { /*boost::system_time const tim = boost::get_system_time() + boost::posix_time::milliseconds(timeout_ms);*/ - if (CHAOS_WAIT(some_read,lock, timeout_ms) == false) { + if (CHAOS_WAIT_MS(some_read,lock, timeout_ms) == false) { return chaos::ErrorCode::EC_GENERIC_TIMEOUT; } @@ -109,7 +109,7 @@ class TLockFreeQueue { if (timeout_ms > 0) { /* boost::system_time const tim = boost::get_system_time() + boost::posix_time::milliseconds(timeout_ms);*/ - if (CHAOS_WAIT(the_condition_variable,lock, timeout_ms)) { + if (CHAOS_WAIT_MS(the_condition_variable,lock, timeout_ms)) { if (pop(popped_value)) { return size; } diff --git a/chaos/common/thread/WaitSemaphore.cpp b/chaos/common/thread/WaitSemaphore.cpp index d7b00557e..96a6c2715 100644 --- a/chaos/common/thread/WaitSemaphore.cpp +++ b/chaos/common/thread/WaitSemaphore.cpp @@ -29,21 +29,21 @@ namespace chaos { if(inWait) return; inWait = true; - while((!answered)){ ChaosUniqueLock lock( wait_answer_mutex );wait_answer_condition.wait(lock);}; + while((!answered)){ ChaosUniqueLock lock( wait_answer_mutex );CHAOS_WAIT(wait_answer_condition,lock);}; inWait = false; answered = false; } void WaitSemaphore::waitRaw() { ChaosUniqueLock lock( wait_answer_mutex ); - wait_answer_condition.wait(lock); + CHAOS_WAIT(wait_answer_condition,lock); } void WaitSemaphore::wait(unsigned long millisecToWait) { ChaosUniqueLock lock( wait_answer_mutex ); if(inWait) return; inWait = true; - do {} while(CHAOS_WAIT(wait_answer_condition,lock,millisecToWait) && !answered); + do {} while(CHAOS_WAIT_MS(wait_answer_condition,lock,millisecToWait) && !answered); inWait = false; answered = false; } -- GitLab