diff --git a/CMakeLists.txt b/CMakeLists.txt index 6741eedac9089d8e2303169c7aa1a730c055b041..84c47856b26d3dd20026e6587683c85f0b42f8e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -142,12 +142,12 @@ ENDIF() IF (CHAOS_MDS AND NOT CHAOS_ONLY_DEPENDECY) - IF("${CMAKE_CXX_COMPILE_FEATURES}" MATCHES "cxx_std_11") - MESG("Configure Chaos Data Service") - ADD_SUBDIRECTORY(ChaosMetadataService) - ELSE() - WARN("Chaos Data Service require a c11 compliant compiler") - ENDIF() + # IF("${CMAKE_CXX_COMPILE_FEATURES}" MATCHES "cxx_std_11") + MESG("Configure Chaos Data Service") + ADD_SUBDIRECTORY(ChaosMetadataService) + # ELSE() + # WARN("Chaos Data Service require a c11 compliant compiler") + # ENDIF() ENDIF() IF(NOT CHAOS_ONLY_DEPENDECY) diff --git a/chaos/common/direct_io/DirectIODispatcher.cpp b/chaos/common/direct_io/DirectIODispatcher.cpp index 5b5711c7691437fdd23b7b26301801adeba168b5..3c5a850b3f3f2cf9adf3daa9eba03e24aa40b56a 100644 --- a/chaos/common/direct_io/DirectIODispatcher.cpp +++ b/chaos/common/direct_io/DirectIODispatcher.cpp @@ -34,11 +34,11 @@ using namespace chaos::common::direct_io; #define MAX_ENDPOINT_ARRAY_SIZE (sizeof(EndpointFastDelegation*) * MAX_ENDPOINT_NUMBER) #define CLEAR_ENDPOINT_SLOT(i) \ - if(endpoint_slot_array[i]->endpoint) { \ - delete endpoint_slot_array[i]->endpoint; \ - endpoint_slot_array[i]->endpoint = NULL; \ - } \ - endpoint_slot_array[i]->enable = false; +if(endpoint_slot_array[i]->endpoint) { \ +delete endpoint_slot_array[i]->endpoint; \ +endpoint_slot_array[i]->endpoint = NULL; \ +} \ +endpoint_slot_array[i]->enable = false; DirectIODispatcher::DirectIODispatcher(): @@ -47,7 +47,7 @@ endpoint_slot_array(NULL) {} DirectIODispatcher::~DirectIODispatcher(){ CHAOS_NOT_THROW(stop();) - CHAOS_NOT_THROW(deinit();) + CHAOS_NOT_THROW(deinit();) } // Initialize instance @@ -55,18 +55,18 @@ void DirectIODispatcher::init(void *init_data) throw(chaos::CException) { //allocate memory for the endpoint array if(endpoint_slot_array==NULL){ DIOD_LDBG_<< "Allocating all memory for endpoint slot array"; - + endpoint_slot_array = (EndpointFastDelegation**)malloc(MAX_ENDPOINT_NUMBER*sizeof(EndpointFastDelegation*)); if(!endpoint_slot_array) throw chaos::CException(-1, "Error allocating memory for slot endpoint", __FUNCTION__); - - + + DIOD_LDBG_ << "Allocating all endpoint slot"; //allocate all endpoint slot for (int idx = 0; idx < MAX_ENDPOINT_NUMBER; idx++) { endpoint_slot_array[idx] = new EndpointFastDelegation(); endpoint_slot_array[idx]->enable = false; endpoint_slot_array[idx]->endpoint = NULL; - + //add this endpoint to free slot queue available_endpoint_slot.push(idx); } @@ -74,14 +74,10 @@ void DirectIODispatcher::init(void *init_data) throw(chaos::CException) { } // Start the implementation -void DirectIODispatcher::start() throw(chaos::CException) { - -} +void DirectIODispatcher::start() throw(chaos::CException) {} // Stop the implementation -void DirectIODispatcher::stop() throw(chaos::CException) { - -} +void DirectIODispatcher::stop() throw(chaos::CException) {} // Deinit the implementation void DirectIODispatcher::deinit() throw(chaos::CException) { @@ -89,14 +85,14 @@ void DirectIODispatcher::deinit() throw(chaos::CException) { DIOD_LDBG_ << "Deallocating all endpoint slot"; //allocate all endpoint slot for (int idx = 0; idx < MAX_ENDPOINT_NUMBER; idx++) { - + //clear all field of the slod ( and delete the endpoint if allocated) CLEAR_ENDPOINT_SLOT(idx); - + //delete andpoint slot delete endpoint_slot_array[idx]; } - + DIOD_LDBG_ << "Free slot array memory"; free(endpoint_slot_array); endpoint_slot_array=NULL; @@ -109,8 +105,8 @@ void DirectIODispatcher::deinit() throw(chaos::CException) { DirectIOServerEndpoint *DirectIODispatcher::getNewEndpoint() { unsigned int next_available_slot = -1; if(!available_endpoint_slot.pop(next_available_slot)) return NULL; - - + ChaosWriteLock wl(slot_mutex); + endpoint_slot_array[next_available_slot]->endpoint = new DirectIOServerEndpoint(); if(!endpoint_slot_array[next_available_slot]->endpoint) { //reallocate the slot index @@ -127,12 +123,13 @@ DirectIOServerEndpoint *DirectIODispatcher::getNewEndpoint() { //! relase an endpoint void DirectIODispatcher::releaseEndpoint(DirectIOServerEndpoint *endpoint_to_release) { if(!endpoint_to_release) return; + ChaosWriteLock wl(slot_mutex); // get slot index unsigned int slot_idx = endpoint_to_release->endpoint_route_index; - + //delete endpoint CLEAR_ENDPOINT_SLOT(slot_idx); - + //reuse the index available_endpoint_slot.push(slot_idx); } @@ -140,22 +137,26 @@ void DirectIODispatcher::releaseEndpoint(DirectIOServerEndpoint *endpoint_to_rel // Event for a new data received int DirectIODispatcher::priorityDataReceived(DirectIODataPackSPtr data_pack, DirectIODataPackSPtr& synchronous_answer) { - int err = -1; + int api_err = 0; + if(getServiceState() != 2) return -1; CHAOS_ASSERT(data_pack.get()); + ChaosReadLock wl(slot_mutex); + uint8_t opcode = data_pack->header.dispatcher_header.fields.channel_opcode; uint16_t tmp_addr = data_pack->header.dispatcher_header.fields.route_addr; uint16_t message_counter = data_pack->header.dispatcher_header.fields.counter; //convert dispatch header to correct endianes DIRECT_IO_DATAPACK_DISPATCH_HEADER_FROM_ENDIAN(data_pack.get()); - + CHAOS_ASSERT(tmp_addr == data_pack->header.dispatcher_header.fields.route_addr); if(data_pack->header.dispatcher_header.fields.route_addr>=MAX_ENDPOINT_NUMBER){ DIOD_LERR_ << "The endpoint address " << data_pack->header.dispatcher_header.fields.route_addr << "is invalid"; } else if(endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->enable) { - err = endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->endpoint->priorityDataReceived(ChaosMoveOperator(data_pack), - synchronous_answer); + api_err = endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->endpoint->priorityDataReceived(ChaosMoveOperator(data_pack), + synchronous_answer); } else { DIOD_LERR_ << "The endpoint address " << tmp_addr << "is disable"; + return -2; } if(synchronous_answer.get()) { //set opcode for the answer @@ -163,35 +164,37 @@ int DirectIODispatcher::priorityDataReceived(DirectIODataPackSPtr data_pack, //set counter for the answer synchronous_answer->header.dispatcher_header.fields.counter = message_counter; //set error on result datapack - synchronous_answer->header.dispatcher_header.fields.err = (int16_t)err; + synchronous_answer->header.dispatcher_header.fields.err = (int16_t)api_err; //convert dispatch header to correct endianes DIRECT_IO_DATAPACK_DISPATCH_HEADER_TO_ENDIAN(synchronous_answer.get()); } - - return err; + return 0; } // Event for a new data received int DirectIODispatcher::serviceDataReceived(DirectIODataPackSPtr data_pack, DirectIODataPackSPtr& synchronous_answer) { - int err = -1; + int api_err = 0; + if(getServiceState() != 2) return -1; CHAOS_ASSERT(data_pack.get()); - + ChaosReadLock wl(slot_mutex); + uint8_t opcode = data_pack->header.dispatcher_header.fields.channel_opcode; uint16_t tmp_addr = data_pack->header.dispatcher_header.fields.route_addr; uint16_t message_counter = data_pack->header.dispatcher_header.fields.counter; //convert dispatch header to correct endianes DIRECT_IO_DATAPACK_DISPATCH_HEADER_FROM_ENDIAN(data_pack.get()); - + CHAOS_ASSERT(tmp_addr == data_pack->header.dispatcher_header.fields.route_addr); - + if(data_pack->header.dispatcher_header.fields.route_addr>=MAX_ENDPOINT_NUMBER){ DIOD_LERR_ << "The endpoint address " << data_pack->header.dispatcher_header.fields.route_addr << "is invalid"; } else if(endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->enable) { - err = endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->endpoint->serviceDataReceived(ChaosMoveOperator(data_pack), - synchronous_answer); + api_err = endpoint_slot_array[data_pack->header.dispatcher_header.fields.route_addr]->endpoint->serviceDataReceived(ChaosMoveOperator(data_pack), + synchronous_answer); } else { DIOD_LERR_ << "The endpoint address " << tmp_addr << "is disable"; + return -2; } if(synchronous_answer.get()) { //set opcode for the answer @@ -199,9 +202,9 @@ int DirectIODispatcher::serviceDataReceived(DirectIODataPackSPtr data_pack, //set counter for the answer synchronous_answer->header.dispatcher_header.fields.counter = message_counter; //set error on result datapack - synchronous_answer->header.dispatcher_header.fields.err = (int16_t)err; + synchronous_answer->header.dispatcher_header.fields.err = (int16_t)api_err; //convert dispatch header to correct endianes DIRECT_IO_DATAPACK_DISPATCH_HEADER_TO_ENDIAN(synchronous_answer.get()); } - return err; + return 0; } diff --git a/chaos/common/direct_io/DirectIODispatcher.h b/chaos/common/direct_io/DirectIODispatcher.h index f03b4cc2aff87aef50bcf410e360eab124eb001f..1e7dbe36623959b3501a36bd9a5d977606c566a6 100644 --- a/chaos/common/direct_io/DirectIODispatcher.h +++ b/chaos/common/direct_io/DirectIODispatcher.h @@ -21,6 +21,7 @@ #ifndef __CHAOSFramework__AbstractDirectIODispatcher__ #define __CHAOSFramework__AbstractDirectIODispatcher__ +#include <chaos/common/chaos_types.h> #include <chaos/common/direct_io/DirectIOHandler.h> #include <chaos/common/direct_io/DirectIOServerEndpoint.h> #include <chaos/common/utility/StartableService.h> @@ -33,10 +34,7 @@ namespace chaos { namespace common { namespace direct_io { class DirectIOServer; - //boost::function2<void, void*, uint32_t> delegate = priority_service? - // boost::bind(&DirectIOHandler::serviceDataReceived, handler_impl, _1, _2): - // boost::bind(&DirectIOHandler::priorityDataReceived, handler_impl, _1, _2); - //! Default dispatcher for the direct io system + class DirectIODispatcher: public common::direct_io::DirectIOHandler, public utility::StartableService { @@ -46,9 +44,9 @@ namespace chaos { bool enable; DirectIOServerEndpoint *endpoint; }; - - - //! available endpoint slotc + + //! available endpoint slot + ChaosSharedMutex slot_mutex; EndpointFastDelegation * * endpoint_slot_array; //!available index queue diff --git a/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp b/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp index c332f92a2cf443d764381e567e201c60449cd2e6..c3d3f2046af4b38d466f2c65e0efae587b3f071b 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp @@ -150,6 +150,7 @@ void ZMQDirectIOServer::start() throw(chaos::CException) { //! Stop the implementation void ZMQDirectIOServer::stop() throw(chaos::CException) { run_server = false; + DirectIOServer::stop(); ZMQDIO_SRV_LAPP_ << "Deallocating zmq context"; zmq_ctx_shutdown(zmq_context); zmq_ctx_term(zmq_context); @@ -159,7 +160,6 @@ void ZMQDirectIOServer::stop() throw(chaos::CException) { ZMQDIO_SRV_LAPP_ << "Join on all thread"; server_threads_group.join_all(); ZMQDIO_SRV_LAPP_ << "All thread stopped"; - DirectIOServer::stop(); } //! Deinit the implementation @@ -314,16 +314,19 @@ void ZMQDirectIOServer::worker(unsigned int w_type, //keep track if the cleint want the answer send_synchronous_answer = data_pack_received->header.dispatcher_header.fields.synchronous_answer; //call handler - err = DirectIOHandlerPtrCaller(handler_impl, delegate)(ChaosMoveOperator(data_pack_received), - data_pack_answer); - if(send_synchronous_answer && - data_pack_answer) { - - if((err = sendDatapack(worker_socket, - identity, - ChaosMoveOperator(data_pack_answer)))){ - ZMQDIO_SRV_LAPP_ << "Error sending answer with code:" << err; + if((err = DirectIOHandlerPtrCaller(handler_impl, delegate)(ChaosMoveOperator(data_pack_received), + data_pack_answer)) == 0) { + if(send_synchronous_answer && + data_pack_answer) { + + if((err = sendDatapack(worker_socket, + identity, + ChaosMoveOperator(data_pack_answer)))){ + ZMQDIO_SRV_LAPP_ << CHAOS_FORMAT("Error sending answer with code %1%", %err); + } } + } else { + ZMQDIO_SRV_LAPP_ << CHAOS_FORMAT("Error dispatching received message with code %1%", %err); } } } catch (CException& ex) {