From 237b1d127f5825dbf0ce0f9aca051e7e5cac6b2f Mon Sep 17 00:00:00 2001 From: Claudio Bisegni <Claudio.Bisegni@lnf.infn.it> Date: Thu, 20 Mar 2014 19:59:50 +0100 Subject: [PATCH] added the performance channel and refactorying on message channel Change-Id: I4e4736edd7569cee36ce371d6018cecff0ed14f2 --- CHAOSFramework.xcodeproj/project.pbxproj | 38 ++- .../contents.xcworkspacedata | 3 + CMakeLists.txt | 3 + ChaosDataService/DataConsumer.cpp | 20 +- ChaosDataService/DataConsumer.h | 2 - ChaosDataService/cache_system/CacheDriver.cpp | 27 +- ChaosDataService/cache_system/CacheDriver.h | 26 +- .../cache_system/MemcachedCacheDriver.cpp | 26 +- .../cache_system/MemcachedCacheDriver.h | 27 +- ChaosDataService/cache_system/cache_system.h | 27 +- ChaosDataService/dataservice_global.h | 15 +- ChaosDataService/main.cpp | 30 +- ChaosDataService/storage_system/VFSDriver.cpp | 21 ++ ChaosDataService/storage_system/VFSDriver.h | 62 ++++ ChaosDataService/worker/AnswerDataWorker.cpp | 35 ++- chaos/common/CMakeLists.txt | 9 +- chaos/common/chaos_constants.h | 2 + chaos/common/data/CDataWrapper.cpp | 2 +- chaos/common/data/CDataWrapper.h | 2 +- .../direct_io/DirectIOClientConnection.cpp | 8 +- .../direct_io/DirectIOClientConnection.h | 3 + .../direct_io/DirectIOPerformanceLoop.cpp | 83 ------ .../direct_io/DirectIOPerformanceSession.cpp | 96 ++++++ ...nceLoop.h => DirectIOPerformanceSession.h} | 50 +++- chaos/common/direct_io/DirectIOServer.cpp | 7 + chaos/common/direct_io/DirectIOServer.h | 2 + .../direct_io/DirectIOServerEndpoint.cpp | 7 + .../common/direct_io/DirectIOServerEndpoint.h | 2 + .../direct_io/DirectIOServerPublicInterface.h | 1 + .../channel/DirectIODeviceChannelGlobal.h | 3 +- .../DirectIOPerformanceClientChannel.cpp | 49 ++- .../DirectIOPerformanceClientChannel.h | 2 + .../DirectIOPerformanceServerChannel.cpp | 11 +- .../DirectIOPerformanceServerChannel.h | 6 +- .../direct_io/impl/ZMQDirectIOClient.cpp | 6 +- .../impl/ZMQDirectIOClientConnection.cpp | 12 +- .../impl/ZMQDirectIOClientConnection.h | 4 +- chaos/common/io/IODirectIODriver.cpp | 2 + chaos/common/message/DeviceMessageChannel.cpp | 12 - chaos/common/message/MessageChannel.h | 64 ++-- .../common/message/PerformanceNodeChannel.cpp | 124 ++++++++ chaos/common/message/PerformanceNodeChannel.h | 66 +++++ chaos/common/network/NetworkBroker.cpp | 43 ++- chaos/common/network/NetworkBroker.h | 46 ++- chaos/common/network/PerformanceManagment.cpp | 137 ++++++--- chaos/common/network/PerformanceManagment.h | 27 +- .../utility/TemplatedKeyObjectContainer.h | 4 +- chaos/ui_toolkit/HighLevelApi/HLDataApi.cpp | 3 +- chaos/ui_toolkit/HighLevelApi/HLDataApi.h | 3 +- chaos/ui_toolkit/LowLevelApi/LLRpcApi.cpp | 16 +- chaos/ui_toolkit/LowLevelApi/LLRpcApi.h | 9 +- example/ChaosCLI/main.cpp | 70 ++--- example/ChaosPerformanceTester/CMakeLists.txt | 21 ++ .../project.pbxproj | 280 ++++++++++++++++++ example/ChaosPerformanceTester/main.cpp | 186 ++++++++++++ 55 files changed, 1489 insertions(+), 353 deletions(-) create mode 100644 ChaosDataService/storage_system/VFSDriver.cpp create mode 100644 ChaosDataService/storage_system/VFSDriver.h delete mode 100644 chaos/common/direct_io/DirectIOPerformanceLoop.cpp create mode 100644 chaos/common/direct_io/DirectIOPerformanceSession.cpp rename chaos/common/direct_io/{DirectIOPerformanceLoop.h => DirectIOPerformanceSession.h} (54%) create mode 100644 chaos/common/message/PerformanceNodeChannel.cpp create mode 100644 chaos/common/message/PerformanceNodeChannel.h create mode 100644 example/ChaosPerformanceTester/CMakeLists.txt create mode 100644 example/ChaosPerformanceTester/ChaosPerformanceTester.xcodeproj/project.pbxproj create mode 100644 example/ChaosPerformanceTester/main.cpp diff --git a/CHAOSFramework.xcodeproj/project.pbxproj b/CHAOSFramework.xcodeproj/project.pbxproj index 550c390d9..bdd74a7d6 100644 --- a/CHAOSFramework.xcodeproj/project.pbxproj +++ b/CHAOSFramework.xcodeproj/project.pbxproj @@ -265,6 +265,8 @@ 32AAD78A14CD772F005A097C /* MessageChannel.h in Headers */ = {isa = PBXBuildFile; fileRef = 32AAD78914CD772F005A097C /* MessageChannel.h */; }; 32AAD78C14CD773F005A097C /* MessageChannel.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32AAD78B14CD773F005A097C /* MessageChannel.cpp */; }; 32AAD78E14CD95FD005A097C /* Atomic.h in Headers */ = {isa = PBXBuildFile; fileRef = 32AAD78D14CD95FD005A097C /* Atomic.h */; }; + 32AF7D4018DAEAF300537DE6 /* PerformanceNodeChannel.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32AF7D3E18DAEAF300537DE6 /* PerformanceNodeChannel.cpp */; }; + 32AF7D4118DAEAF300537DE6 /* PerformanceNodeChannel.h in Headers */ = {isa = PBXBuildFile; fileRef = 32AF7D3F18DAEAF300537DE6 /* PerformanceNodeChannel.h */; }; 32B10910188C1D6200969D3D /* main.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32B1090F188C1D6200969D3D /* main.cpp */; }; 32B10912188C1D6200969D3D /* TestAsyncRpc.1 in CopyFiles */ = {isa = PBXBuildFile; fileRef = 32B10911188C1D6200969D3D /* TestAsyncRpc.1 */; }; 32B1378A189BEC3300B17CA4 /* ZMQDirectIOClient.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32B13788189BEC3300B17CA4 /* ZMQDirectIOClient.cpp */; }; @@ -380,8 +382,8 @@ 32C3546317885A37009ED581 /* shared_library.hpp in Headers */ = {isa = PBXBuildFile; fileRef = 32C3544B17885A37009ED581 /* shared_library.hpp */; }; 32C35467178860E5009ED581 /* extension.hpp in Headers */ = {isa = PBXBuildFile; fileRef = 32C35465178860E5009ED581 /* extension.hpp */; }; 32C3546A178860EB009ED581 /* decl.hpp in Headers */ = {isa = PBXBuildFile; fileRef = 32C35469178860EB009ED581 /* decl.hpp */; }; - 32C5A42718D9959800C0435C /* DirectIOPerformanceLoop.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32C5A42518D9959800C0435C /* DirectIOPerformanceLoop.cpp */; }; - 32C5A42818D9959800C0435C /* DirectIOPerformanceLoop.h in Headers */ = {isa = PBXBuildFile; fileRef = 32C5A42618D9959800C0435C /* DirectIOPerformanceLoop.h */; }; + 32C5A42718D9959800C0435C /* DirectIOPerformanceSession.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32C5A42518D9959800C0435C /* DirectIOPerformanceSession.cpp */; }; + 32C5A42818D9959800C0435C /* DirectIOPerformanceSession.h in Headers */ = {isa = PBXBuildFile; fileRef = 32C5A42618D9959800C0435C /* DirectIOPerformanceSession.h */; }; 32C7B8FD16DD362A005B3B21 /* ApiServer.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32C7B8FB16DD362A005B3B21 /* ApiServer.cpp */; }; 32C7B90116DD365D005B3B21 /* DataManagment.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32C7B8FF16DD365D005B3B21 /* DataManagment.cpp */; }; 32C7B90516DD6159005B3B21 /* DeviceApi.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32C7B90316DD6159005B3B21 /* DeviceApi.cpp */; }; @@ -402,6 +404,7 @@ 32D6C8981815293200E9689B /* DirectIOHandler.h in Headers */ = {isa = PBXBuildFile; fileRef = 32D6C8971815293200E9689B /* DirectIOHandler.h */; }; 32D6C89A1815294700E9689B /* DirectIOForwarder.h in Headers */ = {isa = PBXBuildFile; fileRef = 32D6C8991815294700E9689B /* DirectIOForwarder.h */; }; 32DB1188166267FB0036D1B9 /* DSDoubleHandler.h in Headers */ = {isa = PBXBuildFile; fileRef = 32DB1187166267FB0036D1B9 /* DSDoubleHandler.h */; }; + 32DBF9CE18E06B240095B289 /* VFSDriver.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32DBF9CC18E06B240095B289 /* VFSDriver.cpp */; }; 32DD884F15E2663B000AF740 /* DefaultEventDispatcher.h in Headers */ = {isa = PBXBuildFile; fileRef = 32DD884D15E2663B000AF740 /* DefaultEventDispatcher.h */; }; 32DD885315E2681F000AF740 /* EventHandler.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32DD885115E2681F000AF740 /* EventHandler.cpp */; }; 32DD885415E2681F000AF740 /* EventHandler.h in Headers */ = {isa = PBXBuildFile; fileRef = 32DD885215E2681F000AF740 /* EventHandler.h */; }; @@ -736,6 +739,8 @@ 32AAD78914CD772F005A097C /* MessageChannel.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = MessageChannel.h; sourceTree = "<group>"; }; 32AAD78B14CD773F005A097C /* MessageChannel.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = MessageChannel.cpp; sourceTree = "<group>"; }; 32AAD78D14CD95FD005A097C /* Atomic.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Atomic.h; sourceTree = "<group>"; }; + 32AF7D3E18DAEAF300537DE6 /* PerformanceNodeChannel.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = PerformanceNodeChannel.cpp; sourceTree = "<group>"; }; + 32AF7D3F18DAEAF300537DE6 /* PerformanceNodeChannel.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = PerformanceNodeChannel.h; sourceTree = "<group>"; }; 32B1090D188C1D6200969D3D /* TestAsyncRpc */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = TestAsyncRpc; sourceTree = BUILT_PRODUCTS_DIR; }; 32B1090F188C1D6200969D3D /* main.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = main.cpp; sourceTree = "<group>"; }; 32B10911188C1D6200969D3D /* TestAsyncRpc.1 */ = {isa = PBXFileReference; lastKnownFileType = text.man; path = TestAsyncRpc.1; sourceTree = "<group>"; }; @@ -857,8 +862,8 @@ 32C3544B17885A37009ED581 /* shared_library.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = shared_library.hpp; sourceTree = "<group>"; }; 32C35465178860E5009ED581 /* extension.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = extension.hpp; sourceTree = "<group>"; }; 32C35469178860EB009ED581 /* decl.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = decl.hpp; sourceTree = "<group>"; }; - 32C5A42518D9959800C0435C /* DirectIOPerformanceLoop.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = DirectIOPerformanceLoop.cpp; sourceTree = "<group>"; }; - 32C5A42618D9959800C0435C /* DirectIOPerformanceLoop.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DirectIOPerformanceLoop.h; sourceTree = "<group>"; }; + 32C5A42518D9959800C0435C /* DirectIOPerformanceSession.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = DirectIOPerformanceSession.cpp; sourceTree = "<group>"; }; + 32C5A42618D9959800C0435C /* DirectIOPerformanceSession.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DirectIOPerformanceSession.h; sourceTree = "<group>"; }; 32C7B8FB16DD362A005B3B21 /* ApiServer.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = ApiServer.cpp; sourceTree = "<group>"; }; 32C7B8FC16DD362A005B3B21 /* ApiServer.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ApiServer.h; sourceTree = "<group>"; }; 32C7B8FF16DD365D005B3B21 /* DataManagment.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = DataManagment.cpp; sourceTree = "<group>"; }; @@ -884,6 +889,8 @@ 32D6C8991815294700E9689B /* DirectIOForwarder.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DirectIOForwarder.h; sourceTree = "<group>"; }; 32DA666F1700DBB6004FC02B /* endian.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = endian.h; sourceTree = "<group>"; }; 32DB1187166267FB0036D1B9 /* DSDoubleHandler.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DSDoubleHandler.h; sourceTree = "<group>"; }; + 32DBF9CC18E06B240095B289 /* VFSDriver.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = VFSDriver.cpp; sourceTree = "<group>"; }; + 32DBF9CD18E06B240095B289 /* VFSDriver.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = VFSDriver.h; sourceTree = "<group>"; }; 32DD884D15E2663B000AF740 /* DefaultEventDispatcher.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DefaultEventDispatcher.h; sourceTree = "<group>"; }; 32DD885115E2681F000AF740 /* EventHandler.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = EventHandler.cpp; sourceTree = "<group>"; }; 32DD885215E2681F000AF740 /* EventHandler.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = EventHandler.h; sourceTree = "<group>"; }; @@ -1077,6 +1084,7 @@ 322D98FF18A95F9200C0864C /* DataConsumer.h */, 32B159DB18B4C46E0096B58B /* worker */, 3272152118B00650005AC8FD /* cache_system */, + 32DBF9CB18E046A30095B289 /* storage_system */, ); path = ChaosDataService; sourceTree = "<group>"; @@ -1693,8 +1701,8 @@ 32A886C4189C37EA00321D61 /* ServerFeeder.cpp */, 32A886C5189C37EA00321D61 /* ServerFeeder.h */, 320FB0EB18A39E650014CDB3 /* DirectIO.h */, - 32C5A42518D9959800C0435C /* DirectIOPerformanceLoop.cpp */, - 32C5A42618D9959800C0435C /* DirectIOPerformanceLoop.h */, + 32C5A42518D9959800C0435C /* DirectIOPerformanceSession.cpp */, + 32C5A42618D9959800C0435C /* DirectIOPerformanceSession.h */, ); path = direct_io; sourceTree = "<group>"; @@ -1793,6 +1801,8 @@ 3297F22514D96F31004FFE4F /* DeviceMessageChannel.cpp */, 3297F22814D96F43004FFE4F /* DeviceMessageChannel.h */, 3297F22A14D96F8D004FFE4F /* NodeMessageChannel.h */, + 32AF7D3E18DAEAF300537DE6 /* PerformanceNodeChannel.cpp */, + 32AF7D3F18DAEAF300537DE6 /* PerformanceNodeChannel.h */, ); path = message; sourceTree = "<group>"; @@ -2002,6 +2012,15 @@ path = perf_test; sourceTree = "<group>"; }; + 32DBF9CB18E046A30095B289 /* storage_system */ = { + isa = PBXGroup; + children = ( + 32DBF9CC18E06B240095B289 /* VFSDriver.cpp */, + 32DBF9CD18E06B240095B289 /* VFSDriver.h */, + ); + path = storage_system; + sourceTree = "<group>"; + }; 32DF602B15E14B1900312C94 /* evt_desc */ = { isa = PBXGroup; children = ( @@ -2208,6 +2227,7 @@ 32B57211168E07AB0002A198 /* jsobjmanipulator.h in Headers */, 32C35467178860E5009ED581 /* extension.hpp in Headers */, 32B57213168E07AB0002A198 /* json.h in Headers */, + 32AF7D4118DAEAF300537DE6 /* PerformanceNodeChannel.h in Headers */, 32B57215168E087D0002A198 /* str.h in Headers */, 32B57219168E13FC0002A198 /* time_support.h in Headers */, 32B5721B168E143E0002A198 /* mutex.h in Headers */, @@ -2235,7 +2255,7 @@ 32640A31169DCC5800450962 /* TrackerListener.h in Headers */, 32640A33169DCC5800450962 /* TransformTracker.h in Headers */, 32640A34169DCC5800450962 /* CommonBuffer.h in Headers */, - 32C5A42818D9959800C0435C /* DirectIOPerformanceLoop.h in Headers */, + 32C5A42818D9959800C0435C /* DirectIOPerformanceSession.h in Headers */, 32640A35169DCC5800450962 /* AbstractDataElement.h in Headers */, 32640A36169DCC5800450962 /* DataElement.h in Headers */, 32640A37169DCC5800450962 /* SmartData.h in Headers */, @@ -2491,6 +2511,7 @@ 32A10C0318C87806006EA04C /* DeviceSharedDataWorker.cpp in Sources */, 323236DA164573F50052CE06 /* main.cpp in Sources */, 32B159DA18B4B9F90096B58B /* DataWorker.cpp in Sources */, + 32DBF9CE18E06B240095B289 /* VFSDriver.cpp in Sources */, 3299363618A7C4F60093E7F2 /* ChaosDataService.cpp in Sources */, 32B159DE18B4D4210096B58B /* MemcachedCacheDriver.cpp in Sources */, 326459CC18D073BA0091B2E3 /* AnswerDataWorker.cpp in Sources */, @@ -2594,12 +2615,13 @@ 3230BEC2181A6ADB00EA5793 /* DatasetDB.cpp in Sources */, 32B57231168E4E010002A198 /* jsobj.cpp in Sources */, 32B57238168E50980002A198 /* md5.cpp in Sources */, + 32AF7D4018DAEAF300537DE6 /* PerformanceNodeChannel.cpp in Sources */, 32B1378A189BEC3300B17CA4 /* ZMQDirectIOClient.cpp in Sources */, 32FA968317C894830000BE9E /* Semaphore.cpp in Sources */, 329603221876B45000CDCADA /* BatchCommand.cpp in Sources */, 32BFF90E176C842E00D27A04 /* InizializableService.cpp in Sources */, 32B5723C168E547B0002A198 /* stringutils.cpp in Sources */, - 32C5A42718D9959800C0435C /* DirectIOPerformanceLoop.cpp in Sources */, + 32C5A42718D9959800C0435C /* DirectIOPerformanceSession.cpp in Sources */, 32279F8416F8DEAF009EC7B9 /* DataBroker.cpp in Sources */, 3264BBD51784887900E2F5A0 /* AbstractPlugin.cpp in Sources */, 3219E69F18CC7AE600C2F795 /* DirectIOClientConnection.cpp in Sources */, diff --git a/CHAOSWorkspace.xcworkspace/contents.xcworkspacedata b/CHAOSWorkspace.xcworkspace/contents.xcworkspacedata index 91ad2df04..123d091e7 100644 --- a/CHAOSWorkspace.xcworkspace/contents.xcworkspacedata +++ b/CHAOSWorkspace.xcworkspace/contents.xcworkspacedata @@ -26,6 +26,9 @@ <FileRef location = "group:example/ChaosCLI/ChaosCLI.xcodeproj"> </FileRef> + <FileRef + location = "group:example/ChaosPerformanceTester/ChaosPerformanceTester.xcodeproj"> + </FileRef> <FileRef location = "group:test/CachingTest/CachingTest.xcodeproj"> </FileRef> diff --git a/CMakeLists.txt b/CMakeLists.txt index 413bc5e3d..545248f49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,6 +48,9 @@ ADD_SUBDIRECTORY(ChaosDataService bin/bin_chst) message(STATUS "Configure Chaos CLI") ADD_SUBDIRECTORY(example/ChaosCLI bin/chaos_cli) +message(STATUS "Configure Chaos Performance Tester") +ADD_SUBDIRECTORY(example/ChaosPerformanceTester bin/ChaosPerformanceTester) + #message(STATUS "Configure Control Unit Test examples") #ADD_SUBDIRECTORY(example/ControlUnitTest bin/bin_example_cutest) diff --git a/ChaosDataService/DataConsumer.cpp b/ChaosDataService/DataConsumer.cpp index 2cedf99e5..716be58fa 100644 --- a/ChaosDataService/DataConsumer.cpp +++ b/ChaosDataService/DataConsumer.cpp @@ -64,17 +64,17 @@ void DataConsumer::init(void *init_data) throw (chaos::CException) { cache_impl_name.append("CacheDriver"); DSLAPP_ << "The cache implementation to allocate is " << cache_impl_name; //cache_driver_instance - device_data_worker = (chaos::data_service::worker::DataWorker**) malloc(sizeof(chaos::data_service::worker::DataWorker**) * DEVICE_WORKER_NUMBER); + device_data_worker = (chaos::data_service::worker::DataWorker**) malloc(sizeof(chaos::data_service::worker::DataWorker**) * settings->caching_worker_num); if(!device_data_worker) throw chaos::CException(-5, "Error allocating device workers", __FUNCTION__); chaos::data_service::worker::DeviceSharedDataWorker *tmp = NULL; - for(int idx = 0; idx < DEVICE_WORKER_NUMBER; idx++) { + for(int idx = 0; idx < settings->caching_worker_num; idx++) { device_data_worker[idx] = (tmp = new chaos::data_service::worker::DeviceSharedDataWorker(cache_impl_name)); DSLAPP_ << "Configure server on device worker " << idx; for(CacheServerListIterator iter = settings->startup_chache_servers.begin(); iter != settings->startup_chache_servers.end(); iter++) { - tmp->init(NULL); + tmp->init(&settings->caching_worker_setting); tmp->start(); tmp->addServer(*iter); } @@ -82,7 +82,7 @@ void DataConsumer::init(void *init_data) throw (chaos::CException) { //add answer worker chaos::data_service::worker::AnswerDataWorker *tmp_data_worker = NULL; - for(int idx = 0; idx < ANSWER_WORKER_NUMBER; idx++) { + for(int idx = 0; idx < settings->answer_worker_num; idx++) { //allocate a new worker with his personal client instance //get the cache driver instance cache_system::CacheDriver *cache_driver_instance = chaos::ObjectFactoryRegister<cache_system::CacheDriver>::getInstance()->getNewInstanceByName(cache_impl_name.c_str()); @@ -92,7 +92,7 @@ void DataConsumer::init(void *init_data) throw (chaos::CException) { cache_driver_instance->addServer(*iter); } tmp_data_worker = new chaos::data_service::worker::AnswerDataWorker(network_broker->getDirectIOClientInstance(), cache_driver_instance); - tmp_data_worker->init(NULL); + tmp_data_worker->init(&settings->answer_worker_setting); tmp_data_worker->start(); answer_worker_list.addSlot(tmp_data_worker); } @@ -105,7 +105,7 @@ void DataConsumer::stop() throw (chaos::CException) { } void DataConsumer::deinit() throw (chaos::CException) { - DSLAPP_ << "Release DirectIOCDataWrapperServerChannel into the endpoint"; + DSLAPP_ << "Release direct io device channel into the endpoint"; server_endpoint->releaseChannelInstance(device_channel); DSLAPP_ << "Deallocating answer worker list"; @@ -118,7 +118,7 @@ void DataConsumer::deinit() throw (chaos::CException) { } answer_worker_list.clearSlots(); DSLAPP_ << "Deallocating device push data worker list"; - for(int idx = 0; idx < DEVICE_WORKER_NUMBER; idx++) { + for(int idx = 0; idx < settings->caching_worker_num; idx++) { DSLAPP_ << "Release device worker "<< idx; device_data_worker[idx]->stop(); device_data_worker[idx]->deinit(); @@ -129,7 +129,7 @@ void DataConsumer::deinit() throw (chaos::CException) { } void DataConsumer::consumePutEvent(DirectIODeviceChannelHeaderPutOpcode *header, void *channel_data, uint32_t channel_data_len) { - uint32_t index_to_use = device_data_worker_index++ % DEVICE_WORKER_NUMBER; + uint32_t index_to_use = device_data_worker_index++ % settings->caching_worker_num; chaos::data_service::worker::DeviceSharedWorkerJob *job = new chaos::data_service::worker::DeviceSharedWorkerJob(); job->request_header = header; @@ -150,10 +150,8 @@ void DataConsumer::consumeGetEvent(DirectIODeviceChannelHeaderGetOpcode *header, job->key_data = channel_data; job->key_len = channel_data_len; job->request_header = header; - //while(); - //((worker::AnswerDataWorker*)worker)->executeJob(job) if(!worker->submitJobInfo(job)) { - DEBUG_CODE(DSLDBG_ << "error pushing data into worker queue"); + DEBUG_CODE(DSLDBG_ << "error pushing data into answer queue"); delete job; } } \ No newline at end of file diff --git a/ChaosDataService/DataConsumer.h b/ChaosDataService/DataConsumer.h index 8fb4bfcc6..5c6ff85cc 100644 --- a/ChaosDataService/DataConsumer.h +++ b/ChaosDataService/DataConsumer.h @@ -38,8 +38,6 @@ using namespace chaos::common::direct_io; using namespace chaos::common::direct_io::channel; using namespace chaos::common::direct_io::channel::opcode_headers; -#define DEVICE_WORKER_NUMBER 10 -#define ANSWER_WORKER_NUMBER 2 namespace chaos{ namespace data_service { diff --git a/ChaosDataService/cache_system/CacheDriver.cpp b/ChaosDataService/cache_system/CacheDriver.cpp index 42373d098..33a178ddd 100644 --- a/ChaosDataService/cache_system/CacheDriver.cpp +++ b/ChaosDataService/cache_system/CacheDriver.cpp @@ -1,11 +1,22 @@ -// -// CacheDriver.cpp -// CHAOSFramework -// -// Created by Claudio Bisegni on 15/02/14. -// Copyright (c) 2014 INFN. All rights reserved. -// - +/* + * CacheDriver.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "CacheDriver.h" using namespace chaos::data_service::cache_system; diff --git a/ChaosDataService/cache_system/CacheDriver.h b/ChaosDataService/cache_system/CacheDriver.h index a87c816bb..d21445ae2 100644 --- a/ChaosDataService/cache_system/CacheDriver.h +++ b/ChaosDataService/cache_system/CacheDriver.h @@ -1,10 +1,22 @@ -// -// CacheDriver.h -// CHAOSFramework -// -// Created by Claudio Bisegni on 15/02/14. -// Copyright (c) 2014 INFN. All rights reserved. -// +/* + * CacheDriver.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef __CHAOSFramework__CacheDriver__ #define __CHAOSFramework__CacheDriver__ diff --git a/ChaosDataService/cache_system/MemcachedCacheDriver.cpp b/ChaosDataService/cache_system/MemcachedCacheDriver.cpp index edc2ddea9..370be5947 100644 --- a/ChaosDataService/cache_system/MemcachedCacheDriver.cpp +++ b/ChaosDataService/cache_system/MemcachedCacheDriver.cpp @@ -1,10 +1,22 @@ -// -// MemcachedCacheDriver.cpp -// CHAOSFramework -// -// Created by Claudio Bisegni on 19/02/14. -// Copyright (c) 2014 INFN. All rights reserved. -// +/* + * MemcachedChacheDriver.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "MemcachedCacheDriver.h" diff --git a/ChaosDataService/cache_system/MemcachedCacheDriver.h b/ChaosDataService/cache_system/MemcachedCacheDriver.h index 3e1d03be4..982c0de43 100644 --- a/ChaosDataService/cache_system/MemcachedCacheDriver.h +++ b/ChaosDataService/cache_system/MemcachedCacheDriver.h @@ -1,11 +1,22 @@ -// -// MemcachedCacheDriver.h -// CHAOSFramework -// -// Created by Claudio Bisegni on 19/02/14. -// Copyright (c) 2014 INFN. All rights reserved. -// - +/* + * MemcachedCacheDriver.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef __CHAOSFramework__MemcachedCacheDriver__ #define __CHAOSFramework__MemcachedCacheDriver__ diff --git a/ChaosDataService/cache_system/cache_system.h b/ChaosDataService/cache_system/cache_system.h index 2d5b5a889..8a915e49e 100644 --- a/ChaosDataService/cache_system/cache_system.h +++ b/ChaosDataService/cache_system/cache_system.h @@ -1,11 +1,22 @@ -// -// cache_sytem.h -// CHAOSFramework -// -// Created by Claudio Bisegni on 20/02/14. -// Copyright (c) 2014 INFN. All rights reserved. -// - +/* + * cache_system.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef CHAOSFramework_cache_sytem_h #define CHAOSFramework_cache_sytem_h #include "MemcachedCacheDriver.h" diff --git a/ChaosDataService/dataservice_global.h b/ChaosDataService/dataservice_global.h index c0e5fcad7..740e8bc6a 100644 --- a/ChaosDataService/dataservice_global.h +++ b/ChaosDataService/dataservice_global.h @@ -30,10 +30,19 @@ namespace chaos{ typedef std::vector<std::string> CacheServerList; typedef std::vector<std::string>::iterator CacheServerListIterator; + //! Setting for dataservice configuration typedef struct ChaosDataServiceSetting { - worker::DataWorkerSetting worker_setting; - std::string cache_driver_impl; - CacheServerList startup_chache_servers; + + //!cache configuration + std::string cache_driver_impl; + CacheServerList startup_chache_servers; + unsigned int caching_worker_num; + worker::DataWorkerSetting caching_worker_setting; + + //----------answer worker------------------ + unsigned int answer_worker_num; + worker::DataWorkerSetting answer_worker_setting; + } ChaosDataServiceSetting; } } diff --git a/ChaosDataService/main.cpp b/ChaosDataService/main.cpp index 1b22ef34d..f51cee8da 100644 --- a/ChaosDataService/main.cpp +++ b/ChaosDataService/main.cpp @@ -25,8 +25,16 @@ using namespace chaos::data_service; -#define OPT_CACHE_SERVER_LIST "cache_servers" -#define OPT_CACHE_DRIVER "cache_driver" +#define OPT_CACHE_SERVER_LIST "cache_servers" +#define OPT_CACHE_DRIVER "cache_driver" +#define OPT_CACHE_WORKER_NUM "cache_worker_num" +#define OPT_CACHE_WORKER_THREAD "cache_worker_thread" + +#define OPT_ANSWER_WORKER_NUM "answer_worker_num" +#define OPT_ANSWER_WORKER_THREAD "answer_worker_thread" + +#define DEVICE_WORKER_NUMBER 10 +#define ANSWER_WORKER_NUMBER 2 int main(int argc, char * argv[]) { try { @@ -41,7 +49,25 @@ int main(int argc, char * argv[]) { ChaosDataService::getInstance()->getGlobalConfigurationInstance()->addOption< std::vector<std::string> >(OPT_CACHE_SERVER_LIST, "The list of the cache server", &ChaosDataService::getInstance()->settings.startup_chache_servers); + ChaosDataService::getInstance()->getGlobalConfigurationInstance()->addOption< unsigned int >(OPT_CACHE_WORKER_NUM, + "The number of the cache worker", + DEVICE_WORKER_NUMBER, + &ChaosDataService::getInstance()->settings.caching_worker_num); + + ChaosDataService::getInstance()->getGlobalConfigurationInstance()->addOption< unsigned int >(OPT_CACHE_WORKER_THREAD, + "The thread number of each cache worker", + 1, + &ChaosDataService::getInstance()->settings.caching_worker_setting.job_thread_number); + + ChaosDataService::getInstance()->getGlobalConfigurationInstance()->addOption< unsigned int >(OPT_ANSWER_WORKER_NUM, + "The number of the answer worker", + ANSWER_WORKER_NUMBER, + &ChaosDataService::getInstance()->settings.answer_worker_num); + ChaosDataService::getInstance()->getGlobalConfigurationInstance()->addOption< unsigned int >(OPT_ANSWER_WORKER_THREAD, + "The thread number of each answer worker", + 1, + &ChaosDataService::getInstance()->settings.answer_worker_setting.job_thread_number); ChaosDataService::getInstance()->init(argc, argv); if(!ChaosDataService::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_CACHE_SERVER_LIST)) { diff --git a/ChaosDataService/storage_system/VFSDriver.cpp b/ChaosDataService/storage_system/VFSDriver.cpp new file mode 100644 index 000000000..bc85fdc1b --- /dev/null +++ b/ChaosDataService/storage_system/VFSDriver.cpp @@ -0,0 +1,21 @@ +/* + * VFSDriver.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "VFSDriver.h" diff --git a/ChaosDataService/storage_system/VFSDriver.h b/ChaosDataService/storage_system/VFSDriver.h new file mode 100644 index 000000000..6f648e82a --- /dev/null +++ b/ChaosDataService/storage_system/VFSDriver.h @@ -0,0 +1,62 @@ +/* + * VFSDriver.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CHAOSFramework__VFSDriver__ +#define __CHAOSFramework__VFSDriver__ + +#include <string> + +namespace chaos { + namespace data_service { + namespace storage_system { + + + class BlockInfo { + + }; + + typedef enum BlockType { + BlockTypeStage, + BlockTypeData + } BlockType; + + //! Abstraction of the vfs driver + /*! + This class represent the abstraction of the + work to do on chaos VFS + */ + class VFSDriver { + public: + /*! + + */ + virtual BlockInfo *getNewBlock(BlockType type, std::string path, uint32_t max_size, uint32_t max_live_time) = 0; + + virtual BlockInfo *releaseBlock(BlockType type, std::string path) = 0; + + virtual int write(BlockInfo *, void * data, uint32_t data_len) = 0; + + virtual int read(BlockInfo *, uint64_t offset, void * * data, uint32_t *data_len) = 0; + }; + + } + } +} + +#endif /* defined(__CHAOSFramework__VFSDriver__) */ diff --git a/ChaosDataService/worker/AnswerDataWorker.cpp b/ChaosDataService/worker/AnswerDataWorker.cpp index 612d7b58c..e5d704563 100644 --- a/ChaosDataService/worker/AnswerDataWorker.cpp +++ b/ChaosDataService/worker/AnswerDataWorker.cpp @@ -47,11 +47,11 @@ direct_io_client(_client_instance) { } AnswerDataWorker::~AnswerDataWorker() { + if(getServiceState() == service_state_machine::InizializableServiceType::IS_INITIATED) { + deinit(); + } //delete the cache instance if(cache_driver_instance) delete(cache_driver_instance); - - //delete all registered object - TemplatedKeyObjectContainer::clearElement(); } void AnswerDataWorker::init(void *init_data) throw (chaos::CException) { @@ -73,10 +73,20 @@ void AnswerDataWorker::deinit() throw (chaos::CException) { //shutdown purger thread ADWLAPP_ << "stop the purge thread"; work_on_purge = false; - purge_thread_wait_variable. unlock(); + purge_thread_wait_variable.unlock(); purge_thread->join(); ADWLAPP_ << "purge thread stoppped"; + + //delete all registered object + ADWLAPP_ << "Clear all active connections"; + TemplatedKeyObjectContainer::clearElement(); + + //try to remove all other remainde disconnected connections + ADWLAPP_ << "Clear all orfaned answer connection"; + purge(); + //deinit client + ADWLAPP_ << "Deinitilize the directio client"; chaos::utility::InizializableService::deinitImplementation(direct_io_client, direct_io_client->getName(), __PRETTY_FUNCTION__); //call superclass init method @@ -202,9 +212,22 @@ void AnswerDataWorker::handleEvent(chaos_direct_io::DirectIOClientConnection *cl } void AnswerDataWorker::freeObject(uint32_t key, ClientConnectionInfo *elementPtr) { + //lock the creation of new connection + boost::unique_lock<boost::shared_mutex> uniqueLock(mutex_add_new_client); + + if(TemplatedKeyObjectContainer::hasKey(elementPtr->connection->getConnectionHash())) { + ClientConnectionInfo *connection_info = TemplatedKeyObjectContainer::accessItem(elementPtr->connection->getConnectionHash()); + + //NEED TO BE FOUND A LOGIC TO INVALIDATE AND DELETE AFTER THIS METHOD IS TERMINATED + map_to_purge.insert(make_pair(elementPtr->connection->getConnectionHash(), connection_info)); + DEBUG_CODE(ADWLDBG_ << "Added to purge queue for connection of the server " << elementPtr->connection->getServerDescription() << " and hash " << elementPtr->connection->getConnectionHash();) + + TemplatedKeyObjectContainer::deregisterElementKey(elementPtr->connection->getConnectionHash()); + } + //dispose element non managed, thi smethod is called only when //TemplatedKeyObjectContainer::clearElement(); is called (only in destructor - disposeClientInfo(elementPtr); + //disposeClientInfo(elementPtr); } void AnswerDataWorker::disposeClientInfo(ClientConnectionInfo *client_info) { @@ -232,8 +255,6 @@ void AnswerDataWorker::purge_thread_worker() { purge(); purge_thread_wait_variable.wait(5000); } - //befor thread ends, try to remove all other remainde connections - purge(); } void AnswerDataWorker::purge(uint32_t max_purge_element) { diff --git a/chaos/common/CMakeLists.txt b/chaos/common/CMakeLists.txt index 5eab716ae..e63597440 100644 --- a/chaos/common/CMakeLists.txt +++ b/chaos/common/CMakeLists.txt @@ -19,15 +19,16 @@ SET(common_lib_src ${common_lib_src} event/EventClient.cpp event/AsioImplEventC event/evt_desc/CustomEventDescriptor.cpp event/channel/EventChannel.cpp event/channel/AlertEventChannel.cpp event/channel/InstrumentEventChannel.cpp) SET(common_lib_src ${common_lib_src} direct_io/DirectIOClient.cpp direct_io/DirectIOClientConnection.cpp direct_io/DirectIOServer.cpp - direct_io/ServerFeeder.cpp direct_io/DirectIOServerEndpoint.cpp direct_io/DirectIODispatcher.cpp + direct_io/ServerFeeder.cpp direct_io/DirectIOServerEndpoint.cpp direct_io/DirectIODispatcher.cpp direct_io/DirectIOPerformanceSession.cpp direct_io/impl/ZMQDirectIOClient.cpp direct_io/impl/ZMQDirectIOServer.cpp direct_io/impl/ZMQBaseClass.cpp direct_io/impl/ZMQDirectIOClientConnection.cpp direct_io/channel/DirectIOVirtualChannel.cpp direct_io/channel/DirectIOVirtualClientChannel.cpp direct_io/channel/DirectIOVirtualServerChannel.cpp direct_io/channel/DirectIOCDataWrapperClientChannel.cpp direct_io/channel/DirectIOCDataWrapperServerChannel.cpp - direct_io/channel/DirectIODeviceClientChannel.cpp direct_io/channel/DirectIODeviceServerChannel.cpp) + direct_io/channel/DirectIODeviceClientChannel.cpp direct_io/channel/DirectIODeviceServerChannel.cpp + direct_io/channel/DirectIOPerformanceServerChannel.cpp direct_io/channel/DirectIOPerformanceClientChannel.cpp) SET(common_lib_src ${common_lib_src} io/IODataDriver.cpp io/IOMemcachedIODriver.cpp io/IODirectIODriver.cpp) -SET(common_lib_src ${common_lib_src} message/DeviceMessageChannel.cpp message/MDSMessageChannel.cpp message/MessageChannel.cpp) -SET(common_lib_src ${common_lib_src} network/NetworkBroker.cpp) +SET(common_lib_src ${common_lib_src} message/DeviceMessageChannel.cpp message/MDSMessageChannel.cpp message/MessageChannel.cpp message/PerformanceNodeChannel.cpp) +SET(common_lib_src ${common_lib_src} network/NetworkBroker.cpp network/PerformanceManagment.cpp) SET(common_lib_src ${common_lib_src} rpc/RpcClient.cpp rpc/RpcServer.cpp rpc/msgpack/MsgPackClient.cpp rpc/msgpack/MsgPackServer.cpp) SET(common_lib_src ${common_lib_src} thread/CThread.cpp thread/CThreadGroup.cpp) SET(common_lib_src ${common_lib_src} utility/NamedService.cpp utility/StartableService.cpp utility/InizializableService.cpp) diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index 976c6a039..e0548d56b 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -399,6 +399,8 @@ namespace chaos { static const char * const ACTION_PERFORMANCE_INIT_SESSION= "sp:init_session"; + static const char * const ACTION_PERFORMANCE_CLOSE_SESSION= "sp:close_session"; + static const char * const KEY_REQUEST_SERVER_DESCRITPION = "sp::req_serv_desc"; } /** @} */ // end of PerformanceSystemRpcKey diff --git a/chaos/common/data/CDataWrapper.cpp b/chaos/common/data/CDataWrapper.cpp index 75198ac0a..6ba3feaf3 100644 --- a/chaos/common/data/CDataWrapper.cpp +++ b/chaos/common/data/CDataWrapper.cpp @@ -81,7 +81,7 @@ void CDataWrapper::addStringValue(const char * key, const char * strValue) { } //add a string value -void CDataWrapper::addStringValue(const char *key, string& strValue) { +void CDataWrapper::addStringValue(const char *key, string strValue) { bsonBuilder->append(key, strValue); } diff --git a/chaos/common/data/CDataWrapper.h b/chaos/common/data/CDataWrapper.h index 533968604..8113655a0 100644 --- a/chaos/common/data/CDataWrapper.h +++ b/chaos/common/data/CDataWrapper.h @@ -106,7 +106,7 @@ namespace chaos { void addStringValue(const char *, const char *); //add a string value - void addStringValue(const char *, string&); + void addStringValue(const char *, string); //add a string to array void appendStringToArray(const char *); diff --git a/chaos/common/direct_io/DirectIOClientConnection.cpp b/chaos/common/direct_io/DirectIOClientConnection.cpp index 88a992dea..e39dbe2b9 100644 --- a/chaos/common/direct_io/DirectIOClientConnection.cpp +++ b/chaos/common/direct_io/DirectIOClientConnection.cpp @@ -49,8 +49,8 @@ DirectIOClientConnection::DirectIOClientConnection(std::string _server_descripti //generate random hash from uuid lite std::string unique_uuid = UUIDUtil::generateUUIDLite(); - std::string answer_server_description = boost::str( boost::format("%1%|%2%") % server_description % endpoint); - connection_hash = chaos::common::data::cache::FastHash::hash(answer_server_description.c_str(), answer_server_description.size(), 0); + url = boost::str( boost::format("%1%|%2%") % server_description % endpoint); + connection_hash = chaos::common::data::cache::FastHash::hash(url.c_str(), url.size(), 0); unique_hash = chaos::common::data::cache::FastHash::hash(unique_uuid.c_str(), unique_uuid.size(), 0); } @@ -62,6 +62,10 @@ const char * DirectIOClientConnection::getServerDescription() { return server_description.c_str(); } +std::string DirectIOClientConnection::getURL() { + return url; +} + std::string DirectIOClientConnection::getStrIp() { return my_str_ip; } diff --git a/chaos/common/direct_io/DirectIOClientConnection.h b/chaos/common/direct_io/DirectIOClientConnection.h index ee43a8878..7b1a011e1 100644 --- a/chaos/common/direct_io/DirectIOClientConnection.h +++ b/chaos/common/direct_io/DirectIOClientConnection.h @@ -73,6 +73,7 @@ namespace chaos { friend class chaos::NetworkBroker; protected: + std::string url; std::string server_description; uint16_t endpoint; uint32_t connection_hash; @@ -118,6 +119,8 @@ namespace chaos { const char * getServerDescription(); + std::string getURL(); + //! return the state of the connection DirectIOClientConnectionStateType::DirectIOClientConnectionStateType getState(); diff --git a/chaos/common/direct_io/DirectIOPerformanceLoop.cpp b/chaos/common/direct_io/DirectIOPerformanceLoop.cpp deleted file mode 100644 index 5a4f4a697..000000000 --- a/chaos/common/direct_io/DirectIOPerformanceLoop.cpp +++ /dev/null @@ -1,83 +0,0 @@ -// -// DirectIOPerformanceLoop.cpp -// CHAOSFramework -// -// Created by Claudio Bisegni on 19/03/14. -// Copyright (c) 2014 INFN. All rights reserved. -// - -#include <chaos/common/direct_io/DirectIOPerformanceLoop.h> -#include <chaos/common/direct_io/DirectIOClient.h> -#include <chaos/common/direct_io/DirectIODispatcher.h> -#include <chaos/common/direct_io/DirectIOServerEndpoint.h> - -using namespace chaos::common::direct_io; - -#define DIPL_LOG_HEAD "[DirectIOPerformanceLoop] - " - -#define DIPLLAPP_ LAPP_ << DIPL_LOG_HEAD -#define DIPLLDBG_ LDBG_ << DIPL_LOG_HEAD << __FUNCTION__ -#define DIPLLERR_ LERR_ << DIPL_LOG_HEAD - -DirectIOPerformanceLoop::DirectIOPerformanceLoop(DirectIOClientConnection *_client_connection, DirectIOServerEndpoint *_server_endpoint): - client_connection(_client_connection), server_endpoint(_server_endpoint){ -} - -DirectIOPerformanceLoop::~DirectIOPerformanceLoop() { -} - -void DirectIOPerformanceLoop::setConnectionHandler(DirectIOClientConnectionEventHandler *event_handler) { - if(!client_connection) return; - client_connection->setEventHandler(event_handler); -} - -// Initialize instance -void DirectIOPerformanceLoop::init(void *init_data) throw(chaos::CException) { - //check the client and server - if(!client_connection) throw CException(-1 ,"The client instance as not been set", __PRETTY_FUNCTION__); - if(!server_endpoint) throw CException(-2 ,"Error initializing the endpoint", __PRETTY_FUNCTION__); - - //get the connection to the client - DIPLLAPP_ << "allocating the client channel"; - //get the channels - client_channel = dynamic_cast<channel::DirectIOPerformanceClientChannel*>(client_connection->getNewChannelInstance(std::string("DirectIOPerformanceClientChannel"))); - if(client_channel) throw CException(-3 ,"Error creating the client channel", __PRETTY_FUNCTION__); - - DIPLLAPP_ << "allocating the server channel"; - server_channel = dynamic_cast<channel::DirectIOPerformanceServerChannel*>(server_endpoint->getNewChannelInstance(std::string("DirectIOPerformanceServerChannel"))); - if(server_channel) throw CException(-3 ,"Error creating the server channel", __PRETTY_FUNCTION__); - server_channel->setHandler(this); -} - -// Deinit the implementation -void DirectIOPerformanceLoop::deinit() throw(chaos::CException) { - //release the channel - - if(server_endpoint && server_channel) { - DIPLLAPP_ << "deallocating the server channel"; - server_endpoint->releaseChannelInstance(server_channel); - } else { - DIPLLAPP_ << "no server channel allocated"; - } - - if(client_connection && client_channel) { - DIPLLAPP_ << "deallocating the client channel"; - client_connection->releaseChannelInstance(client_channel); - } else { - DIPLLAPP_ << "no client channel allocated"; - } -} - -channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr DirectIOPerformanceLoop::rttTest(uint32_t ms_timeout) { - channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr result = NULL; - client_channel->sendRoundTripMessage(); - rtt_request_wait_sema.wait(ms_timeout); - result = rtt_answer; - rtt_answer = NULL; - return result; -} - -void DirectIOPerformanceLoop::handleRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) { - rtt_answer = rtt_request; - rtt_request_wait_sema.unlock(); -} \ No newline at end of file diff --git a/chaos/common/direct_io/DirectIOPerformanceSession.cpp b/chaos/common/direct_io/DirectIOPerformanceSession.cpp new file mode 100644 index 000000000..4cb97ae0e --- /dev/null +++ b/chaos/common/direct_io/DirectIOPerformanceSession.cpp @@ -0,0 +1,96 @@ +// +// DirectIOPerformanceSession.cpp +// CHAOSFramework +// +// Created by Claudio Bisegni on 19/03/14. +// Copyright (c) 2014 INFN. All rights reserved. +// + +#include <chaos/common/direct_io/DirectIOPerformanceSession.h> +#include <chaos/common/direct_io/DirectIOClient.h> +#include <chaos/common/direct_io/DirectIODispatcher.h> +#include <chaos/common/direct_io/DirectIOServerEndpoint.h> + +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/date_time/c_local_time_adjustor.hpp> +using namespace chaos::common::direct_io; + +#define DIPL_LOG_HEAD "[DirectIOPerformanceSession] - " + +#define DIPLLAPP_ LAPP_ << DIPL_LOG_HEAD +#define DIPLLDBG_ LDBG_ << DIPL_LOG_HEAD << __FUNCTION__ +#define DIPLLERR_ LERR_ << DIPL_LOG_HEAD + +DirectIOPerformanceSession::DirectIOPerformanceSession(DirectIOClientConnection *_client_connection, DirectIOServerEndpoint *_server_endpoint): + client_connection(_client_connection), server_endpoint(_server_endpoint), rtt_queue(1){ +} + +DirectIOPerformanceSession::~DirectIOPerformanceSession() { +} + +void DirectIOPerformanceSession::setConnectionHandler(DirectIOClientConnectionEventHandler *event_handler) { + if(!client_connection) return; + client_connection->setEventHandler(event_handler); +} + +// Initialize instance +void DirectIOPerformanceSession::init(void *init_data) throw(chaos::CException) { + //check the client and server + if(!client_connection) throw CException(-1 ,"The client instance as not been set", __PRETTY_FUNCTION__); + if(!server_endpoint) throw CException(-2 ,"Error initializing the endpoint", __PRETTY_FUNCTION__); + + //get the connection to the client + DIPLLAPP_ << "allocating the client channel"; + //get the channels + client_channel = dynamic_cast<channel::DirectIOPerformanceClientChannel*>(client_connection->getNewChannelInstance(std::string("DirectIOPerformanceClientChannel"))); + if(!client_channel) throw CException(-3 ,"Error creating the client channel", __PRETTY_FUNCTION__); + + DIPLLAPP_ << "allocating the server channel"; + server_channel = dynamic_cast<channel::DirectIOPerformanceServerChannel*>(server_endpoint->getNewChannelInstance(std::string("DirectIOPerformanceServerChannel"))); + if(!server_channel) throw CException(-3 ,"Error creating the server channel", __PRETTY_FUNCTION__); + server_channel->setHandler(this); +} + +// Deinit the implementation +void DirectIOPerformanceSession::deinit() throw(chaos::CException) { + //release the channel + + if(server_endpoint && server_channel) { + DIPLLAPP_ << "deallocating the server channel"; + server_endpoint->releaseChannelInstance(server_channel); + } else { + DIPLLAPP_ << "no server channel allocated"; + } + + if(client_connection && client_channel) { + DIPLLAPP_ << "deallocating the client channel"; + client_connection->releaseChannelInstance(client_channel); + } else { + DIPLLAPP_ << "no client channel allocated"; + } +} + +int64_t DirectIOPerformanceSession::sendRttTest(uint32_t ms_timeout) { + int64_t result =0; + result = client_channel->sendRoundTripMessage(); + return result; +} + +RttResultFetcher *DirectIOPerformanceSession::getRttResultQueue() { + ResultFetcher<channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr> *result = new ResultFetcher<channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr>(&rtt_queue); + return result; +} + +void DirectIOPerformanceSession::handleReqRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) { + //send the answer + DIPLLAPP_ << "pre req"; + if(client_channel->answerRoundTripMessage(rtt_request) == -1) { + DIPLLERR_ << "Error sending rtt answer"; + } + DIPLLAPP_ << "post req"; +} + +void DirectIOPerformanceSession::handleRespRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) { + //push result on queue + rtt_queue.push(rtt_request); +} \ No newline at end of file diff --git a/chaos/common/direct_io/DirectIOPerformanceLoop.h b/chaos/common/direct_io/DirectIOPerformanceSession.h similarity index 54% rename from chaos/common/direct_io/DirectIOPerformanceLoop.h rename to chaos/common/direct_io/DirectIOPerformanceSession.h index 9eee7e25d..f1b909030 100644 --- a/chaos/common/direct_io/DirectIOPerformanceLoop.h +++ b/chaos/common/direct_io/DirectIOPerformanceSession.h @@ -1,5 +1,5 @@ /* - * DirectIOPerformanceLoop.h + * DirectIOPerformanceSession.h * !CHOAS * Created by Bisegni Claudio. * @@ -18,16 +18,22 @@ * limitations under the License. */ -#ifndef __CHAOSFramework__DirectIOPerformanceLoop__ -#define __CHAOSFramework__DirectIOPerformanceLoop__ +#ifndef __CHAOSFramework__DirectIOPerformanceSession__ +#define __CHAOSFramework__DirectIOPerformanceSession__ #include <chaos/common/thread/WaitSemaphore.h> #include <chaos/common/utility/InizializableService.h> #include <chaos/common/direct_io/DirectIOClientConnection.h> #include <chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.h> #include <chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.h> + +#include <boost/lockfree/queue.hpp> + namespace chaos { namespace common{ + namespace message { + class PerformanceNodeChannel; + } namespace network{ //forward declaration class PerformanceManagment; @@ -38,15 +44,31 @@ namespace chaos { class DirectIOClient; class DirectIOServerEndpoint; + typedef boost::lockfree::queue< channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr, boost::lockfree::fixed_sized<false> > RttResultQueue; + + template<typename T> + class ResultFetcher { + boost::lockfree::queue< T, boost::lockfree::fixed_sized<false> > *result_queue_ptr; + public: + ResultFetcher(boost::lockfree::queue< T, boost::lockfree::fixed_sized<false> > *_result_queue_ptr):result_queue_ptr(_result_queue_ptr) {} + + bool getNext(T& result) { + return result_queue_ptr->pop(result); + } + }; + + typedef ResultFetcher<channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr> RttResultFetcher; + //! Direct IO performance loop connection /*! this class permit to realize a direct io loop to make some bench test between two node using direct io system. */ - class DirectIOPerformanceLoop : - public chaos::utility::InizializableService, + class DirectIOPerformanceSession : + protected chaos::utility::InizializableService, protected channel::DirectIOPerformanceServerChannel::DirectIOPerformanceServerChannelHandler { friend class chaos::common::network::PerformanceManagment; + friend class chaos::common::message::PerformanceNodeChannel; std::string server_description; @@ -56,14 +78,16 @@ namespace chaos { channel::DirectIOPerformanceClientChannel *client_channel; channel::DirectIOPerformanceServerChannel *server_channel; - chaos::WaitSemaphore rtt_request_wait_sema; - channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_answer; + //chaos::WaitSemaphore rtt_request_wait_sema; + RttResultQueue rtt_queue; + //channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_answer; protected: - void handleRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request); + void handleReqRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request); + void handleRespRoundTripRequest(channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request); public: - DirectIOPerformanceLoop(DirectIOClientConnection *_client_connection, DirectIOServerEndpoint *_server_endpoint); - ~DirectIOPerformanceLoop(); + DirectIOPerformanceSession(DirectIOClientConnection *_client_connection, DirectIOServerEndpoint *_server_endpoint); + ~DirectIOPerformanceSession(); void setConnectionHandler(DirectIOClientConnectionEventHandler *event_handler); // Initialize instance void init(void *init_data) throw(chaos::CException); @@ -71,10 +95,12 @@ namespace chaos { // Deinit the implementation void deinit() throw(chaos::CException); - channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rttTest(uint32_t ms_timeout = 1000); + int64_t sendRttTest(uint32_t ms_timeout = 1000); + + RttResultFetcher *getRttResultQueue(); }; } } } -#endif /* defined(__CHAOSFramework__DirectIOPerformanceLoop__) */ +#endif /* defined(__CHAOSFramework__DirectIOPerformanceSession__) */ diff --git a/chaos/common/direct_io/DirectIOServer.cpp b/chaos/common/direct_io/DirectIOServer.cpp index f945e96bf..ce368a0d1 100644 --- a/chaos/common/direct_io/DirectIOServer.cpp +++ b/chaos/common/direct_io/DirectIOServer.cpp @@ -18,6 +18,9 @@ * limitations under the License. */ #include <chaos/common/direct_io/DirectIOServer.h> +#include <chaos/common/configuration/GlobalConfiguration.h> + +#include <boost/format.hpp> using namespace chaos::common::direct_io; @@ -70,4 +73,8 @@ uint32_t DirectIOServer::getPriorityPort() { uint32_t DirectIOServer::getServicePort() { return service_port; +} + +std::string DirectIOServer::getUrl() { + return boost::str( boost::format("%1%:%2%:%3%") % chaos::GlobalConfiguration::getInstance()->getLocalServerAddress() % priority_port % service_port); } \ No newline at end of file diff --git a/chaos/common/direct_io/DirectIOServer.h b/chaos/common/direct_io/DirectIOServer.h index 2c7a584c2..301ff4c8a 100644 --- a/chaos/common/direct_io/DirectIOServer.h +++ b/chaos/common/direct_io/DirectIOServer.h @@ -73,6 +73,8 @@ namespace chaos { uint32_t getPriorityPort(); uint32_t getServicePort(); + + std::string getUrl(); }; diff --git a/chaos/common/direct_io/DirectIOServerEndpoint.cpp b/chaos/common/direct_io/DirectIOServerEndpoint.cpp index d5205ed0d..e4208527d 100644 --- a/chaos/common/direct_io/DirectIOServerEndpoint.cpp +++ b/chaos/common/direct_io/DirectIOServerEndpoint.cpp @@ -19,6 +19,7 @@ */ #include <chaos/common/utility/ObjectFactoryRegister.h> #include <chaos/common/direct_io/DirectIOServerEndpoint.h> +#include <boost/format.hpp> using namespace chaos::common::direct_io; @@ -52,6 +53,12 @@ DirectIOServerPublicInterface * DirectIOServerEndpoint::getPublicServerInterface return server_public_interface; } +std::string DirectIOServerEndpoint::getUrl() { + if(!server_public_interface) return std::string(""); + //return the url concatenating he server url with the endpoint divided by pipe chacracter + return boost::str( boost::format("%1%|%2%") % server_public_interface->getUrl() % endpoint_route_index); +} + //! Add a new channel instantiator channel::DirectIOVirtualServerChannel *DirectIOServerEndpoint::registerChannelInstance(channel::DirectIOVirtualServerChannel *channel_instance) { if(!channel_instance) return NULL; diff --git a/chaos/common/direct_io/DirectIOServerEndpoint.h b/chaos/common/direct_io/DirectIOServerEndpoint.h index 234b1f847..fb5d4d72c 100644 --- a/chaos/common/direct_io/DirectIOServerEndpoint.h +++ b/chaos/common/direct_io/DirectIOServerEndpoint.h @@ -86,6 +86,8 @@ namespace chaos { Allocate a new channel and initialize it */ void releaseChannelInstance(channel::DirectIOVirtualServerChannel *channel_instance) throw (CException); + + std::string getUrl(); }; } diff --git a/chaos/common/direct_io/DirectIOServerPublicInterface.h b/chaos/common/direct_io/DirectIOServerPublicInterface.h index 1158722e1..096de7dd8 100644 --- a/chaos/common/direct_io/DirectIOServerPublicInterface.h +++ b/chaos/common/direct_io/DirectIOServerPublicInterface.h @@ -32,6 +32,7 @@ namespace chaos { public: virtual uint32_t getPriorityPort() = 0; virtual uint32_t getServicePort() = 0; + virtual std::string getUrl() = 0; }; } diff --git a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h index a4543e6b1..0ac127c50 100644 --- a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h +++ b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h @@ -36,7 +36,8 @@ namespace chaos { Opcode used by the DirectIO device channel */ typedef enum PerformanceChannelOpcode { - PerformanceChannelOpcodeRoundTrip = 1, /**< manage the roundtrip test */ + PerformanceChannelOpcodeReqRoundTrip = 1, /**< forwarda a start of a roundtrip test */ + PerformanceChannelOpcodeRespRoundTrip = 2, /**< perform an answer to the roundtrip test */ } PerformanceChannelOpcode; } diff --git a/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.cpp b/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.cpp index 2bfd5c095..f91f5a1e7 100644 --- a/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.cpp @@ -40,21 +40,62 @@ int64_t DirectIOPerformanceClientChannel::sendRoundTripMessage() { std::memset(data_pack, 0, sizeof(DirectIODataPack)); //set opcode - data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::PerformanceChannelOpcodeRoundTrip); + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::PerformanceChannelOpcodeReqRoundTrip); opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr header = new opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTrip(); boost::posix_time::ptime time = boost::posix_time::microsec_clock::local_time(); boost::posix_time::time_duration duration( time.time_of_day() ); - header->field.start_rt_ts = duration.total_microseconds(); + header->field.start_rt_ts = TO_LITTE_ENDNS_NUM(uint64_t, duration.total_microseconds()); //rt_opcode_header->fields. //set the header - //DIRECT_IO_SET_CHANNEL_HEADER(data_pack, put_opcode_header, (uint32_t)PUT_HEADER_LEN) + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, header, (uint32_t)sizeof(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTrip)) //send pack - return client_instance->sendPriorityData(this, completeDataPack(data_pack)); + return client_instance->sendServiceData(this, completeDataPack(data_pack)); } +int64_t DirectIOPerformanceClientChannel::answerRoundTripMessage(uint64_t received_ts) { + DirectIODataPack *data_pack = new DirectIODataPack(); + std::memset(data_pack, 0, sizeof(DirectIODataPack)); + + //set opcode + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::PerformanceChannelOpcodeRespRoundTrip); + opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr header = new opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTrip(); + + boost::posix_time::ptime time = boost::posix_time::microsec_clock::local_time(); + boost::posix_time::time_duration duration( time.time_of_day() ); + + header->field.start_rt_ts = received_ts; + header->field.receiver_rt_ts = TO_LITTE_ENDNS_NUM(uint64_t, duration.total_microseconds()); + //rt_opcode_header->fields. + //set the header + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, header, (uint32_t)sizeof(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTrip)) + + //send pack + return client_instance->sendServiceData(this, completeDataPack(data_pack)); + +} + +int64_t DirectIOPerformanceClientChannel::answerRoundTripMessage(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr received_header) { + DirectIODataPack *data_pack = new DirectIODataPack(); + std::memset(data_pack, 0, sizeof(DirectIODataPack)); + + //set opcode + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::PerformanceChannelOpcodeRespRoundTrip); + + boost::posix_time::ptime time = boost::posix_time::microsec_clock::local_time(); + boost::posix_time::time_duration duration( time.time_of_day() ); + + received_header->field.receiver_rt_ts =TO_LITTE_ENDNS_NUM(uint64_t, duration.total_microseconds()); + //rt_opcode_header->fields. + //set the header + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, received_header, (uint32_t)sizeof(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTrip)) + + //send pack + return client_instance->sendServiceData(this, completeDataPack(data_pack)); + +} void DirectIOPerformanceClientChannel::freeSentData(void *data, DisposeSentMemoryInfo& dispose_memory_info) { //free all received data free(data); diff --git a/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.h b/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.h index 5ea78d780..cebade6b8 100644 --- a/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.h +++ b/chaos/common/direct_io/channel/DirectIOPerformanceClientChannel.h @@ -42,6 +42,8 @@ namespace chaos { void freeSentData(void *data, DisposeSentMemoryInfo& dispose_memory_info); public: int64_t sendRoundTripMessage(); + int64_t answerRoundTripMessage(uint64_t received_ts); + int64_t answerRoundTripMessage(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr received_header); }; } } diff --git a/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.cpp b/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.cpp index 647d60051..b3957afdd 100644 --- a/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.cpp @@ -50,14 +50,19 @@ void DirectIOPerformanceServerChannel::consumeDataPack(DirectIODataPack *dataPac opcode::PerformanceChannelOpcode channel_opcode = static_cast<opcode::PerformanceChannelOpcode>(dataPack->header.dispatcher_header.fields.channel_opcode); switch (channel_opcode) { - case opcode::PerformanceChannelOpcodeRoundTrip: { + case opcode::PerformanceChannelOpcodeReqRoundTrip: { opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr header = reinterpret_cast< opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr >(dataPack->channel_header_data); //reallign the pointer to the start of the key header->field.start_rt_ts = FROM_LITTLE_ENDNS_NUM(uint64_t, header->field.start_rt_ts); - handler->handleRoundTripRequest(header); + handler->handleReqRoundTripRequest(header); break; } - default: + + case opcode::PerformanceChannelOpcodeRespRoundTrip: + opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr header = reinterpret_cast< opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr >(dataPack->channel_header_data); + //reallign the pointer to the start of the key + header->field.start_rt_ts = FROM_LITTLE_ENDNS_NUM(uint64_t, header->field.start_rt_ts); + handler->handleRespRoundTripRequest(header); break; } diff --git a/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.h b/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.h index 8df606413..b07e342e4 100644 --- a/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.h +++ b/chaos/common/direct_io/channel/DirectIOPerformanceServerChannel.h @@ -44,7 +44,11 @@ namespace chaos { //! Device handler definition class DirectIOPerformanceServerChannelHandler { public: - virtual void handleRoundTripRequest(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) = 0; + //! handle a request to perform a roundtrip test + virtual void handleReqRoundTripRequest(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) = 0; + + //! andle the response to an round trip test request + virtual void handleRespRoundTripRequest(opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr rtt_request) = 0; }; diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp b/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp index 65e85003c..c510733c5 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp @@ -97,7 +97,7 @@ void *ZMQDirectIOClient::socketMonitor (void *ctx, const char * address, Connect forwardEventToClientConnection(connection , DirectIOClientConnectionStateType::DirectIOClientConnectionEventConnected); break; case ZMQ_EVENT_CONNECT_DELAYED: - forwardEventToClientConnection(connection , DirectIOClientConnectionStateType::DirectIOClientConnectionEventConnected); + //forwardEventToClientConnection(connection , DirectIOClientConnectionStateType::DirectIOClientConnectionEventConnected); DEBUG_CODE(ZMQDIOLDBG_ << "ZMQ_EVENT_CONNECT_DELAYED " << connection->getServerDescription();) break; case ZMQ_EVENT_CONNECT_RETRIED: @@ -170,8 +170,8 @@ DirectIOClientConnection *ZMQDirectIOClient::getNewConnection(std::string server int err = 0; const int output_buffer_dim = 1; const int linger_period = 0; - const int min_reconnection_ivl = 500; - const int max_reconnection_ivl = 5000; + const int min_reconnection_ivl = 100; + const int max_reconnection_ivl = 500; void *socket_priority = NULL; void *socket_service = NULL; diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp index c8d26fec7..5e52e0fe6 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp @@ -68,6 +68,7 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(channel::DirectIOVirtualClien case DIRECT_IO_CHANNEL_PART_HEADER_ONLY: err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag); if(err == -1) { + delete (data_pack); return err; } err = zmq_msg_init_data (&msg_header_data, @@ -76,11 +77,12 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(channel::DirectIOVirtualClien DirectIOClientConnection::freeSentData, new channel::DisposeSentMemoryInfo(channel, 1, sending_opcode)); err = zmq_sendmsg(socket, &msg_header_data, _send_no_wait_flag); - err = zmq_msg_close(&msg_header_data); + zmq_msg_close(&msg_header_data); break; case DIRECT_IO_CHANNEL_PART_DATA_ONLY: err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag); if(err == -1) { + delete (data_pack); return err; } err = zmq_msg_init_data (&msg_data, @@ -89,11 +91,12 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(channel::DirectIOVirtualClien DirectIOClientConnection::freeSentData, new channel::DisposeSentMemoryInfo(channel, 2, sending_opcode)); err = zmq_sendmsg(socket, &msg_data, _send_no_wait_flag); - err = zmq_msg_close(&msg_data); + zmq_msg_close(&msg_data); break; case DIRECT_IO_CHANNEL_PART_HEADER_DATA: err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag); if(err == -1) { + delete (data_pack); return err; } err = zmq_msg_init_data (&msg_header_data, @@ -103,6 +106,7 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(channel::DirectIOVirtualClien new channel::DisposeSentMemoryInfo(channel, 1, sending_opcode)); err = zmq_sendmsg(socket, &msg_header_data, _send_more_no_wait_flag); if(err == -1) { + delete (data_pack); zmq_msg_close(&msg_header_data); return err; } @@ -113,8 +117,8 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(channel::DirectIOVirtualClien new channel::DisposeSentMemoryInfo(channel, 2, sending_opcode)); err = zmq_sendmsg(socket, &msg_data, _send_no_wait_flag); - err = zmq_msg_close(&msg_header_data); - err = zmq_msg_close(&msg_data); + zmq_msg_close(&msg_header_data); + zmq_msg_close(&msg_data); break; } delete (data_pack); diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.h b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.h index 09ecd5ac9..a59e68ad7 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.h +++ b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.h @@ -50,8 +50,8 @@ namespace chaos { void *monitor_socket; } ConnectionMonitorInfo; - const int _send_more_no_wait_flag = ZMQ_SNDMORE|ZMQ_DONTWAIT; - const int _send_no_wait_flag = ZMQ_DONTWAIT; + const int _send_more_no_wait_flag = ZMQ_SNDMORE; + const int _send_no_wait_flag = 0; /*! Class that represetn th eimplementation of one connection of the direct io connection implemented with zmq diff --git a/chaos/common/io/IODirectIODriver.cpp b/chaos/common/io/IODirectIODriver.cpp index 6a7b01777..7a6339427 100644 --- a/chaos/common/io/IODirectIODriver.cpp +++ b/chaos/common/io/IODirectIODriver.cpp @@ -175,6 +175,8 @@ namespace chaos{ //free the packet next_client->device_client_channel->storeAndCacheDataOutputChannel((void*)serialization->getBufferPtr(), (uint32_t)serialization->getBufferLen()); return; + } else { + DEBUG_CODE(IODirectIODriver_DLDBG_ << "No available socket->loose packet"); } delete(serialization); } diff --git a/chaos/common/message/DeviceMessageChannel.cpp b/chaos/common/message/DeviceMessageChannel.cpp index a1f89c53a..d9e292ce0 100644 --- a/chaos/common/message/DeviceMessageChannel.cpp +++ b/chaos/common/message/DeviceMessageChannel.cpp @@ -23,19 +23,7 @@ using namespace chaos; using namespace chaos::common::data; -/*! \name Check Request Result Macro - These macros are used to check the result of a syncronous request operation. - */ -/*! \{ */ -/*! Check for delay error or application error */ -#define CHECK_TIMEOUT_AND_RESULT_CODE(x,e) \ -if(!x.get()) {\ -e = ErrorCode::EC_TIMEOUT;\ -} else if(x->hasKey(RpcActionDefinitionKey::CS_CMDM_ACTION_SUBMISSION_ERROR_CODE)) {\ -e = x->getInt32Value(RpcActionDefinitionKey::CS_CMDM_ACTION_SUBMISSION_ERROR_CODE);\ -} -/*! \} */ //------------------------------------ DeviceMessageChannel::DeviceMessageChannel(NetworkBroker *msgBroker, CDeviceNetworkAddress *_deviceNetworkAddress) : NodeMessageChannel(msgBroker, _deviceNetworkAddress){ diff --git a/chaos/common/message/MessageChannel.h b/chaos/common/message/MessageChannel.h index ab2c1514d..1e98cf36d 100644 --- a/chaos/common/message/MessageChannel.h +++ b/chaos/common/message/MessageChannel.h @@ -1,8 +1,8 @@ -/* +/* * MessageChannel.h * !CHOAS * Created by Bisegni Claudio. - * + * * Copyright 2012 INFN, National Institute of Nuclear Physics * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,24 +27,35 @@ #include <chaos/common/exception/CException.h> #include <chaos/common/thread/MultiKeyObjectWaitSemaphore.h> #include <chaos/common/network/CNodeNetworkAddress.h> - +#include <chaos/common/network/NetworkBroker.h> #include <boost/function.hpp> //#include <boost/thread/condition.hpp> #include <boost/thread.hpp> #include <map> namespace chaos { - - class NetworkBroker; + /*! \name Check Request Result Macro + These macros are used to check the result of a syncronous request operation. + */ + /*! \{ */ + + /*! Check for delay error or application error */ +#define CHECK_TIMEOUT_AND_RESULT_CODE(x,e) \ +if(!x.get()) {\ +e = ErrorCode::EC_TIMEOUT;\ +} else if(x->hasKey(RpcActionDefinitionKey::CS_CMDM_ACTION_SUBMISSION_ERROR_CODE)) {\ +e = x->getInt32Value(RpcActionDefinitionKey::CS_CMDM_ACTION_SUBMISSION_ERROR_CODE);\ +} + /*! \} */ typedef void (*MessageHandler)(atomic_int_type, common::data::CDataWrapper*); - + //typedef boost::function<void(atomic_int_type, common::data::CDataWrapper*)> MessageHandler; - //! MessageChannel - /*! + //! MessageChannel + /*! This is the basic channel for comunicate with other chaos rpc server. It can be instantiated only by NetworkBroker. It can send a message or a request. Message is the async forward data to domain/action hosted on remote rpc server and no answer is waited. Request is two pahse message, first a message(the reqeust) is sent to domain/action on remote rpc server, then an aswer is waited this step can @@ -54,28 +65,26 @@ namespace chaos { */ class MessageChannel : public DeclareAction { friend class NetworkBroker; - //! Message broker associated with the channel instance - NetworkBroker *broker; - - //! atomic int for request id + + //! atomic int for request id atomic_int_type channelRequestIDCounter; - //! remote host where send the message and request + //! remote host where send the message and request string remoteNodeAddress; - //! channel id (action domain) + //! channel id (action domain) string channelID; - //!multi key semaphore for manage the return of the action and result association to the reqeust id + //!multi key semaphore for manage the return of the action and result association to the reqeust id MultiKeyObjectWaitSemaphore<atomic_int_type,common::data::CDataWrapper*> sem; - //! Mutex for managing the maps manipulation + //! Mutex for managing the maps manipulation boost::shared_mutex mutext_answer_managment; - //!map to async request and handler + //!map to async request and handler MessageHandler response_handler; - //!map to sync request and result + //!map to sync request and result map<atomic_int_type, common::data::CDataWrapper* > responseIdSyncMap; /*! @@ -88,7 +97,7 @@ namespace chaos { */ virtual void deinit() throw(CException); - /*! + /*! \brief called when a response to a request is received, it will manage thesearch of hanlder specified for request id request */ @@ -100,8 +109,11 @@ namespace chaos { \return the unique request id */ atomic_int_type prepareRequestPackAndSend(bool, const char * const, const char * const, common::data::CDataWrapper*); - + protected: + //! Message broker associated with the channel instance + NetworkBroker *broker; + /*! Private constructor called by NetworkBroker */ @@ -116,7 +128,7 @@ namespace chaos { Private destructor called by NetworkBroker */ virtual ~MessageChannel(); - + /*! Update the address of the channel */ @@ -132,14 +144,14 @@ namespace chaos { } public: - - /*! + + /*! \brief send a message \param messagePack the data to send, the pointer is not deallocated and i scopied into the pack */ void sendMessage(const char * const, const char * const, common::data::CDataWrapper* const); - /*! + /*! \brief Set the handler for manage the rpc answer \param async_handler the handler to be used */ @@ -162,11 +174,11 @@ namespace chaos { \param actionName name of the actio to call \param requestPack the data to send, the pointer is not deallocated and i scopied into the pack \param millisecToWait waith the response for onli these number of millisec then return - \return the answer of the request, a null value mean that the wait time is expired + \return the answer of the request, a null value mean that the wait time is expired */ common::data::CDataWrapper* sendRequest(const char * const, const char * const, common::data::CDataWrapper* const, uint32_t millisecToWait=0, bool async = false); - + }; } diff --git a/chaos/common/message/PerformanceNodeChannel.cpp b/chaos/common/message/PerformanceNodeChannel.cpp new file mode 100644 index 000000000..b70b6d553 --- /dev/null +++ b/chaos/common/message/PerformanceNodeChannel.cpp @@ -0,0 +1,124 @@ +/* + * PerformanceNodeChannel.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <chaos/common/message/PerformanceNodeChannel.h> +using namespace chaos::common::message; +namespace chaos_data = chaos::common::data; +namespace chaos_direct_io = chaos::common::direct_io; + +//! base constructor +PerformanceNodeChannel::PerformanceNodeChannel(NetworkBroker *msg_broker, CNetworkAddress *node_network_address, chaos_direct_io::DirectIOClient *_client_instance): +MessageChannel(msg_broker), client_instance(_client_instance) { + setRemoteNodeAddress(node_network_address->ipPort); +} + +PerformanceNodeChannel::~PerformanceNodeChannel() { + releasePerformanceSession(local_performance_session); +} + +//Get the performance session for a chaos node +int PerformanceNodeChannel::getPerformanceSession(chaos_direct_io::DirectIOPerformanceSession **performance_session_handler, + uint32_t ms_timeout) { + int err = chaos::ErrorCode::EC_NO_ERROR; + if(!performance_session_handler) return -100; + + //get the local endpoint + chaos_direct_io::DirectIOServerEndpoint *local_session_endpoint = broker->getDirectIOServerEndpoint(); + if(!local_session_endpoint) return -101; + + std::string remote_endpoint_url; + chaos_data::CDataWrapper init_performance_session_param; + + //set the init parameter + init_performance_session_param.addStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION, + local_session_endpoint->getUrl()); + + //sent the request and waith the ansewer for startp local session + auto_ptr<chaos_data::CDataWrapper> init_session_result(MessageChannel::sendRequest(PerformanceSystemRpcKey::SYSTEM_PERFORMANCE_DOMAIN, + PerformanceSystemRpcKey::ACTION_PERFORMANCE_INIT_SESSION, + &init_performance_session_param, + ms_timeout)); + CHECK_TIMEOUT_AND_RESULT_CODE(init_session_result, err) + if(err == ErrorCode::EC_NO_ERROR) { + auto_ptr<chaos_data::CDataWrapper> info_pack(init_session_result->getCSDataValue(RpcActionDefinitionKey::CS_CMDM_ACTION_MESSAGE)); + if(info_pack.get() && info_pack->hasKey(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION)){ + + remote_endpoint_url = info_pack->getStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION); + chaos_direct_io::DirectIOClientConnection *local_session_client_connection = client_instance->getNewConnection(remote_endpoint_url); + if(!local_session_client_connection) { + //i need to release the enpoint + broker->releaseDirectIOServerEndpoint(local_session_endpoint); + } + + // i can create session + local_performance_session = *performance_session_handler = new chaos_direct_io::DirectIOPerformanceSession(local_session_client_connection, local_session_endpoint); + if(!*performance_session_handler) { + client_instance->releaseConnection(local_session_client_connection); + //i need to release the enpoint + broker->releaseDirectIOServerEndpoint(local_session_endpoint); + + err = -103; + } else { + try { + chaos::utility::InizializableService::initImplementation(*performance_session_handler, NULL, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); + } catch(chaos::CException ex) { + chaos::utility::InizializableService::deinitImplementation(*performance_session_handler, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); + err = -104; + } + } + } + } else { + //i need to release the enpoint + broker->releaseDirectIOServerEndpoint(local_session_endpoint); + err = -102; + } + return err; +} +//! release the performance session +int PerformanceNodeChannel::releasePerformanceSession(chaos_direct_io::DirectIOPerformanceSession *performance_session, + uint32_t ms_timeout) { + chaos_data::CDataWrapper init_performance_session_param; + if(!performance_session) return -1; + if(local_performance_session != performance_session) return -1; + + try{ + //set the init parameter + init_performance_session_param.addStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION, + performance_session->server_endpoint->getUrl()); + + //sent the request and waith the ansewer for startp local session + auto_ptr<chaos_data::CDataWrapper> init_session_result(MessageChannel::sendRequest(PerformanceSystemRpcKey::SYSTEM_PERFORMANCE_DOMAIN, + PerformanceSystemRpcKey::ACTION_PERFORMANCE_CLOSE_SESSION, + &init_performance_session_param, + ms_timeout)); + + + chaos::utility::InizializableService::deinitImplementation(performance_session, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); + if(performance_session->client_connection) client_instance->releaseConnection(performance_session->client_connection); + //i need to release the enpoint + if(performance_session->server_endpoint) broker->releaseDirectIOServerEndpoint(performance_session->server_endpoint); + + } catch(chaos::CException ex) { + return -100; + } + delete(local_performance_session); + local_performance_session = NULL; + return 0; +} \ No newline at end of file diff --git a/chaos/common/message/PerformanceNodeChannel.h b/chaos/common/message/PerformanceNodeChannel.h new file mode 100644 index 000000000..c7b23b1e4 --- /dev/null +++ b/chaos/common/message/PerformanceNodeChannel.h @@ -0,0 +1,66 @@ +/* + * PerformanceNodeChannel.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CHAOSFramework__PerformanceNodeChannel__ +#define __CHAOSFramework__PerformanceNodeChannel__ +#include <string> +#include <chaos/common/message/NodeMessageChannel.h> +#include <chaos/common/network/CNodeNetworkAddress.h> +#include <chaos/common/direct_io/DirectIOPerformanceSession.h> +namespace chaos_direct_io = chaos::common::direct_io; + +namespace chaos { + namespace common { + namespace direct_io { + class DirectIOClient; + } + namespace message { + //! Message Channel specialize for metadataserver comunication + /*! + This class represent a message chanel for comunication with a device + */ + class PerformanceNodeChannel : public MessageChannel { + friend class chaos::NetworkBroker; + chaos_directio::DirectIOClient *client_instance; + chaos_direct_io::DirectIOPerformanceSession *local_performance_session; + protected: + //! base constructor + /*! + The constructor create a channel for comunicate with the device that is contained in a Contro Unit, so the full network address is + ip:port:cu_node_address(instance):deviceID + */ + PerformanceNodeChannel(NetworkBroker *msg_broker, CNetworkAddress *node_network_address, chaos_direct_io::DirectIOClient *_client_instance); + ~PerformanceNodeChannel(); + public: + //Get the performance session for a chaos node + /*! + Get the performance object that permit to check the performance between the requester and the + end point node identified by CNodeNetworkAddress + */ + int getPerformanceSession(chaos_direct_io::DirectIOPerformanceSession **performance_session_handler, uint32_t ms_timeout = 1000); + + //! release the performance session + int releasePerformanceSession(chaos_direct_io::DirectIOPerformanceSession *performance_session, uint32_t ms_timeout = 1000); + + }; + } + } +} + +#endif /* defined(__CHAOSFramework__PerformanceNodeChannel__) */ diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp index d0c62f5c1..ddde949c0 100644 --- a/chaos/common/network/NetworkBroker.cpp +++ b/chaos/common/network/NetworkBroker.cpp @@ -24,6 +24,7 @@ #include <chaos/common/message/DeviceMessageChannel.h> #include <chaos/common/message/MDSMessageChannel.h> #include <chaos/common/message/MessageChannel.h> +#include <chaos/common/message/PerformanceNodeChannel.h> #include <chaos/common/event/EventServer.h> #include <chaos/common/event/EventClient.h> #include <chaos/common/dispatcher/AbstractCommandDispatcher.h> @@ -43,7 +44,7 @@ using namespace chaos::common::data; /*! */ -NetworkBroker::NetworkBroker(){ +NetworkBroker::NetworkBroker():performance_session_managment(this){ eventClient = NULL; eventServer = NULL; rpcServer = NULL; @@ -69,12 +70,7 @@ NetworkBroker::~NetworkBroker() { * for the rpc client and server and for the dispatcher. All these are here initialized */ void NetworkBroker::init(void *initData) throw(CException) { - //check if initialized - SetupStateManager::levelUpFrom(INIT_STEP, "NetworkBroker already initialized"); - - - - MB_LAPP << "Init pahse"; + MB_LAPP << "Init phase"; //get global configuration reference CDataWrapper *globalConfiguration = GlobalConfiguration::getInstance()->getConfiguration(); @@ -172,18 +168,14 @@ void NetworkBroker::init(void *initData) throw(CException) { throw CException(4, "No RPC Adapter type found in configuration", "NetworkBroker::init"); } //---------------------------- R P C ---------------------------- - MB_LAPP << "Message Broker Initialized"; + MB_LAPP << "Initialize performance session manager"; + utility::StartableService::initImplementation(performance_session_managment, static_cast<void*>(globalConfiguration), "PerformanceManagment", __PRETTY_FUNCTION__); } /*! * All rpc adapter and command siaptcer are deinitilized */ void NetworkBroker::deinit() throw(CException) { - - //lock esclusive access to init phase - SetupStateManager::levelDownFrom(DEINIT_STEP, "NetworkBroker already deinitialized"); - - MB_LAPP << "Deinitilizing Network Broker"; //---------------------------- D I R E C T I/O ---------------------------- MB_LAPP << "Deinit DirectIO server: " << directIOServer->getName(); @@ -247,6 +239,9 @@ void NetworkBroker::deinit() throw(CException) { utility::StartableService::deinitImplementation(commandDispatcher, "DefaultCommandDispatcher", "NetworkBroker::deinit"); //---------------------------- R P C ---------------------------- + MB_LAPP << "Deinitialize performance session manager"; + utility::StartableService::deinitImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__); + } /*! @@ -277,12 +272,17 @@ void NetworkBroker::start() throw(CException){ MB_LAPP << "get the published host and port from rpc server"; getPublishedHostAndPort(publishedHostAndPort); MB_LAPP << "Rpc server has been published in: " << publishedHostAndPort; + + MB_LAPP << "Start performance session manager"; + utility::StartableService::startImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__); + } /*! * all part are started */ void NetworkBroker::stop() throw(CException) { + MB_LAPP << "Stop rpc server: " << rpcClient->getName(); utility::StartableService::stopImplementation(rpcClient, rpcClient->getName(), "NetworkBroker::stop"); @@ -303,6 +303,10 @@ void NetworkBroker::stop() throw(CException) { MB_LAPP << "Stop DirectIO server: " << directIOServer->getName(); utility::StartableService::stopImplementation(directIOServer, directIOServer->getName(), "NetworkBroker::Stop"); + + MB_LAPP << "Stop performance session manager"; + utility::StartableService::stopImplementation(performance_session_managment, "PerformanceManagment", __PRETTY_FUNCTION__); + } /*! @@ -476,7 +480,7 @@ bool NetworkBroker::submiteRequest(string& serverAndPort, CDataWrapper *request /* */ -MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNodeNetworkAddress *nodeNetworkAddress, EntityType type) { +MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNetworkAddress *nodeNetworkAddress, EntityType type) { CHAOS_ASSERT(nodeNetworkAddress) MessageChannel *channel = NULL; switch (type) { @@ -486,12 +490,15 @@ MessageChannel *NetworkBroker::getNewMessageChannelForRemoteHost(CNodeNetworkAdd break; case MDS: - channel = new MDSMessageChannel(this, nodeNetworkAddress); + channel = new MDSMessageChannel(this, static_cast<CNodeNetworkAddress*>(nodeNetworkAddress)); break; case DEVICE: channel = new DeviceMessageChannel(this, static_cast<CDeviceNetworkAddress*>(nodeNetworkAddress)); break; + case PERFORMANCE: + channel = new common::message::PerformanceNodeChannel(this, nodeNetworkAddress, performance_session_managment.getLocalDirectIOClientInstance()); + break; } //check if the channel has been created if(channel){ @@ -522,6 +529,11 @@ DeviceMessageChannel *NetworkBroker::getDeviceMessageChannelFromAddress(CDeviceN return static_cast<DeviceMessageChannel*>(getNewMessageChannelForRemoteHost(deviceNetworkAddress, DEVICE)); } +//!performance channel creation +chaos::common::message::PerformanceNodeChannel *NetworkBroker::getPerformanceChannelFromAddress(CNetworkAddress *node_network_address) { + return static_cast<chaos::common::message::PerformanceNodeChannel*>(getNewMessageChannelForRemoteHost(node_network_address, PERFORMANCE)); +} + //!Channel deallocation /*! Perform the message channel deallocation @@ -564,4 +576,3 @@ chaos_directio::DirectIOClient *NetworkBroker::getDirectIOClientInstance() { MB_LAPP << "Allocate a new DirectIOClient of type " << directIOClientImpl; return ObjectFactoryRegister<common::direct_io::DirectIOClient>::getInstance()->getNewInstanceByName(directIOClientImpl.c_str()); } - diff --git a/chaos/common/network/NetworkBroker.h b/chaos/common/network/NetworkBroker.h index 6235fc60a..d397ff4c8 100644 --- a/chaos/common/network/NetworkBroker.h +++ b/chaos/common/network/NetworkBroker.h @@ -30,10 +30,10 @@ #include <chaos/common/action/DeclareAction.h> #include <chaos/common/action/EventAction.h> #include <chaos/common/network/CNodeNetworkAddress.h> -#include <chaos/common/utility/SetupStateManager.h> #include <chaos/common/event/channel/EventChannel.h> #include <chaos/common/event/evt_desc/EventDescriptor.h> #include <chaos/common/network/NetworkForwardInfo.h> +#include <chaos/common/network/PerformanceManagment.h> #include <chaos/common/utility/StartableService.h> #include <chaos/common/direct_io/DirectIO.h> @@ -51,9 +51,10 @@ namespace chaos { Constants that identify the type of the channel to create */ typedef enum { - RAW = 0, /*!< Identify a raw channel used to send data pack to remote server */ - MDS, /*!< Identify a mds specific channel used to send data pack to the metadataserver */ - DEVICE /*!< Identify a device specific channel used to send data pack to the target control unit */ + RAW = 0, /*!< Identify a raw channel used to send data pack to remote server */ + MDS, /*!< Identify a mds specific channel used to send data pack to the metadataserver */ + DEVICE, /*!< Identify a device specific channel used to send data pack to the target control unit */ + PERFORMANCE /*!< Identify a performance specific channel used to send and receive various performance information and test between two chaos node using directio system */ } EntityType; class MessageChannel; @@ -63,6 +64,12 @@ namespace chaos { class AbstractCommandDispatcher; class AbstractEventDispatcher; + namespace common { + namespace message { + class PerformanceNodeChannel; + } + } + namespace event { namespace channel { class AlertEventChannel; @@ -79,10 +86,13 @@ namespace chaos { chaos rpc client and server abstract class and to the message dispatcher abstract class. It abstract the !CHAOS rule for sending message and wait for answer and other facility. */ - class NetworkBroker: private SetupStateManager, public utility::StartableService { + class NetworkBroker: public utility::StartableService { //!Event Client for event forwarding std::string directIOClientImpl; + //! performance session managment + chaos::common::network::PerformanceManagment performance_session_managment; + //!Direct IO server interface common::direct_io::DirectIOServer *directIOServer; //! Direct IO dispatcher @@ -128,7 +138,7 @@ namespace chaos { \param nodeNetworkAddress node address info \param type channel type to create */ - MessageChannel *getNewMessageChannelForRemoteHost(CNodeNetworkAddress *nodeNetworkAddress, EntityType type); + MessageChannel *getNewMessageChannelForRemoteHost(CNetworkAddress *nodeNetworkAddress, EntityType type); public: @@ -236,7 +246,12 @@ namespace chaos { \param message the message coded into key/value semantics \param onThisThread if true the message is forwarded in the same thread of the caller */ - bool submitMessage(string& serverAndPort, chaos_data::CDataWrapper *message, NetworkErrorHandler handler = NULL, const char * senderIdentifier = NULL, int64_t senderTag = (int64_t)0, bool onThisThread=false); + bool submitMessage(string& serverAndPort, + chaos_data::CDataWrapper *message, + NetworkErrorHandler handler = NULL, + const char * senderIdentifier = NULL, + int64_t senderTag = (int64_t)0, + bool onThisThread=false); //!message request /*! @@ -245,7 +260,12 @@ namespace chaos { \param request the request coded into key/value semantics \param onThisThread if true the message is forwarded in the same thread of the caller */ - bool submiteRequest(string& serverAndPort, chaos_data::CDataWrapper *request, NetworkErrorHandler handler = NULL, const char * senderIdentifier = NULL, int64_t senderTag = (int64_t)0, bool onThisThread=false); + bool submiteRequest(string& serverAndPort, + chaos_data::CDataWrapper *request, + NetworkErrorHandler handler = NULL, + const char * senderIdentifier = NULL, + int64_t senderTag = (int64_t)0, + bool onThisThread=false); //!message submition /*! @@ -266,6 +286,13 @@ namespace chaos { */ DeviceMessageChannel *getDeviceMessageChannelFromAddress(CDeviceNetworkAddress *deviceNetworkAddress); + //!performance channel creation + /*! + Performe the creation of performance channel thowards a network node + \param note_network_address the address of the chaos node(network broker) + */ + chaos::common::message::PerformanceNodeChannel *getPerformanceChannelFromAddress(CNetworkAddress *node_network_address); + //!Rpc Channel deallocation /*! Perform the message channel deallocation @@ -299,7 +326,6 @@ namespace chaos { \return A new instance of the direct io client */ chaos_directio::DirectIOClient *getDirectIOClientInstance(); - - }; + }; } #endif diff --git a/chaos/common/network/PerformanceManagment.cpp b/chaos/common/network/PerformanceManagment.cpp index 3cf115c17..c93557ab9 100644 --- a/chaos/common/network/PerformanceManagment.cpp +++ b/chaos/common/network/PerformanceManagment.cpp @@ -8,7 +8,7 @@ #include <chaos/common/network/NetworkBroker.h> #include <chaos/common/direct_io/DirectIOClient.h> #include <chaos/common/network/PerformanceManagment.h> -#include <chaos/common/direct_io/DirectIOPerformanceLoop.h> +#include <chaos/common/direct_io/DirectIOPerformanceSession.h> #define PM_LOG_HEAD "[PerformanceManagment] - " @@ -22,8 +22,7 @@ namespace chaos_direct_io = chaos::common::direct_io; PerformanceManagment::PerformanceManagment(NetworkBroker *_network_broker): network_broker(_network_broker), -global_performance_connection(NULL), -queue_purgeable_performance_node(1) { +global_performance_connection(NULL){ //create the description for init preformance session rpc action AbstActionDescShrPtr actionDescription = DeclareAction::addActionDescritionInstance<PerformanceManagment>(this, @@ -31,8 +30,13 @@ queue_purgeable_performance_node(1) { PerformanceSystemRpcKey::SYSTEM_PERFORMANCE_DOMAIN, PerformanceSystemRpcKey::ACTION_PERFORMANCE_INIT_SESSION, "Initialize the performance session"); + actionDescription = DeclareAction::addActionDescritionInstance<PerformanceManagment>(this, + &PerformanceManagment::stopPerformanceSession, + PerformanceSystemRpcKey::SYSTEM_PERFORMANCE_DOMAIN, + PerformanceSystemRpcKey::ACTION_PERFORMANCE_CLOSE_SESSION, + "Deinitialize the performance session"); + - } PerformanceManagment::~PerformanceManagment() { @@ -40,7 +44,11 @@ PerformanceManagment::~PerformanceManagment() { //init the implementation void PerformanceManagment::init(void *init_parameter) throw(chaos::CException) { + //check the network broker setup if(!network_broker) throw chaos::CException(-1, "NetworkBroker not set", __PRETTY_FUNCTION__); + + //register the action + network_broker->registerAction(this); } //Start the implementation @@ -52,63 +60,74 @@ void PerformanceManagment::start() throw(chaos::CException) { //Stop the implementation void PerformanceManagment::stop() throw(chaos::CException) { - chaos_direct_io::DirectIOPerformanceLoop *performance_node_to_purge = NULL; PMLAPP_ << "Stop the purger thread"; work_on_purge = false; purge_wait_semaphore.unlock(); thread_purge->join(); PMLAPP_ << "Purger thread stoppped"; - + PMLAPP_ << "Remove all orfaned performance session"; - while (!queue_purgeable_performance_node.empty()) { - if(queue_purgeable_performance_node.pop(performance_node_to_purge)) { - disposePerformanceNode(performance_node_to_purge); - } else { - //no more element - break; - } + purge_map(); +} + +chaos_direct_io::DirectIOClient *PerformanceManagment::getLocalDirectIOClientInstance() { + boost::unique_lock<boost::mutex>(mutext_client_connection); + if(!global_performance_connection) { + global_performance_connection = network_broker->getDirectIOClientInstance(); + if(!global_performance_connection) throw chaos::CException(-1, "Performance direct io client creation error", __PRETTY_FUNCTION__); + + chaos::utility::InizializableService::initImplementation(global_performance_connection, NULL, global_performance_connection->getName(), __PRETTY_FUNCTION__); } + return global_performance_connection; } //Deinit the implementation void PerformanceManagment::deinit() throw(chaos::CException) { + //register the action + network_broker->deregisterAction(this); + if(global_performance_connection) { chaos::utility::InizializableService::deinitImplementation(global_performance_connection, global_performance_connection->getName(), __PRETTY_FUNCTION__); } } void PerformanceManagment::purge_worker() { - chaos_direct_io::DirectIOPerformanceLoop *performance_node_to_purge = NULL; while(work_on_purge) { - for (int idx = 10; idx<=0 && !queue_purgeable_performance_node.empty(); idx--) { - if(queue_purgeable_performance_node.pop(performance_node_to_purge)) { - disposePerformanceNode(performance_node_to_purge); - } else { - //no more element - } - } + purge_map(); purge_wait_semaphore.wait(5000); } } +void PerformanceManagment::purge_map() { + boost::unique_lock<boost::shared_mutex> lock(mutex_map_purgeable); + for(std::map<std::string, chaos_direct_io::DirectIOPerformanceSession*>::iterator iter = map_purgeable_performance_node.begin(); + iter != map_purgeable_performance_node.end();) { + disposePerformanceNode(iter->second); + map_purgeable_performance_node.erase(iter++); + } + +} + void PerformanceManagment::handleEvent(chaos_direct_io::DirectIOClientConnection *client_connection, chaos_direct_io::DirectIOClientConnectionStateType::DirectIOClientConnectionStateType event) { if(event != chaos_direct_io::DirectIOClientConnectionStateType::DirectIOClientConnectionEventDisconnected) return; - PMLDBG_ << "Managing disconenction for "<<client_connection->getServerDescription(); - //add to purgeable map - queue_purgeable_performance_node.push(TemplatedKeyObjectContainer::accessItem(client_connection->getServerDescription())); + boost::unique_lock<boost::shared_mutex> lock(mutex_map_purgeable); + if(map_purgeable_performance_node.count(client_connection->getServerDescription())) { + PMLDBG_ << "Performance session for remote address "<<client_connection->getServerDescription() << " Already in purge map"; + return; + } + map_purgeable_performance_node.insert(make_pair(client_connection->getServerDescription(), TemplatedKeyObjectContainer::accessItem(client_connection->getServerDescription()))); + PMLDBG_ << "Performance session for remote address "<<client_connection->getServerDescription() << " added in purge map"; - PMLDBG_ << "Performance node added to the purgeable ndoe queue"; - } -void PerformanceManagment::disposePerformanceNode(chaos_direct_io::DirectIOPerformanceLoop *performance_node) { +void PerformanceManagment::disposePerformanceNode(chaos_direct_io::DirectIOPerformanceSession *performance_node) { std::string server_description; try { - server_description = performance_node->client_connection->getServerDescription(); + server_description = performance_node->client_connection->getURL(); PMLAPP_ << "Dispose the performance node for "<<server_description; - chaos::utility::InizializableService::initImplementation(performance_node, NULL, "DirectIOPerformanceLoop", __PRETTY_FUNCTION__); + chaos::utility::InizializableService::initImplementation(performance_node, NULL, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); } catch(CException ex) {} catch(...) {} @@ -125,9 +144,11 @@ void PerformanceManagment::disposePerformanceNode(chaos_direct_io::DirectIOPerf PMLAPP_ << "Release server endpoint for performance node for "<<server_description; network_broker->releaseDirectIOServerEndpoint(performance_node->server_endpoint); } + + TemplatedKeyObjectContainer::deregisterElementKey(server_description); } -void PerformanceManagment::freeObject(std::string server_description, chaos_direct_io::DirectIOPerformanceLoop *performance_node) { +void PerformanceManagment::freeObject(std::string server_description, chaos_direct_io::DirectIOPerformanceSession *performance_node) { disposePerformanceNode(performance_node); } @@ -135,36 +156,42 @@ chaos_data::CDataWrapper* PerformanceManagment::startPerformanceSession(chaos_da if(!param) return NULL; if(!param->hasKey(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION)) throw chaos::CException(-1, "Requester server description not found", __PRETTY_FUNCTION__); - + + chaos_data::CDataWrapper *result = NULL; + //we can initiate performance session allcoation std::string req_server_description = param->getStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION); - + if(TemplatedKeyObjectContainer::hasKey(req_server_description)) throw chaos::CException(-2, "performance sesison for requester already allocated", __PRETTY_FUNCTION__); PMLAPP_<< "Create new performance session for " << req_server_description; - - //lock the map - if(!global_performance_connection) { - global_performance_connection = network_broker->getDirectIOClientInstance(); - if(global_performance_connection) throw chaos::CException(-1, "Performance direct io client creation error", __PRETTY_FUNCTION__); - - chaos::utility::InizializableService::initImplementation(global_performance_connection, NULL, global_performance_connection->getName(), __PRETTY_FUNCTION__); - } + //ensure lcient creation on local instance + getLocalDirectIOClientInstance(); + //get the local connection to the requester form shared client chaos_direct_io::DirectIOClientConnection *client_connection = global_performance_connection->getNewConnection(req_server_description); + client_connection->setEventHandler(this); + + //get the server endpoint for the requester chaos_direct_io::DirectIOServerEndpoint *server_endpoint = network_broker->getDirectIOServerEndpoint(); - chaos_direct_io::DirectIOPerformanceLoop *performace_node = new chaos_direct_io::DirectIOPerformanceLoop(client_connection, server_endpoint); + + //alocate new session + chaos_direct_io::DirectIOPerformanceSession *performace_node = new chaos_direct_io::DirectIOPerformanceSession(client_connection, server_endpoint); try { - chaos::utility::InizializableService::initImplementation(performace_node, NULL, "DirectIOPerformanceLoop", __PRETTY_FUNCTION__); - + chaos::utility::InizializableService::initImplementation(performace_node, NULL, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); + + //set the result value to the local endpoint url + result = new chaos_data::CDataWrapper(); + result->addStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION, server_endpoint->getUrl()); + } catch(CException ex) { - chaos::utility::InizializableService::deinitImplementation(performace_node, "DirectIOPerformanceLoop", __PRETTY_FUNCTION__); + chaos::utility::InizializableService::deinitImplementation(performace_node, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); if(client_connection) global_performance_connection->releaseConnection(client_connection); if(server_endpoint) network_broker->releaseDirectIOServerEndpoint(server_endpoint); throw ex; } catch(...) { - chaos::utility::InizializableService::deinitImplementation(performace_node, "DirectIOPerformanceLoop", __PRETTY_FUNCTION__); + chaos::utility::InizializableService::deinitImplementation(performace_node, "DirectIOPerformanceSession", __PRETTY_FUNCTION__); if(client_connection) global_performance_connection->releaseConnection(client_connection); if(server_endpoint) network_broker->releaseDirectIOServerEndpoint(server_endpoint); throw chaos::CException(-3, "Generic exception on initialization of performance loop", __PRETTY_FUNCTION__); @@ -173,5 +200,23 @@ chaos_data::CDataWrapper* PerformanceManagment::startPerformanceSession(chaos_da TemplatedKeyObjectContainer::registerElement(req_server_description, performace_node); //get the node server description for send it to the requester - return NULL; + return result; } + +chaos_data::CDataWrapper* PerformanceManagment::stopPerformanceSession(chaos_data::CDataWrapper *param, bool& detach) throw(chaos::CException) { + if(!param) return NULL; + if(!param->hasKey(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION)) + throw chaos::CException(-1, "Requester server description not found", __PRETTY_FUNCTION__); + //we can initiate performance session allcoation + std::string req_server_description = param->getStringValue(PerformanceSystemRpcKey::KEY_REQUEST_SERVER_DESCRITPION); + + if(!TemplatedKeyObjectContainer::hasKey(req_server_description)) + throw chaos::CException(-2, "performance sesison for requester already allocated", __PRETTY_FUNCTION__); + + //we can proceed to the closing of the performance session + chaos_direct_io::DirectIOPerformanceSession *performace_node = TemplatedKeyObjectContainer::accessItem(req_server_description); + + // dispose the sesison node + disposePerformanceNode(performace_node); + return NULL; +} \ No newline at end of file diff --git a/chaos/common/network/PerformanceManagment.h b/chaos/common/network/PerformanceManagment.h index 422dd652c..166ef7287 100644 --- a/chaos/common/network/PerformanceManagment.h +++ b/chaos/common/network/PerformanceManagment.h @@ -13,10 +13,11 @@ #include <chaos/common/thread/WaitSemaphore.h> #include <chaos/common/utility/StartableService.h> #include <chaos/common/utility/TemplatedKeyObjectContainer.h> - +#include <chaos/common/direct_io/DirectIOClientConnection.h> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <boost/lockfree/queue.hpp> +#include <string> #include <map> #define PERFORMANCE_MANAGMENT_RPC_DOMAIN "system:perf" @@ -32,7 +33,7 @@ namespace chaos { namespace direct_io { //class declaration class DirectIOClient; - class DirectIOPerformanceLoop; + class DirectIOPerformanceSession; } namespace network { @@ -42,22 +43,28 @@ namespace chaos { public chaos::utility::StartableService, public chaos::DeclareAction, public direct_io::DirectIOClientConnectionEventHandler, - protected ::chaos::utility::TemplatedKeyObjectContainer<std::string, chaos_direct_io::DirectIOPerformanceLoop*> { - friend class NetworkBroker; + protected ::chaos::utility::TemplatedKeyObjectContainer<std::string, chaos_direct_io::DirectIOPerformanceSession*> { + friend class chaos::NetworkBroker; chaos::NetworkBroker *network_broker; + + boost::mutex mutext_client_connection; chaos_direct_io::DirectIOClient *global_performance_connection; bool work_on_purge; WaitSemaphore purge_wait_semaphore; boost::shared_ptr<boost::thread> thread_purge; - boost::lockfree::queue<chaos_direct_io::DirectIOPerformanceLoop*, boost::lockfree::fixed_sized<false> > queue_purgeable_performance_node; + + boost::shared_mutex mutex_map_purgeable; + std::map<std::string, chaos_direct_io::DirectIOPerformanceSession*> map_purgeable_performance_node; PerformanceManagment(NetworkBroker *_network_broker); ~PerformanceManagment(); - void disposePerformanceNode(chaos_direct_io::DirectIOPerformanceLoop *performance_node); - + void disposePerformanceNode(chaos_direct_io::DirectIOPerformanceSession *performance_node); + void purge_map(); void purge_worker(); + + chaos_direct_io::DirectIOClient *getLocalDirectIOClientInstance(); protected: //! Start the implementation void init(void *init_parameter) throw(chaos::CException); @@ -72,8 +79,10 @@ namespace chaos { void deinit() throw(chaos::CException); chaos_data::CDataWrapper* startPerformanceSession(chaos_data::CDataWrapper *param, bool& detach) throw(chaos::CException); - - void freeObject(std::string server_description, chaos_direct_io::DirectIOPerformanceLoop *performance_node); + + chaos_data::CDataWrapper* stopPerformanceSession(chaos_data::CDataWrapper *param, bool& detach) throw(chaos::CException); + + void freeObject(std::string server_description, chaos_direct_io::DirectIOPerformanceSession *performance_node); void handleEvent(direct_io::DirectIOClientConnection *client_connection, direct_io::DirectIOClientConnectionStateType::DirectIOClientConnectionStateType event); }; diff --git a/chaos/common/utility/TemplatedKeyObjectContainer.h b/chaos/common/utility/TemplatedKeyObjectContainer.h index 7ab91a848..849659e2e 100644 --- a/chaos/common/utility/TemplatedKeyObjectContainer.h +++ b/chaos/common/utility/TemplatedKeyObjectContainer.h @@ -76,7 +76,9 @@ namespace chaos { inline O accessItem(K key) { boost::shared_lock<boost::shared_mutex> lock(mutex_organizer_map); if(organizer_map.count(key)) - return organizer_map[key]; + return organizer_map[key]; + else + return NULL; } }; diff --git a/chaos/ui_toolkit/HighLevelApi/HLDataApi.cpp b/chaos/ui_toolkit/HighLevelApi/HLDataApi.cpp index 3556fa8e9..f180377cf 100644 --- a/chaos/ui_toolkit/HighLevelApi/HLDataApi.cpp +++ b/chaos/ui_toolkit/HighLevelApi/HLDataApi.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ #include "HLDataApi.h" - using namespace std; using namespace chaos; using namespace chaos::ui; @@ -80,4 +79,4 @@ void HLDataApi::disposeDeviceControllerPtr(DeviceController *ctrl) throw (CExcep //dispose the devie delete ctrl; ctrl = NULL; -} +} \ No newline at end of file diff --git a/chaos/ui_toolkit/HighLevelApi/HLDataApi.h b/chaos/ui_toolkit/HighLevelApi/HLDataApi.h index a4c37e1e4..047ebe70f 100644 --- a/chaos/ui_toolkit/HighLevelApi/HLDataApi.h +++ b/chaos/ui_toolkit/HighLevelApi/HLDataApi.h @@ -22,8 +22,9 @@ #include <map> #include <string> #include <chaos/ui_toolkit/HighLevelApi/DeviceController.h> +#include <chaos/common/message/PerformanceNodeChannel.h> #include <chaos/common/utility/Singleton.h> - +namespace chaos_message = chaos::common::message; namespace chaos { namespace ui { using namespace std; diff --git a/chaos/ui_toolkit/LowLevelApi/LLRpcApi.cpp b/chaos/ui_toolkit/LowLevelApi/LLRpcApi.cpp index 21d660010..fc18cf095 100644 --- a/chaos/ui_toolkit/LowLevelApi/LLRpcApi.cpp +++ b/chaos/ui_toolkit/LowLevelApi/LLRpcApi.cpp @@ -110,10 +110,22 @@ DeviceMessageChannel *LLRpcApi::getNewDeviceMessageChannel(CDeviceNetworkAddress return network_broker->getDeviceMessageChannelFromAddress(deviceNetworkAddress); } -void LLRpcApi::deleteMessageChannel(NodeMessageChannel *channelToDispose) { - network_broker->disposeMessageChannel(channelToDispose); +/*! + Return a new device channel + */ +chaos::common::message::PerformanceNodeChannel *LLRpcApi::getNewPerformanceChannel(CNetworkAddress *note_network_address) { + return network_broker->getPerformanceChannelFromAddress(note_network_address); } +void LLRpcApi::deleteMessageChannel(MessageChannel *channelToDispose) { + network_broker->disposeMessageChannel(channelToDispose); +} +/*! + Delete a previously instantiatedchannel + */ +void LLRpcApi::deleteMessageChannel(NodeMessageChannel *channelToDispose) { + network_broker->disposeMessageChannel(channelToDispose); +} event::channel::AlertEventChannel *LLRpcApi::getNewAlertEventChannel() throw (CException) { return network_broker->getNewAlertEventChannel(); diff --git a/chaos/ui_toolkit/LowLevelApi/LLRpcApi.h b/chaos/ui_toolkit/LowLevelApi/LLRpcApi.h index 05639b622..99c658d20 100644 --- a/chaos/ui_toolkit/LowLevelApi/LLRpcApi.h +++ b/chaos/ui_toolkit/LowLevelApi/LLRpcApi.h @@ -76,11 +76,18 @@ namespace chaos { */ DeviceMessageChannel *getNewDeviceMessageChannel(CDeviceNetworkAddress *deviceNetworkAddress); + chaos::common::message::PerformanceNodeChannel *getNewPerformanceChannel(CNetworkAddress *note_network_address); + /*! Delete a previously instantiatedchannel */ - void deleteMessageChannel(NodeMessageChannel*); + void deleteMessageChannel(MessageChannel*); + /*! + Delete a previously instantiatedchannel + */ + void deleteMessageChannel(NodeMessageChannel*); + event::channel::AlertEventChannel *getNewAlertEventChannel() throw (CException); event::channel::InstrumentEventChannel *getNewInstrumentEventChannel() throw (CException); void disposeEventChannel(event::channel::EventChannel *) throw (CException); diff --git a/example/ChaosCLI/main.cpp b/example/ChaosCLI/main.cpp index ba0c316b1..1665945c9 100644 --- a/example/ChaosCLI/main.cpp +++ b/example/ChaosCLI/main.cpp @@ -122,7 +122,7 @@ int main (int argc, char* argv[] ) CDeviceNetworkAddress deviceNetworkAddress; CUStateKey::ControlUnitState deviceState; - + //! [UIToolkit Attribute Init] ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_DEVICE_ID, "The identification string of the device"); ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<uint32_t>(OPT_TIMEOUT, "Timeout rpc in milliseconds", 2000, &timeout); @@ -247,38 +247,38 @@ int main (int argc, char* argv[] ) } break; case 6: { - //check sc - uint64_t command_id = 0; - auto_ptr<CDataWrapper> userData; - bool canBeExecuted = scAlias.size() > 0; - canBeExecuted = canBeExecuted && (checkSubmissionRule(scSubmissionRule) != -1); - if(canBeExecuted) { - if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_SL_COMMAND_SUBMISSION_RETRY_DELAY)) { - std::cout << "Custom checker delay submitted -> " << scSubmissionSubmissionRetryDelay << std::endl; - } - - if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_SL_COMMAND_DATA)) { - userData.reset(new CDataWrapper()); - if(userData.get())userData->setSerializedJsonData(scUserData.c_str()); - std::cout << "User data submitted" << std::endl; - std::cout << "-----------------------------------------" << std::endl; - std::cout << userData->getJSONString() << std::endl; - std::cout << "-----------------------------------------" << std::endl; - } - err = controller->submitSlowControlCommand(scAlias, - static_cast<chaos_batch::SubmissionRuleType::SubmissionRule>(checkSubmissionRule(scSubmissionRule)), - scSubmissionPriority, - command_id, - scExecutionChannel, - scSubmissionSchedulerDelay, - scSubmissionSubmissionRetryDelay, - userData.get()); - if(err == ErrorCode::EC_TIMEOUT) throw CException(2, "Time out on connection", "Set device to deinit state"); - std::cout << "Command submitted successfully his command idedentification number(cidn) is= " << command_id << std::endl; - } else { - throw CException(29, "Device can't be in deinit state", "Send slow command"); - } - } + //check sc + uint64_t command_id = 0; + auto_ptr<CDataWrapper> userData; + bool canBeExecuted = scAlias.size() > 0; + canBeExecuted = canBeExecuted && (checkSubmissionRule(scSubmissionRule) != -1); + if(canBeExecuted) { + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_SL_COMMAND_SUBMISSION_RETRY_DELAY)) { + std::cout << "Custom checker delay submitted -> " << scSubmissionSubmissionRetryDelay << std::endl; + } + + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_SL_COMMAND_DATA)) { + userData.reset(new CDataWrapper()); + if(userData.get())userData->setSerializedJsonData(scUserData.c_str()); + std::cout << "User data submitted" << std::endl; + std::cout << "-----------------------------------------" << std::endl; + std::cout << userData->getJSONString() << std::endl; + std::cout << "-----------------------------------------" << std::endl; + } + err = controller->submitSlowControlCommand(scAlias, + static_cast<chaos_batch::SubmissionRuleType::SubmissionRule>(checkSubmissionRule(scSubmissionRule)), + scSubmissionPriority, + command_id, + scExecutionChannel, + scSubmissionSchedulerDelay, + scSubmissionSubmissionRetryDelay, + userData.get()); + if(err == ErrorCode::EC_TIMEOUT) throw CException(2, "Time out on connection", "Set device to deinit state"); + std::cout << "Command submitted successfully his command idedentification number(cidn) is= " << command_id << std::endl; + } else { + throw CException(29, "Device can't be in deinit state", "Send slow command"); + } + } break; case 7:{ err = controller->killCurrentCommand(); @@ -330,7 +330,7 @@ int main (int argc, char* argv[] ) } if(rtAttributeValue.find(":")== string::npos) { throw CException(2, "Attribute param not well formet, lak of ':' character (param_name:param_value)", "OPCODE 9"); - + } std::string param_name = rtAttributeValue.substr(0, rtAttributeValue.find(":")); std::string param_value = rtAttributeValue.substr(rtAttributeValue.find(":")+1); @@ -404,7 +404,7 @@ int main (int argc, char* argv[] ) if(controller) HLDataApi::getInstance()->disposeDeviceControllerPtr(controller); - + } catch (CException& e) { std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl; } diff --git a/example/ChaosPerformanceTester/CMakeLists.txt b/example/ChaosPerformanceTester/CMakeLists.txt new file mode 100644 index 000000000..7b9994efa --- /dev/null +++ b/example/ChaosPerformanceTester/CMakeLists.txt @@ -0,0 +1,21 @@ +cmake_minimum_required(VERSION 2.6) +option(BUILD_FORCE_32 "Set to ON to enable 32 bit compilation" OFF) + +IF( ($ENV{CHAOS32}) OR (BUILD_FORCE_32) ) + MESSAGE(STATUS "Enabling 32 bit Compilation") + set (CMAKE_C_FLAGS "-m32") + set (CMAKE_CXX_FLAGS "-m32") + set (CMAKE_LINK_FLAGS "-m32") +ENDIF() +ADD_DEFINITIONS(-g -O2 -Wall) + +SET(chaos_perf_tester_src main.cpp) + +INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../../usr/local/include /usr/local/include /usr/include ${PROJECT_SOURCE_DIR}/example/ChaosCLI/) +LINK_DIRECTORIES(${PROJECT_SOURCE_DIR}/../../usr/local/lib /usr/local/lib) + +ADD_EXECUTABLE(ChaosPerformanceTester ${chaos_perf_tester_src}) + +TARGET_LINK_LIBRARIES(ChaosPerformanceTester chaos_uitoolkit chaos_common boost_program_options boost_system boost_thread boost_chrono boost_regex boost_log boost_log_setup boost_atomic memcached msgpack msgpack-rpc mpio pthread dl) + +INSTALL_TARGETS(/bin ChaosPerformanceTester) \ No newline at end of file diff --git a/example/ChaosPerformanceTester/ChaosPerformanceTester.xcodeproj/project.pbxproj b/example/ChaosPerformanceTester/ChaosPerformanceTester.xcodeproj/project.pbxproj new file mode 100644 index 000000000..1bf0abd99 --- /dev/null +++ b/example/ChaosPerformanceTester/ChaosPerformanceTester.xcodeproj/project.pbxproj @@ -0,0 +1,280 @@ +// !$*UTF8*$! +{ + archiveVersion = 1; + classes = { + }; + objectVersion = 46; + objects = { + +/* Begin PBXBuildFile section */ + 32AF7D5818DB504400537DE6 /* main.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 32AF7D5718DB504400537DE6 /* main.cpp */; }; +/* End PBXBuildFile section */ + +/* Begin PBXCopyFilesBuildPhase section */ + 32AF7D4918DB501E00537DE6 /* CopyFiles */ = { + isa = PBXCopyFilesBuildPhase; + buildActionMask = 2147483647; + dstPath = /usr/share/man/man1/; + dstSubfolderSpec = 0; + files = ( + ); + runOnlyForDeploymentPostprocessing = 1; + }; +/* End PBXCopyFilesBuildPhase section */ + +/* Begin PBXFileReference section */ + 32AF7D4B18DB501E00537DE6 /* ChaosPerformanceTester */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = ChaosPerformanceTester; sourceTree = BUILT_PRODUCTS_DIR; }; + 32AF7D5718DB504400537DE6 /* main.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = main.cpp; sourceTree = "<group>"; }; + 32AF7D5918DB504B00537DE6 /* CMakeLists.txt */ = {isa = PBXFileReference; lastKnownFileType = text; path = CMakeLists.txt; sourceTree = "<group>"; }; +/* End PBXFileReference section */ + +/* Begin PBXFrameworksBuildPhase section */ + 32AF7D4818DB501E00537DE6 /* Frameworks */ = { + isa = PBXFrameworksBuildPhase; + buildActionMask = 2147483647; + files = ( + ); + runOnlyForDeploymentPostprocessing = 0; + }; +/* End PBXFrameworksBuildPhase section */ + +/* Begin PBXGroup section */ + 32AF7D4218DB501E00537DE6 = { + isa = PBXGroup; + children = ( + 32AF7D5918DB504B00537DE6 /* CMakeLists.txt */, + 32AF7D5718DB504400537DE6 /* main.cpp */, + 32AF7D4C18DB501E00537DE6 /* Products */, + ); + sourceTree = "<group>"; + }; + 32AF7D4C18DB501E00537DE6 /* Products */ = { + isa = PBXGroup; + children = ( + 32AF7D4B18DB501E00537DE6 /* ChaosPerformanceTester */, + ); + name = Products; + sourceTree = "<group>"; + }; +/* End PBXGroup section */ + +/* Begin PBXNativeTarget section */ + 32AF7D4A18DB501E00537DE6 /* ChaosPerformanceTester */ = { + isa = PBXNativeTarget; + buildConfigurationList = 32AF7D5418DB501E00537DE6 /* Build configuration list for PBXNativeTarget "ChaosPerformanceTester" */; + buildPhases = ( + 32AF7D4718DB501E00537DE6 /* Sources */, + 32AF7D4818DB501E00537DE6 /* Frameworks */, + 32AF7D4918DB501E00537DE6 /* CopyFiles */, + ); + buildRules = ( + ); + dependencies = ( + ); + name = ChaosPerformanceTester; + productName = ChaosPerformanceTester; + productReference = 32AF7D4B18DB501E00537DE6 /* ChaosPerformanceTester */; + productType = "com.apple.product-type.tool"; + }; +/* End PBXNativeTarget section */ + +/* Begin PBXProject section */ + 32AF7D4318DB501E00537DE6 /* Project object */ = { + isa = PBXProject; + attributes = { + LastUpgradeCheck = 0510; + ORGANIZATIONNAME = infn; + }; + buildConfigurationList = 32AF7D4618DB501E00537DE6 /* Build configuration list for PBXProject "ChaosPerformanceTester" */; + compatibilityVersion = "Xcode 3.2"; + developmentRegion = English; + hasScannedForEncodings = 0; + knownRegions = ( + en, + ); + mainGroup = 32AF7D4218DB501E00537DE6; + productRefGroup = 32AF7D4C18DB501E00537DE6 /* Products */; + projectDirPath = ""; + projectRoot = ""; + targets = ( + 32AF7D4A18DB501E00537DE6 /* ChaosPerformanceTester */, + ); + }; +/* End PBXProject section */ + +/* Begin PBXSourcesBuildPhase section */ + 32AF7D4718DB501E00537DE6 /* Sources */ = { + isa = PBXSourcesBuildPhase; + buildActionMask = 2147483647; + files = ( + 32AF7D5818DB504400537DE6 /* main.cpp in Sources */, + ); + runOnlyForDeploymentPostprocessing = 0; + }; +/* End PBXSourcesBuildPhase section */ + +/* Begin XCBuildConfiguration section */ + 32AF7D5218DB501E00537DE6 /* Debug */ = { + isa = XCBuildConfiguration; + buildSettings = { + ALWAYS_SEARCH_USER_PATHS = NO; + CLANG_CXX_LANGUAGE_STANDARD = "gnu++0x"; + CLANG_CXX_LIBRARY = "libc++"; + CLANG_ENABLE_MODULES = YES; + CLANG_ENABLE_OBJC_ARC = YES; + CLANG_WARN_BOOL_CONVERSION = YES; + CLANG_WARN_CONSTANT_CONVERSION = YES; + CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR; + CLANG_WARN_EMPTY_BODY = YES; + CLANG_WARN_ENUM_CONVERSION = YES; + CLANG_WARN_INT_CONVERSION = YES; + CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR; + CLANG_WARN__DUPLICATE_METHOD_MATCH = YES; + COPY_PHASE_STRIP = NO; + GCC_C_LANGUAGE_STANDARD = gnu99; + GCC_DYNAMIC_NO_PIC = NO; + GCC_ENABLE_OBJC_EXCEPTIONS = YES; + GCC_OPTIMIZATION_LEVEL = 0; + GCC_PREPROCESSOR_DEFINITIONS = ( + "DEBUG=1", + "$(inherited)", + ); + GCC_SYMBOLS_PRIVATE_EXTERN = NO; + GCC_WARN_64_TO_32_BIT_CONVERSION = YES; + GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR; + GCC_WARN_UNDECLARED_SELECTOR = YES; + GCC_WARN_UNINITIALIZED_AUTOS = YES_AGGRESSIVE; + GCC_WARN_UNUSED_FUNCTION = YES; + GCC_WARN_UNUSED_VARIABLE = YES; + MACOSX_DEPLOYMENT_TARGET = 10.9; + ONLY_ACTIVE_ARCH = YES; + SDKROOT = macosx; + }; + name = Debug; + }; + 32AF7D5318DB501E00537DE6 /* Release */ = { + isa = XCBuildConfiguration; + buildSettings = { + ALWAYS_SEARCH_USER_PATHS = NO; + CLANG_CXX_LANGUAGE_STANDARD = "gnu++0x"; + CLANG_CXX_LIBRARY = "libc++"; + CLANG_ENABLE_MODULES = YES; + CLANG_ENABLE_OBJC_ARC = YES; + CLANG_WARN_BOOL_CONVERSION = YES; + CLANG_WARN_CONSTANT_CONVERSION = YES; + CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR; + CLANG_WARN_EMPTY_BODY = YES; + CLANG_WARN_ENUM_CONVERSION = YES; + CLANG_WARN_INT_CONVERSION = YES; + CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR; + CLANG_WARN__DUPLICATE_METHOD_MATCH = YES; + COPY_PHASE_STRIP = YES; + DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; + ENABLE_NS_ASSERTIONS = NO; + GCC_C_LANGUAGE_STANDARD = gnu99; + GCC_ENABLE_OBJC_EXCEPTIONS = YES; + GCC_WARN_64_TO_32_BIT_CONVERSION = YES; + GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR; + GCC_WARN_UNDECLARED_SELECTOR = YES; + GCC_WARN_UNINITIALIZED_AUTOS = YES_AGGRESSIVE; + GCC_WARN_UNUSED_FUNCTION = YES; + GCC_WARN_UNUSED_VARIABLE = YES; + MACOSX_DEPLOYMENT_TARGET = 10.9; + SDKROOT = macosx; + }; + name = Release; + }; + 32AF7D5518DB501E00537DE6 /* Debug */ = { + isa = XCBuildConfiguration; + buildSettings = { + CLANG_CXX_LANGUAGE_STANDARD = "compiler-default"; + CLANG_CXX_LIBRARY = "libstdc++"; + CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin"; + HEADER_SEARCH_PATHS = ( + ../../, + /usr/local/include, + ../../usr/local/include, + ); + LIBRARY_SEARCH_PATHS = ( + /usr/local/lib, + "$(SRCROOT)/../../usr/local/lib", + ); + OTHER_LDFLAGS = ( + "-lboost_chrono", + "-lmemcached", + "-lboost_system", + "-lboost_thread", + "-lboost_program_options", + "-lboost_regex", + "-lboost_filesystem", + "-lmpio", + "-lmsgpack", + "-lmsgpack-rpc", + "-lboost_log", + "-lboost_log_setup", + "-lchaos_uitoolkit", + "-lchaos_common", + ); + PRODUCT_NAME = "$(TARGET_NAME)"; + }; + name = Debug; + }; + 32AF7D5618DB501E00537DE6 /* Release */ = { + isa = XCBuildConfiguration; + buildSettings = { + CLANG_CXX_LANGUAGE_STANDARD = "compiler-default"; + CLANG_CXX_LIBRARY = "libstdc++"; + CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin"; + HEADER_SEARCH_PATHS = ( + ../../, + /usr/local/include, + ../../usr/local/include, + ); + LIBRARY_SEARCH_PATHS = ( + /usr/local/lib, + "$(SRCROOT)/../../usr/local/lib", + ); + OTHER_LDFLAGS = ( + "-lboost_chrono", + "-lmemcached", + "-lboost_system", + "-lboost_thread", + "-lboost_program_options", + "-lboost_regex", + "-lboost_filesystem", + "-lmpio", + "-lmsgpack", + "-lmsgpack-rpc", + "-lboost_log", + "-lboost_log_setup", + "-lchaos_uitoolkit", + "-lchaos_common", + ); + PRODUCT_NAME = "$(TARGET_NAME)"; + }; + name = Release; + }; +/* End XCBuildConfiguration section */ + +/* Begin XCConfigurationList section */ + 32AF7D4618DB501E00537DE6 /* Build configuration list for PBXProject "ChaosPerformanceTester" */ = { + isa = XCConfigurationList; + buildConfigurations = ( + 32AF7D5218DB501E00537DE6 /* Debug */, + 32AF7D5318DB501E00537DE6 /* Release */, + ); + defaultConfigurationIsVisible = 0; + defaultConfigurationName = Release; + }; + 32AF7D5418DB501E00537DE6 /* Build configuration list for PBXNativeTarget "ChaosPerformanceTester" */ = { + isa = XCConfigurationList; + buildConfigurations = ( + 32AF7D5518DB501E00537DE6 /* Debug */, + 32AF7D5618DB501E00537DE6 /* Release */, + ); + defaultConfigurationIsVisible = 0; + }; +/* End XCConfigurationList section */ + }; + rootObject = 32AF7D4318DB501E00537DE6 /* Project object */; +} diff --git a/example/ChaosPerformanceTester/main.cpp b/example/ChaosPerformanceTester/main.cpp new file mode 100644 index 000000000..834144f72 --- /dev/null +++ b/example/ChaosPerformanceTester/main.cpp @@ -0,0 +1,186 @@ +/* + * UIToolkitCMDLineExample.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <iostream> +#include <string> +#include <vector> +#include <chaos/common/global.h> +#include <chaos/common/chaos_constants.h> +#include <chaos/common/network/CNodeNetworkAddress.h> +#include <chaos/ui_toolkit/ChaosUIToolkit.h> +#include <chaos/ui_toolkit/LowLevelApi/LLRpcApi.h> +#include <chaos/ui_toolkit/HighLevelApi/HLDataApi.h> +#include <stdio.h> +#include <chaos/common/bson/bson.h> + +#include <boost/thread.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/date_time/c_local_time_adjustor.hpp> + +using namespace std; +using namespace chaos; +using namespace chaos::ui; +using namespace bson; +using namespace boost; +using namespace boost::posix_time; +using namespace boost::date_time; +namespace chaos_batch = chaos::common::batch_command; + +#define OPT_NODE_ADDRESS "node_address" +#define OPT_RT_TEST "round_trip_test" +#define OPT_RT_TEST_ITER "round_trip_test_iteration" +#define OPT_TIMEOUT "timeout" + +typedef std::vector<int> OpcodeSequence; +typedef std::vector<int>::iterator OpcodeSequenceIterator; + +typedef struct RttStat { + int64_t trx_min = 0; + int64_t trx_max = 0; + int64_t rec_min = 0; + int64_t rec_max = 0; + int64_t rtt_min = 0; + int64_t rtt_max = 0; + int64_t calc_trx = 0; + int64_t calc_rec = 0; + int64_t calc_rtt = 0; +}RttStat; + +void performRTTTest(chaos::common::direct_io::DirectIOPerformanceSession *session, uint32_t iteration, uint32_t timeout); +void performRTTTestFetcher(chaos::common::direct_io::RttResultFetcher *fetcher, uint32_t iteration, RttStat *stat); + +int main (int argc, char* argv[] ) { + uint32_t timeout = 0; + OpcodeSequence command_sequence; + CNetworkAddress *device_network_address = NULL; + try { + + + //! [UIToolkit Attribute Init] + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_NODE_ADDRESS, "The netwrok address of the remote node(ip:port of rpc server)"); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption(OPT_RT_TEST, "Perform the roundtrip test"); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<uint32_t>(OPT_RT_TEST_ITER, "The number of iteration for the round tirp test"); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<uint32_t>(OPT_TIMEOUT, "Timeout", 2000, &timeout); + //! [UIToolkit Attribute Init] + + //! [UIToolkit Init] + ChaosUIToolkit::getInstance()->init(argc, argv); + //! [UIToolkit Init] + + if(!ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_NODE_ADDRESS)){ + throw chaos::CException(-1, "Remote node address not set", __PRETTY_FUNCTION__); + } + + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_RT_TEST)) { + command_sequence.push_back(1); + } + device_network_address = new chaos::CNetworkAddress(); + device_network_address->ipPort = ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->getOption<std::string>(OPT_NODE_ADDRESS); + chaos::common::message::PerformanceNodeChannel *perf_channel = LLRpcApi::getInstance()->getNewPerformanceChannel(device_network_address); + chaos::common::direct_io::DirectIOPerformanceSession *session = NULL; + if(!perf_channel->getPerformanceSession(&session, timeout) && session) { + for (OpcodeSequenceIterator iter = command_sequence.begin(); + iter != command_sequence.end(); + iter++) { + switch (*iter) { + case 1: + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_RT_TEST_ITER)) { + performRTTTest(session, + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->getOption<uint32_t>(OPT_RT_TEST_ITER), + timeout); + } else { + performRTTTest(session, + 1, + timeout); + } + + break; + + default: + break; + } + } + //clear the session + perf_channel->releasePerformanceSession(session, timeout); + }else { + std::cout<< " session not created " << std::endl; + } + + if(perf_channel) LLRpcApi::getInstance()->deleteMessageChannel(perf_channel); + } catch (CException& e) { + std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl; + } + try { + //! [UIToolkit Deinit] + ChaosUIToolkit::getInstance()->deinit(); + //! [UIToolkit Deinit] + } catch (CException& e) { + std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl; + } + + if(device_network_address) delete(device_network_address); + return 0; +} + +void performRTTTest(chaos::common::direct_io::DirectIOPerformanceSession *session, uint32_t iteration, uint32_t timeout) { + //ok + RttStat stat; + boost::shared_ptr<boost::thread> thread(new boost::thread(performRTTTestFetcher, session->getRttResultQueue(), iteration, &stat)); + for (uint32_t idx = 0; idx < iteration; idx++) { + if(session->sendRttTest(timeout) == -1){ + std::cout << "Err sending message"; + } + + } + thread->join(); + std::cout << std::endl; + std::cout << "Round Trip test reults----------------------------------------------------" << std::endl; + std::cout << "Transmissione ts ->" << stat.trx_min << "us to " << stat.trx_max << "us " << std::endl; + std::cout << "Receive ts ->" << stat.rec_min << " to " << stat.rec_max << "us " << std::endl; + std::cout << "Roundtrip ts ->" << stat.rtt_min << " to " << stat.rtt_max << "us " << std::endl; + std::cout << "END Round Trip test reults----------------------------------------------------" << std::endl; +} + +void performRTTTestFetcher(chaos::common::direct_io::RttResultFetcher *fetcher, uint32_t iteration, RttStat *stat) { + int idx = 0; + chaos::common::direct_io::channel::opcode_headers::DirectIOPerformanceChannelHeaderOpcodeRoundTripPtr test_result = NULL; + while (idx < iteration) { + if(fetcher->getNext(test_result)) { + boost::posix_time::ptime time = boost::posix_time::microsec_clock::local_time(); + boost::posix_time::time_duration duration( time.time_of_day() ); + stat->calc_trx = test_result->field.receiver_rt_ts - test_result->field.start_rt_ts; + stat->calc_rec = duration.total_microseconds() - test_result->field.receiver_rt_ts; + stat->calc_rtt = duration.total_microseconds() - test_result->field.start_rt_ts; + if(!idx) { + stat->trx_min = stat->trx_max = test_result->field.receiver_rt_ts - test_result->field.start_rt_ts; + stat->rec_min = stat->rec_max = duration.total_microseconds() - test_result->field.receiver_rt_ts; + stat->rtt_min = stat->rtt_max = duration.total_microseconds() - test_result->field.start_rt_ts; + } else { + stat->trx_max = std::max(stat->trx_max, stat->calc_trx); + stat->trx_min = std::min(stat->trx_min, stat->calc_trx); + stat->rec_max = std::max(stat->rec_max, stat->calc_rec); + stat->rec_min = std::min(stat->rec_min, stat->calc_rec); + stat->rtt_max = std::max(stat->rtt_max, stat->calc_rtt); + stat->rtt_min = std::min(stat->rtt_min, stat->calc_rtt); + } + idx++; + } + } +} \ No newline at end of file -- GitLab