diff --git a/.gitignore b/.gitignore index c0cb47e837c4d06c22c610ee8125bb8d0e21f580..5b47069a354d9dea93458c5f8e1e492526bbc8c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +ccs/qrc_* .vscode/ ccs/compile_commands.json .scannerwork diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5a4ad125bf1f9cf4bc411d96aee907bab29cd3fe..9ea9931c22a730945ad2db1c29e39ce0fdc54943 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -18,8 +18,9 @@ before_script: build_u_14_04_gcc49: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/14:gcc_49 script: @@ -56,8 +57,9 @@ test_u_14_04_gcc49: dependencies: - build_u_14_04_gcc49 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/14:gcc_49 script: @@ -99,8 +101,9 @@ test_u_14_04_gcc49: build_u_16_04_gcc5: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/16:llvm6 script: @@ -132,8 +135,9 @@ test_u_16_04_gcc5_test: dependencies: - build_u_16_04_gcc5 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/16:llvm6 script: @@ -189,8 +193,9 @@ test_u_18_04_gcc73: dependencies: - build_u_18_04_gcc73 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/18:gcc7.3 script: @@ -213,8 +218,9 @@ test_u_18_04_gcc73: build_dbg_c7: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/centos7:devtools7 script: @@ -246,8 +252,9 @@ test_dbg_c7: dependencies: - build_dbg_c7 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/centos7:devtools7 script: @@ -273,8 +280,9 @@ regression_c7: - test_dbg_c7 allow_failure: true tags: - - shared + - chaos - docker + - infncc stage: deploy image: baltig.infn.it:4567/chaos-lnf-control/chaos_bundle_compilation:lite script: @@ -299,8 +307,9 @@ regression_c7: build_rel_c7: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/centos7:devtools7 script: @@ -332,8 +341,9 @@ test_rel_c7: dependencies: - build_rel_c7 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/centos7:devtools7 script: @@ -359,8 +369,9 @@ deploy_rel_c7: - build_rel_c7 allow_failure: false tags: - - shared + - chaos - docker + - infncc stage: deploy image: baltig.infn.it:4567/chaos-lnf-control/chaos_bundle_compilation:lite script: @@ -384,8 +395,9 @@ deploy_rel_c7: build_u_14_04_gcc48_c98: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/14:gcc script: @@ -415,8 +427,9 @@ test_u_14_04_gcc48_c98: dependencies: - build_u_14_04_gcc48_c98 tags: - - shared + - chaos - docker + - infncc stage: test image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/ubuntu/14:gcc script: @@ -435,8 +448,9 @@ test_u_14_04_gcc48_c98: build_arm: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation:latest script: @@ -521,8 +535,9 @@ build_arm: exp_llvm_scan_coverity: tags: - - shared + - chaos - docker + - infncc stage: build image: baltig.infn.it:4567/bisegni/chaos-docker-compilation/llvm_coverity_u1710 script: diff --git a/CHAOSFramework.xcodeproj/project.pbxproj b/CHAOSFramework.xcodeproj/project.pbxproj index 4e960967513d006191a2b3d2b4b35d34254ced7c..aa7fb32b32a7ef26e9e3eac7b9a92304ce19d2b7 100644 --- a/CHAOSFramework.xcodeproj/project.pbxproj +++ b/CHAOSFramework.xcodeproj/project.pbxproj @@ -7522,6 +7522,7 @@ ., usr/local/include, ); + LD_RUNPATH_SEARCH_PATHS = "$(SRCROOT)/usr/local/lib"; LIBRARY_SEARCH_PATHS = "$(SRCROOT)/usr/local/lib"; MACOSX_DEPLOYMENT_TARGET = 10.10; MTL_ENABLE_DEBUG_INFO = YES; @@ -7568,6 +7569,7 @@ ., usr/local/include, ); + LD_RUNPATH_SEARCH_PATHS = "$(SRCROOT)/usr/local/lib"; LIBRARY_SEARCH_PATHS = "$(SRCROOT)/usr/local/lib"; MACOSX_DEPLOYMENT_TARGET = 10.10; MTL_ENABLE_DEBUG_INFO = NO; diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e96073079fd0fa50d147d425b733d45c3c66237..ee498d56a02b25b90cfd5fbc9d5a1ea48997968e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,7 +131,8 @@ SET( ${PROJECT_NAME}_PATCH_LEVEL 0 ) # ADD_DEFINITIONS(-Wno-ignored-attributes) INCLUDE_DIRECTORIES(. ../ ../../ ${CMAKE_INSTALL_PREFIX}/include) LINK_DIRECTORIES(${CMAKE_INSTALL_PREFIX}/lib) - +LINK_DIRECTORIES(${CMAKE_INSTALL_PREFIX}/lib64) +LINK_DIRECTORIES(${CMAKE_INSTALL_PREFIX}/lib/x86_64-linux-gnu) SET(CMAKE_FIND_LIBRARY_SUFFIXES ".a") ADD_DEFINITIONS(${GCC_COVERAGE_COMPILE_FLAGS}) diff --git a/ChaosDataExport/cde.cpp b/ChaosDataExport/cde.cpp index 50b4f22c245d4fb840185144d2b6819c39e65c57..c2527ac85f31443363ff0fb6f026b9f44cd17a82 100644 --- a/ChaosDataExport/cde.cpp +++ b/ChaosDataExport/cde.cpp @@ -69,7 +69,7 @@ void sendBackForNPos(int position = 1) { } void printNumberOfExportedElement(uint32_t done) { - std::cout << CHAOS_FORMAT("\rExported %1% record", %done) << std::flush; + std::cout << CHAOS_FORMAT(" Exported %1% record \r", %done) << std::flush; } void printStep() { @@ -263,6 +263,7 @@ int main(int argc, const char* argv[]) { std::cout << "Acquiring controller" << std::endl; CUController *controller = NULL; ChaosMetadataServiceClient::getInstance()->getNewCUController(device_id, &controller); + if(!controller) throw CException(4, "Error allocating decive controller", "device controller creation"); // controller->setQueryOnIndex(true); ChaosStringSet search_tags; @@ -303,6 +304,8 @@ int main(int argc, const char* argv[]) { (*destination_stream) << std::endl; } + int64_t last_rid = 0; + int64_t last_sid = 0; if(query_cursor) { uint32_t exported = 0; std::cout << "Exported " << std::flush; @@ -312,6 +315,22 @@ int main(int argc, const char* argv[]) { if(q_result.get()) { retry = 0; SerializationBufferUPtr ser; + if(q_result->hasKey(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_RUN_ID) && + q_result->hasKey(chaos::DataPackCommonKey::DPCK_SEQ_ID)) { + int64_t rid = q_result->getInt64Value(chaos::ControlUnitNodeDefinitionKey::CONTROL_UNIT_RUN_ID); + int64_t sid = q_result->getInt64Value(chaos::DataPackCommonKey::DPCK_SEQ_ID); + std::cout << CHAOS_FORMAT(" rid %1% sid %2%", %rid%sid); + if(last_rid < rid && + last_sid >= sid) { + std::cout << " [SEQERR] "; + } + if(last_sid!=0 && + last_sid+1 != sid) { + std::cout << " [SEQERR + 1] "; + } + last_rid = rid; + last_sid = sid; + } //get serialization buffer by type switch (dest_type) { //BSON diff --git a/ChaosMetadataService/object_storage/mongodb/MongoDBObjectStorageDataAccess.cpp b/ChaosMetadataService/object_storage/mongodb/MongoDBObjectStorageDataAccess.cpp index 24eb1cd5cded3bfa4c58fd16c869bfc3e2410bc1..6149b15771c233304df761457f479afbccc8dfee 100644 --- a/ChaosMetadataService/object_storage/mongodb/MongoDBObjectStorageDataAccess.cpp +++ b/ChaosMetadataService/object_storage/mongodb/MongoDBObjectStorageDataAccess.cpp @@ -253,13 +253,13 @@ int MongoDBObjectStorageDataAccess::findObject(const std::string& key, chaos::DataPackCommonKey::DPCK_TIMESTAMP << BSON("$gte" << mongo::Date_t(timestamp_from) << "$lte" << mongo::Date_t(timestamp_to)) << run_key << BSON("$gte" << (long long)last_record_found_seq.run_id) << - counter_key << BSON("$gte" << (long long)last_record_found_seq.datapack_counter); + counter_key << BSON("$gt" << (long long)last_record_found_seq.datapack_counter); } else { q_builder << chaos::DataPackCommonKey::DPCK_DEVICE_ID << key << chaos::DataPackCommonKey::DPCK_TIMESTAMP << BSON("$lte" << mongo::Date_t(timestamp_from) << "$gte" << mongo::Date_t(timestamp_to)) << run_key << BSON("$lte" << (long long)last_record_found_seq.run_id) << - counter_key << BSON("$lte" << (long long)last_record_found_seq.datapack_counter); + counter_key << BSON("$lt" << (long long)last_record_found_seq.datapack_counter); } //add tags diff --git a/ChaosMetadataService/object_storage/mongodb_3/MongoDBObjectStorageDataAccess.cpp b/ChaosMetadataService/object_storage/mongodb_3/MongoDBObjectStorageDataAccess.cpp index 9bc0238efdaadb771b017336ec31b25fce7b4cc9..56e4566d65f40c6143f3c5aa98ede2bd70849b73 100644 --- a/ChaosMetadataService/object_storage/mongodb_3/MongoDBObjectStorageDataAccess.cpp +++ b/ChaosMetadataService/object_storage/mongodb_3/MongoDBObjectStorageDataAccess.cpp @@ -307,15 +307,15 @@ int MongoDBObjectStorageDataAccess::findObject(const std::string& time_builder.append(kvp("$lte", b_date(std::chrono::milliseconds(timestamp_to)))); builder.append(kvp(std::string(chaos::DataPackCommonKey::DPCK_TIMESTAMP), time_builder.view())); builder.append(kvp(run_key, make_document(kvp("$gte", last_record_found_seq.run_id)))); - builder.append(kvp(counter_key, make_document(kvp("$gte", last_record_found_seq.datapack_counter )))); - }else{ + builder.append(kvp(counter_key, make_document(kvp("$gte", last_record_found_seq.datapack_counter)))); + } else { time_builder.append(kvp("$lte", b_date(std::chrono::milliseconds(timestamp_from)))); time_builder.append(kvp("$gte", b_date(std::chrono::milliseconds(timestamp_to)))); builder.append(kvp(std::string(chaos::DataPackCommonKey::DPCK_TIMESTAMP), time_builder.view())); builder.append(kvp(run_key, make_document(kvp("$lte", last_record_found_seq.run_id)))); - builder.append(kvp(counter_key, make_document(kvp("$lte", last_record_found_seq.datapack_counter )))); - } + builder.append(kvp(counter_key, make_document(kvp("$lte", last_record_found_seq.datapack_counter)))); + } if(meta_tags.size()) { auto array_builder = bsoncxx::builder::basic::array{}; for(auto& it: meta_tags) { @@ -386,13 +386,16 @@ int MongoDBObjectStorageDataAccess::findObjectIndex(const DataSearch& search, if(reverse_order == false) { time_builder.append(kvp("$gte", b_date(std::chrono::milliseconds(search.timestamp_from)))); time_builder.append(kvp("$lte", b_date(std::chrono::milliseconds(search.timestamp_to)))); + builder.append(kvp(std::string(chaos::DataPackCommonKey::DPCK_TIMESTAMP), time_builder.view())); + builder.append(kvp(run_key, make_document(kvp("$gte", last_record_found_seq.run_id)))); + builder.append(kvp(counter_key, make_document(kvp("$gte", last_record_found_seq.datapack_counter)))); }else{ time_builder.append(kvp("$lte", b_date(std::chrono::milliseconds(search.timestamp_from)))); time_builder.append(kvp("$gte", b_date(std::chrono::milliseconds(search.timestamp_to)))); + builder.append(kvp(std::string(chaos::DataPackCommonKey::DPCK_TIMESTAMP), time_builder.view())); + builder.append(kvp(run_key, make_document(kvp("$lte", last_record_found_seq.run_id)))); + builder.append(kvp(counter_key, make_document(kvp("$lte", last_record_found_seq.datapack_counter)))); } - builder.append(kvp(std::string(chaos::DataPackCommonKey::DPCK_TIMESTAMP), time_builder.view())); - builder.append(kvp(run_key, make_document(kvp("$gte", last_record_found_seq.run_id)))); - builder.append(kvp(counter_key, make_document(kvp("$gte", last_record_found_seq.datapack_counter )))); if(search.meta_tags.size()) { auto array_builder = bsoncxx::builder::basic::array{}; for(auto& it: search.meta_tags) { @@ -495,6 +498,5 @@ int MongoDBObjectStorageDataAccess::countObject(const std::string& key, const uint64_t& object_count) { int err = 0; auto client = pool_ref.acquire(); - return err; } diff --git a/chaos/common/CMakeLists.txt b/chaos/common/CMakeLists.txt index 1f5271ce73008cb4b7ce4963957f82eee6564ff7..24eb6397df481d32c47ceb87c14cfa653cbf970b 100644 --- a/chaos/common/CMakeLists.txt +++ b/chaos/common/CMakeLists.txt @@ -6,8 +6,25 @@ IF(NOT CHAOS_DISABLE_EVENTFD) ELSE() SET(ZMQ_FLAGS "") ENDIF() -CheckConfigureBuild(zmq zeromq4-1 "${ZMQ_FLAGS}" https://github.com/zeromq) +IF (CHAOS_TARGET) +MESG("Configure libzmq with autotools") +CheckConfigureBuild(zmq libzmq "${ZMQ_FLAGS}" https://github.com/zeromq) +ELSE() +MESG("Configure libzmq as CMake") +ExternalProject_Add( + libzmq + GIT_REPOSITORY https://github.com/zeromq/libzmq.git + GIT_TAG v4.3.1 + PREFIX "${CMAKE_BINARY_DIR}/ext_dep/libzmq-prefix" + SOURCE_DIR "${CMAKE_BINARY_DIR}/ext_dep/libzmq-src" + BINARY_DIR "${CMAKE_BINARY_DIR}/ext_dep/libzmq-build" + CMAKE_ARGS + -DCMAKE_INSTALL_PREFIX:PATH=${CMAKE_INSTALL_PREFIX} -DCMAKE_BUILD_TYPE=Release -DENABLE_DRAFTS=1 + LOG_DOWNLOAD ON + LOG_CONFIGURE ON + LOG_BUILD ON) +ENDIF() IF (CLING_VIRTUAL_MACHINE_ENABLE) MESG("Using ${CMAKE_BINARY_DIR}/ext_dep/cling-src for download cling") @@ -47,6 +64,7 @@ IF (CLING_VIRTUAL_MACHINE_ENABLE) MESG("Searching cling cmake module in ${CMAKE_MODULE_PATH}") # This project needs cling. find_library(Cling REQUIRED) + add_dependencies(${PROJECT_NAME} cling) ENDIF() ENDIF() @@ -352,6 +370,10 @@ ELSE () ADD_LIBRARY(${PROJECT_NAME} SHARED ${common_lib_src}) set_target_properties(${PROJECT_NAME} PROPERTIES VERSION 1.0.0 SOVERSION 1) ENDIF () + +#add dependency to libzmq +add_dependencies(${PROJECT_NAME} libzmq) + IF (CLING_VIRTUAL_MACHINE) add_dependencies(${PROJECT_NAME} cling) ENDIF() diff --git a/chaos/common/configuration/GlobalConfiguration.cpp b/chaos/common/configuration/GlobalConfiguration.cpp index c4484fbd29de872d669d86bdd978d7bec022eb35..d5377eead05b2179024b6daceac52fb42378a0c2 100644 --- a/chaos/common/configuration/GlobalConfiguration.cpp +++ b/chaos/common/configuration/GlobalConfiguration.cpp @@ -71,7 +71,7 @@ void GlobalConfiguration::preParseStartupParameters() { addOption(InitOption::OPT_DIRECT_IO_IMPLEMENTATION, po::value< string >()->default_value("ZMQ"), "Specify the direct io implementation"); addOption(InitOption::OPT_DIRECT_IO_PRIORITY_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_PRIORITY_PORT), "DirectIO priority server port"); addOption(InitOption::OPT_DIRECT_IO_SERVICE_SERVER_PORT, po::value<uint32_t>()->default_value(_DIRECT_IO_SERVICE_PORT), "DirectIO service server port"); - addOption(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(2),"DirectIO server thread number"); + addOption(InitOption::OPT_DIRECT_IO_SERVER_THREAD_NUMBER, po::value<uint32_t>()->default_value(1),"DirectIO server thread number"); addOption(InitOption::OPT_DIRECT_IO_SERVER_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_DIRECT_IO_CLIENT_IMPL_KV_PARAM, po::value< std::vector<std::string> >(),"DirectIO implementation key value parameters[k:v]"); addOption(InitOption::OPT_DIRECT_IO_LOG_METRIC, po::value< bool >()->zero_tokens(), "Enable the logging of the DirectIO metric"); diff --git a/chaos/common/direct_io/DirectIODataPack.h b/chaos/common/direct_io/DirectIODataPack.h index b9f5e251164e02bc064d41d37c27368c6b9b3aeb..cd55238c1426f23ac643ec39394388a3498ee12d 100644 --- a/chaos/common/direct_io/DirectIODataPack.h +++ b/chaos/common/direct_io/DirectIODataPack.h @@ -45,8 +45,8 @@ namespace chaos { #define DIRECT_IO_DISPATCHER_HEADER_SIZE 10 #define DIRECT_IO_GET_DISPATCHER_DATA(d) chaos::common::utility::byte_swap<chaos::common::utility::little_endian, chaos::common::utility::host_endian, uint64_t>(*((uint64_t*)d)); -#define DIRECT_IO_GET_CHANNEL_HEADER_SIZE(d) chaos::common::utility::byte_swap<chaos::common::utility::little_endian, chaos::common::utility::host_endian, uint32_t>(*((uint32_t*)((char*)d+12))); -#define DIRECT_IO_GET_CHANNEL_DATA_SIZE(d) chaos::common::utility::byte_swap<chaos::common::utility::little_endian, chaos::common::utility::host_endian, uint32_t>(*((uint32_t*)((char*)d+16))); +#define DIRECT_IO_GET_CHANNEL_HEADER_SIZE(d) chaos::common::utility::byte_swap<chaos::common::utility::little_endian, chaos::common::utility::host_endian, uint32_t>(d); +#define DIRECT_IO_GET_CHANNEL_DATA_SIZE(d) chaos::common::utility::byte_swap<chaos::common::utility::little_endian, chaos::common::utility::host_endian, uint32_t>(d); #define DIRECT_IO_SET_DISPATCHER_DATA(d) chaos::common::utility::byte_swap<chaos::common::utility::host_endian, chaos::common::utility::little_endian, uint64_t>(d); #define DIRECT_IO_SET_CHANNEL_HEADER_SIZE(d) chaos::common::utility::byte_swap<chaos::common::utility::host_endian, chaos::common::utility::little_endian, uint32_t>(d); diff --git a/chaos/common/direct_io/impl/ZMQBaseClass.cpp b/chaos/common/direct_io/impl/ZMQBaseClass.cpp index e3f6510dd62bc8d7aa66a4cceaaaf3834958407b..74f564778932a7ad6983b5132a4705f9789d5ba4 100644 --- a/chaos/common/direct_io/impl/ZMQBaseClass.cpp +++ b/chaos/common/direct_io/impl/ZMQBaseClass.cpp @@ -45,6 +45,18 @@ ZMQDIO_BASE_LERR_<< message;\ return err;\ } +#define EXIT_IF_NO_MORE_MESSAGE_MSG_T(msg_t, err, message) \ +if(!moreMessageToRead(msg_t)){ \ +ZMQDIO_BASE_LERR_<< message;\ +return err; \ +} + +#define EXIT_ON_ASSERT(assert, err, message) \ +if(!(assert)){ \ +ZMQDIO_BASE_LERR_<< message;\ +return err; \ +} + #define SYNC_DELETE_HEADER_AND_DATA(mem,dealloc,part,opcode)\ DirectIOForwarder::freeSentData(mem,\ new DisposeSentMemoryInfo(dealloc,\ @@ -184,48 +196,111 @@ int ZMQBaseClass::closeSocketNoWhait (void *socket) { } int ZMQBaseClass::readMessage(void *socket, - BufferSPtr& msg_buffer) { + zmq_msg_t& message) { int err = 0; - /* Create an empty ØMQ message to hold the message part */ - zmq_msg_t part; - if((err = zmq_msg_init(&part))) { + if((err = zmq_msg_init(&message))) { err = zmq_errno(); ZMQDIO_BASE_LERR_ << "Error initilizing message" << PRINT_ZMQ_ERR(err); return err; } /* Block until a message is available to be received from socket */ - if((err = zmq_msg_recv(&part, socket, 0))) { + if((err = zmq_msg_recv(&message, socket, 0)) <= 0) { err = zmq_errno(); ZMQDIO_BASE_LERR_ << "Error receiving message" << PRINT_ZMQ_ERR(err); + } else { + err = 0; } - msg_buffer = ChaosMakeSharedPtr<Buffer>(); - msg_buffer->append(zmq_msg_data(&part), zmq_msg_size(&part)); - zmq_msg_close(&part); return err; } -int ZMQBaseClass::readMessage(void * socket, - void *message_data, - size_t message_max_size, - size_t& message_size_read) { +int ZMQBaseClass::readMessage(void *socket, + BufferSPtr& buffer, + bool& has_next) { int err = 0; - //no we can read the message - if((err = zmq_recv(socket, - message_data, - message_max_size, - 0)) == -1) { - //we got an error + has_next = false; + zmq_msg_t message; + if((err = zmq_msg_init(&message))) { err = zmq_errno(); - ZMQDIO_BASE_LERR_ << "Error receiving data from socket with code:" << PRINT_ZMQ_ERR(err); - message_size_read=0; + ZMQDIO_BASE_LERR_ << "Error initilizing message" << PRINT_ZMQ_ERR(err); return err; + } + + /* Block until a message is available to be received from socket */ + if((err = zmq_msg_recv(&message, socket, 0)) <= 0) { + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error receiving message" << PRINT_ZMQ_ERR(err); } else { - //take the readed byte - message_size_read = err; + err = 0; + //extract buffer from zmq message + buffer = zmqMsgToBufferShrdPtr(message); + //check if we have other message + has_next = moreMessageToRead(message); + if((err = zmq_msg_close(&message)) != 0) { + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error closing message" << PRINT_ZMQ_ERR(err); + } + err = 0; } - //return success - return 0; + return err; +} + +int ZMQBaseClass::readMessage(void *socket, + std::string& buffer, + bool& has_next, + std::string *peer_ip) { + int err = 0; + has_next = false; + zmq_msg_t message; + if((err = zmq_msg_init(&message))) { + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error initilizing message" << PRINT_ZMQ_ERR(err); + return err; + } + + /* Block until a message is available to be received from socket */ + if((err = zmq_msg_recv(&message, socket, 0)) <= 0) { + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error receiving message" << PRINT_ZMQ_ERR(err); + } else { + err = 0; + buffer.assign((const char *)zmq_msg_data(&message), zmq_msg_size(&message)); + + if(peer_ip) { + if(zmq_has("draft")) { + const char * ip = zmq_msg_gets(&message, "Peer-Address"); + if(ip) { + (*peer_ip) = ip; + } + } else { + peer_ip->assign("no draft zmq support"); + } + } + + //check if we have other message + has_next = moreMessageToRead(message); + if((err = zmq_msg_close(&message)) != 0) { + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error closing message" << PRINT_ZMQ_ERR(err); + } + err = 0; + } + return err; +} + +int ZMQBaseClass::sendMessage(void *socket, + zmq_msg_t& message, + int flag) { + int err = 0; + //send data + if((err = zmq_msg_send(&message, socket, flag)) == -1){ + err = zmq_errno(); + ZMQDIO_BASE_LERR_ << "Error sending message with error:" << PRINT_ZMQ_ERR(err); + } else { + //reset the error + err = 0; + } + return err; } //! send a new message from zmq socket @@ -250,7 +325,7 @@ int ZMQBaseClass::sendMessage(void *socket, ZMQDIO_BASE_LERR_ << "Error initializing message with error:" << PRINT_ZMQ_ERR(err); } else { //send data - if((err = zmq_sendmsg(socket, &message, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ + if((err = sendMessage(socket, message, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ err = zmq_errno(); ZMQDIO_BASE_LERR_ << "Error sending message with error:" << PRINT_ZMQ_ERR(err); } else { @@ -281,7 +356,7 @@ int ZMQBaseClass::sendMessage(void *socket, message_data, message_size); //send data - if((err = zmq_sendmsg(socket, &message, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ + if((err = zmq_msg_send(&message, socket, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ err = zmq_errno(); ZMQDIO_BASE_LERR_ << "Error sending message with error:" << PRINT_ZMQ_ERR(err); } else { @@ -294,56 +369,12 @@ int ZMQBaseClass::sendMessage(void *socket, return err; } -int ZMQBaseClass::moreMessageToRead(void * socket, - bool& more_to_read) { - int err = 0; - int option_result = 0; - size_t size_int = sizeof(int); - more_to_read=false; - //we heva received the message now check the size aspected - if((err = zmq_getsockopt(socket, ZMQ_RCVMORE, &option_result, &size_int))) { - err = zmq_errno(); - ZMQDIO_BASE_LERR_ << "Error checking if are present more submessage with error:" << PRINT_ZMQ_ERR(err); - } else { - more_to_read = (bool)option_result; - } - return err; -} - -// Receive 0MQ string from socket and convert into C string -// Caller must free returned string. Returns NULL if the context -// is being terminated. -int ZMQBaseClass::stringReceive(void *socket, std::string& received_string) { - char buffer [256]; - size_t readed_byte = 0; - buffer[sizeof(buffer)-1]=0; - buffer[0]=0; - - //read message and check the error - int err = readMessage(socket, buffer, sizeof(buffer)-1, readed_byte); - if(err) return err; - - //we got string so cap it for nullify at the end - buffer[readed_byte] = 0; - //return the string - received_string = buffer; - //return success - return 0; -} - -// Convert C string to 0MQ string and send to socket -int ZMQBaseClass::stringSend(void *socket, const char *string) { - return sendMessage(socket, (void*)string, strlen(string), false); +bool ZMQBaseClass::moreMessageToRead(zmq_msg_t& cur_msg) { + return zmq_msg_more(&cur_msg); } -// Sends string as 0MQ string, as multipart non-terminal -int ZMQBaseClass::stringSendMore(void *socket, const char *string) { - return sendMessage(socket, (void*)string, strlen(string), true); -} - -int ZMQBaseClass::setID(void *socket) { - std::string uid = UUIDUtil::generateUUIDLite(); - return zmq_setsockopt (socket, ZMQ_IDENTITY, uid.c_str(), uid.size()); +BufferSPtr ZMQBaseClass::zmqMsgToBufferShrdPtr(zmq_msg_t& msg) { + return ChaosMakeSharedPtr<Buffer>((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); } int ZMQBaseClass::setAndReturnID(void *socket, @@ -351,57 +382,27 @@ int ZMQBaseClass::setAndReturnID(void *socket, new_id = UUIDUtil::generateUUIDLite(); return zmq_setsockopt (socket, ZMQ_IDENTITY, new_id.c_str(), new_id.size()); } - -int ZMQBaseClass::resetOutputQueue(void *socket, - MapZMQConfiguration &default_conf, - const MapZMQConfiguration &startup_conf) { - int err = 0; - int prop_value = 0; - err = zmq_setsockopt(socket, ZMQ_RCVHWM, &prop_value, sizeof(int)); - if(err == 0) { - err = setSocketOption(socket, - default_conf, - startup_conf, - ZMQ_RCVHWM, - "ZMQ_RCVHWM", - "resetOutputQueue"); - } - return err; -} - -int ZMQBaseClass::sendStartEnvelop(void *socket) { - //sending envelop delimiter - return stringSendMore(socket, EmptyMessage); -} - -int ZMQBaseClass::receiveStartEnvelop(void *socket) { - std::string empty_delimiter; - int err = 0; - if((err = stringReceive(socket, empty_delimiter))){ - return err; - } - //assert on different delimiter size - if(empty_delimiter.size() != 0) { - ZMQDIO_BASE_LERR_<<"invalid start envelop"; - err = -10000; - } - return err; -} - +#pragma mark High Level Api int ZMQBaseClass::reveiceDatapack(void *socket, std::string& identity, DirectIODataPackSPtr& data_pack_handle) { //read first the identity int err = 0; - if((err = stringReceive(socket, identity))) { + bool has_more = false; + std::string peer_ip; + identity.clear(); + if((err = readMessage(socket, + identity, + has_more, + &peer_ip))) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Error %1% receiving identity form peer:%2%", %err%peer_ip); + //try to get the ip of source peer return err; } - //check the identity size that need to be different from zero - if(identity.size()==0){ - ZMQDIO_BASE_LERR_<<" malformed packet, empty identity"; - return -12000; - } + //check for error + EXIT_ON_ASSERT(identity.size(), -12000, "Malformed packet, empty identity"); + EXIT_ON_ASSERT(has_more, -12001, CHAOS_FORMAT("No more message after identity for peer %1%",%peer_ip)); //read the direct io datapack on zmq messages return reveiceDatapack(socket, @@ -410,41 +411,30 @@ int ZMQBaseClass::reveiceDatapack(void *socket, int ZMQBaseClass::reveiceDatapack(void *socket, DirectIODataPackSPtr& data_pack_handle) { - int err = 0; - std::string empty_delimiter; - bool have_more_message = false; - size_t readed_byte; - char header_buffer[DIRECT_IO_HEADER_SIZE]; - //receive the zmq evenlop delimiter - if((err = receiveStartEnvelop(socket))) { - return err; - } - - //check if we have other message - EXIT_IF_NO_MORE_MESSAGE(-13000, "No other message after envelop"); - + int err = 0; + bool has_more_messages = false; + BufferSPtr header_buffer; //read header - if((err = readMessage(socket, header_buffer, DIRECT_IO_HEADER_SIZE, readed_byte))){ + if((err = readMessage(socket, + header_buffer, + has_more_messages))){ + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Errore reading header with error %1% ", %err); return err; } - if(DIRECT_IO_HEADER_SIZE != readed_byte) { - ZMQDIO_BASE_LERR_<< "The header read phase has reported a different size of '"<<readed_byte<<"' bytes"; + if(DIRECT_IO_HEADER_SIZE != header_buffer->size()) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("The header read phase has reported a different size of %1% bytes insead of %2%",%header_buffer->size()%DIRECT_IO_HEADER_SIZE); //consume other messages if are present because the request is not conform to protocols - do { - have_more_message = false; - if((err = moreMessageToRead(socket, have_more_message))) { - ZMQDIO_BASE_LAPP_ << "Error reading if there are other mesages to read"; + while(has_more_messages) { + //!consume messages + BufferSPtr tmp_buffer; + if((err = readMessage(socket, + tmp_buffer, + has_more_messages))) { + ZMQDIO_BASE_LAPP_ << "Error consuming unrecognized messages"; break; - } else if(have_more_message) { - //!consume messages - BufferSPtr msg_buffer; - if((err = readMessage(socket, msg_buffer))) { - ZMQDIO_BASE_LAPP_ << "Error consuming unrecognized messages"; - break; - } } - }while(have_more_message); + }; return -13003; } @@ -452,75 +442,86 @@ int ZMQBaseClass::reveiceDatapack(void *socket, data_pack_handle = ChaosMakeSharedPtr<DirectIODataPack>(); //manage little endina conversion for header - memcpy(&data_pack_handle->header, header_buffer, sizeof(DirectIODataPackDispatchHeader_t)); + memcpy(&data_pack_handle->header, + header_buffer->data(), + header_buffer->size()); DIRECT_IO_DATAPACK_DISPATCH_HEADER_FROM_ENDIAN(data_pack_handle); - data_pack_handle->header.channel_data_size = DIRECT_IO_GET_CHANNEL_DATA_SIZE(header_buffer); - data_pack_handle->header.channel_header_size = DIRECT_IO_GET_CHANNEL_HEADER_SIZE(header_buffer); + data_pack_handle->header.channel_data_size = DIRECT_IO_GET_CHANNEL_DATA_SIZE(data_pack_handle->header.channel_data_size); + data_pack_handle->header.channel_header_size = DIRECT_IO_GET_CHANNEL_HEADER_SIZE(data_pack_handle->header.channel_header_size); //check what i need to reice switch(data_pack_handle->header.dispatcher_header.fields.channel_part) { case DIRECT_IO_CHANNEL_PART_EMPTY: break; - case DIRECT_IO_CHANNEL_PART_HEADER_ONLY: - + case DIRECT_IO_CHANNEL_PART_HEADER_ONLY:{ //check if we have header message - EXIT_IF_NO_MORE_MESSAGE(-13002, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_ONLY"); - - //init header data buffer - data_pack_handle->channel_header_data = ChaosMakeSharedPtr<Buffer>(data_pack_handle->header.channel_header_size); - - //read the channel header + EXIT_ON_ASSERT(has_more_messages, -13002, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_ONLY"); + //get hedaer part if((err = readMessage(socket, - data_pack_handle->channel_header_data->data(), - data_pack_handle->header.channel_header_size, - readed_byte))){ + data_pack_handle->channel_header_data, + has_more_messages))) { ZMQDIO_BASE_LERR_<< "Error reading the channel header with code:"<<err; + } else { + if(data_pack_handle->channel_header_data->size() != + data_pack_handle->header.channel_header_size) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Header part size received %1% expeted %2%", %data_pack_handle->channel_header_data->size()%data_pack_handle->header.channel_header_size); + err = -13003; + } } break; - case DIRECT_IO_CHANNEL_PART_DATA_ONLY: - + } + case DIRECT_IO_CHANNEL_PART_DATA_ONLY:{ //check if we have data message - EXIT_IF_NO_MORE_MESSAGE(-13003, "No other message after header for DIRECT_IO_CHANNEL_PART_DATA_ONLY"); - - //init data buffer - data_pack_handle->channel_data = ChaosMakeSharedPtr<Buffer>(data_pack_handle->header.channel_data_size); - + EXIT_ON_ASSERT(has_more_messages, -13004, "No other message after header for DIRECT_IO_CHANNEL_PART_DATA_ONLY"); //read data part if((err = readMessage(socket, - data_pack_handle->channel_data->data(), - data_pack_handle->header.channel_data_size, - readed_byte))){ - ZMQDIO_BASE_LERR_<< "Error reading the channel header with code:"<<err; + data_pack_handle->channel_data, + has_more_messages))) { + ZMQDIO_BASE_LERR_<< "Error reading the channel data with code:"<<err; + } else { + if(data_pack_handle->channel_data->size() != + data_pack_handle->header.channel_data_size) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Data part size received %1% expeted %2%:", %data_pack_handle->channel_data->size()%data_pack_handle->header.channel_data_size); + err = -13005; + } } break; - case DIRECT_IO_CHANNEL_PART_HEADER_DATA: + } + case DIRECT_IO_CHANNEL_PART_HEADER_DATA:{ //check if we have header message - EXIT_IF_NO_MORE_MESSAGE(-13004, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_DATA"); - - //allocate header buffer - data_pack_handle->channel_header_data = ChaosMakeSharedPtr<Buffer>(data_pack_handle->header.channel_header_size); + EXIT_ON_ASSERT(has_more_messages, -13006, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_DATA"); + //read header part if((err = readMessage(socket, - data_pack_handle->channel_header_data->data(), - data_pack_handle->header.channel_header_size, - readed_byte))){ - ZMQDIO_BASE_LERR_<< "Error reading the channel header with code:"<<err; + data_pack_handle->channel_header_data, + has_more_messages))) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Error reading the channel header with code: %1%",%err); } else { - //check if we have data message - EXIT_IF_NO_MORE_MESSAGE(-13005, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_DATA"); - - //allocate data buffer - data_pack_handle->channel_data = ChaosMakeSharedPtr<Buffer>(data_pack_handle->header.channel_data_size); - - //read data part - if((err = readMessage(socket, - data_pack_handle->channel_data->data(), - data_pack_handle->header.channel_data_size, - readed_byte))){ - ZMQDIO_BASE_LERR_<< "Error reading the channel header with code:"<<err; + if(data_pack_handle->channel_header_data->size() != + data_pack_handle->header.channel_header_size) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Header part size received %1% expeted %2%:", %data_pack_handle->channel_header_data->size()%data_pack_handle->header.channel_header_size); + err = -13007; + } else { + //check if we have data message + EXIT_ON_ASSERT(has_more_messages, -13008, "No other message after header for DIRECT_IO_CHANNEL_PART_HEADER_DATA"); + + //read data part + if((err = readMessage(socket, + data_pack_handle->channel_data, + has_more_messages))) { + ZMQDIO_BASE_LERR_<< "Error reading the channel data with code:"<<err; + } else { + //check size of data + if(data_pack_handle->channel_data->size() != + data_pack_handle->header.channel_data_size) { + ZMQDIO_BASE_LERR_<< CHAOS_FORMAT("Data part size received %1% expeted %2%:", %data_pack_handle->channel_data->size()%data_pack_handle->header.channel_data_size); + err = -13009; + } + } } } break; + } } return err; } @@ -530,7 +531,7 @@ int ZMQBaseClass::sendDatapack(void *socket, DirectIODataPackSPtr data_pack) { //send identity int err = 0; - if((err = stringSendMore(socket, identity.c_str()))) { + if((err = sendMessage(socket, (void*)identity.c_str(), identity.size(), true))) { return err; } //read the direct io datapack with zmq messages @@ -548,76 +549,71 @@ int ZMQBaseClass::sendDatapack(void *socket, data_pack->header.channel_header_size = DIRECT_IO_SET_CHANNEL_HEADER_SIZE(data_pack->header.channel_header_size); data_pack->header.channel_data_size = DIRECT_IO_SET_CHANNEL_DATA_SIZE(data_pack->header.channel_data_size); - //send first the direct io envelop delimiter - if((err = sendStartEnvelop(socket))) { - return err; - } else { - //envelope has been sent so we can send direct io messages - switch(data_pack->header.dispatcher_header.fields.channel_part) { - case DIRECT_IO_CHANNEL_PART_EMPTY: - if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, false))) { - ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); - } - break; - - case DIRECT_IO_CHANNEL_PART_HEADER_ONLY: - if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { - ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); - } else if((err = sendMessage(socket, - (void*)data_pack->channel_header_data->data(), - (size_t)data_pack->header.channel_header_size, - zqmFreeSentData, - new DisposeSentMemoryInfo(data_pack->channel_header_data, - DisposeSentMemoryInfo::SentPartHeader, - sending_opcode), - false))){ - ZMQDIO_BASE_LERR_ << "Error sending channel header part:"<< PRINT_ZMQ_ERR(err); - } - break; - - case DIRECT_IO_CHANNEL_PART_DATA_ONLY: - if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { - ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); - } else if((err = sendMessage(socket, - data_pack->channel_data->data(), - (size_t)data_pack->header.channel_data_size, - zqmFreeSentData, - new DisposeSentMemoryInfo(data_pack->channel_data, - DisposeSentMemoryInfo::SentPartData, - sending_opcode), - false))){ - ZMQDIO_BASE_LERR_ << "Error sending channel data part:"<< PRINT_ZMQ_ERR(err); - } - break; - - case DIRECT_IO_CHANNEL_PART_HEADER_DATA: - if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { - ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); - } else if((err = sendMessage(socket, - data_pack->channel_header_data->data(), - (size_t)data_pack->header.channel_header_size, - zqmFreeSentData, - new DisposeSentMemoryInfo(data_pack->channel_header_data, - DisposeSentMemoryInfo::SentPartHeader, - sending_opcode), - true))){ - ZMQDIO_BASE_LERR_ << "Error sending channel header part:"<< PRINT_ZMQ_ERR(err); - //error sending header data - } else if((err = sendMessage(socket, - data_pack->channel_data->data(), - (size_t)data_pack->header.channel_data_size, - zqmFreeSentData, - new DisposeSentMemoryInfo(data_pack->channel_data, - DisposeSentMemoryInfo::SentPartData, - sending_opcode), - false))){ - ZMQDIO_BASE_LERR_ << "Error sending channel data part:"<< PRINT_ZMQ_ERR(err); - if(data_pack->channel_data != NULL) { - ZMQDIO_BASE_LERR_ << "Free the channel data memory"; - } + //envelope has been sent so we can send direct io messages + switch(data_pack->header.dispatcher_header.fields.channel_part) { + case DIRECT_IO_CHANNEL_PART_EMPTY: + if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, false))) { + ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); + } + break; + + case DIRECT_IO_CHANNEL_PART_HEADER_ONLY: + if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { + ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); + } else if((err = sendMessage(socket, + (void*)data_pack->channel_header_data->data(), + (size_t)data_pack->header.channel_header_size, + zqmFreeSentData, + new DisposeSentMemoryInfo(data_pack->channel_header_data, + DisposeSentMemoryInfo::SentPartHeader, + sending_opcode), + false))){ + ZMQDIO_BASE_LERR_ << "Error sending channel header part:"<< PRINT_ZMQ_ERR(err); + } + break; + + case DIRECT_IO_CHANNEL_PART_DATA_ONLY: + if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { + ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); + } else if((err = sendMessage(socket, + data_pack->channel_data->data(), + (size_t)data_pack->header.channel_data_size, + zqmFreeSentData, + new DisposeSentMemoryInfo(data_pack->channel_data, + DisposeSentMemoryInfo::SentPartData, + sending_opcode), + false))){ + ZMQDIO_BASE_LERR_ << "Error sending channel data part:"<< PRINT_ZMQ_ERR(err); + } + break; + + case DIRECT_IO_CHANNEL_PART_HEADER_DATA: + if((err = sendMessage(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, true))) { + ZMQDIO_BASE_LERR_ << "Error sending header part:"<< PRINT_ZMQ_ERR(err); + } else if((err = sendMessage(socket, + data_pack->channel_header_data->data(), + (size_t)data_pack->header.channel_header_size, + zqmFreeSentData, + new DisposeSentMemoryInfo(data_pack->channel_header_data, + DisposeSentMemoryInfo::SentPartHeader, + sending_opcode), + true))){ + ZMQDIO_BASE_LERR_ << "Error sending channel header part:"<< PRINT_ZMQ_ERR(err); + //error sending header data + } else if((err = sendMessage(socket, + data_pack->channel_data->data(), + (size_t)data_pack->header.channel_data_size, + zqmFreeSentData, + new DisposeSentMemoryInfo(data_pack->channel_data, + DisposeSentMemoryInfo::SentPartData, + sending_opcode), + false))){ + ZMQDIO_BASE_LERR_ << "Error sending channel data part:"<< PRINT_ZMQ_ERR(err); + if(data_pack->channel_data != NULL) { + ZMQDIO_BASE_LERR_ << "Free the channel data memory"; } - break; - } + } + break; } return err; } diff --git a/chaos/common/direct_io/impl/ZMQBaseClass.h b/chaos/common/direct_io/impl/ZMQBaseClass.h index 3a864e26aad89079fcdcbf8af046c9b59893c4ab..ebcd37275a58081a3ba3e1bba139bed292186f2d 100644 --- a/chaos/common/direct_io/impl/ZMQBaseClass.h +++ b/chaos/common/direct_io/impl/ZMQBaseClass.h @@ -70,16 +70,20 @@ namespace chaos { int closeSocketNoWhait (void *socket); inline int readMessage(void *socket, - chaos::common::data::BufferSPtr& msg_buffer); - - //! read a new message from zmq socket - /*! - - */ + zmq_msg_t& message); + inline int readMessage(void *socket, - void *message_data, - size_t message_max_size, - size_t& message_size_read); + chaos::common::data::BufferSPtr& buffer, + bool& has_next); + + inline int readMessage(void *socket, + std::string& buffer, + bool& has_next, + std::string *peer_ip = NULL); + + inline int sendMessage(void *socket, + zmq_msg_t& message, + int flag); //! send a new message from zmq socket /*! @@ -101,23 +105,9 @@ namespace chaos { /*! */ - inline int moreMessageToRead(void * socket, - bool& more_to_read); - - //! receive a string - inline int stringReceive(void *socket, - std::string& received_string); - - //! send string closing message - int stringSend(void *socket, - const char *string); + inline bool moreMessageToRead(zmq_msg_t& cur_msg); - //! send a string with more message option - int stringSendMore(void *socket, - const char *string); - - //! Set a random id on socket - int setID(void *socket); + inline chaos::common::data::BufferSPtr zmqMsgToBufferShrdPtr(zmq_msg_t& msg); //! Set a socket id and return it /*! @@ -127,23 +117,6 @@ namespace chaos { int setAndReturnID(void *socket, std::string& new_id); - int resetOutputQueue(void *socket, - MapZMQConfiguration &default_conf, - const MapZMQConfiguration &startup_conf); - - //! send the start of the zmq envelop start - /*! - This method need to be called befor message forwarding, - it start the zmq forwarding envelop - */ - inline int sendStartEnvelop(void *socket); - - //! receive the start of a message envelop - /*! - Before message can be read, the start of the envelop need to be received - */ - inline int receiveStartEnvelop(void *socket); - //! receive Direct io datapack by socket int reveiceDatapack(void *socket, std::string& identity, @@ -161,12 +134,8 @@ namespace chaos { int sendDatapack(void *socket, chaos::common::direct_io::DirectIODataPackSPtr data_pack); }; - } } } } - - - #endif diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp index 486ed1e1817e80d0e62201c9b3c66709819773c4..a05eb172fbe9fb7272a37396ac5749250a32d128 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp @@ -268,18 +268,13 @@ int ZMQDirectIOClientConnection::sendPriorityData(chaos::common::direct_io::Dire priority_identity, MOVE(data_pack), synchronous_answer); - if(err > 0 /*resource not available*/) { - //change id ofr socket - if((ZMQBaseClass::setAndReturnID(socket_priority, - service_identity))) { - ERR << "Error configuring new id for socker :" << service_endpoint; - } else { - ZMQBaseClass::resetOutputQueue(socket_service, - default_configuration, - chaos::GlobalConfiguration::getInstance()->getDirectIOClientImplKVParam()); - - } - } +// if(err > 0 /*resource not available*/) { +// //change id ofr socket +// if((ZMQBaseClass::setAndReturnID(socket_priority, +// service_identity))) { +// ERR << "Error configuring new id for socker :" << service_endpoint; +// } +// } return err; } @@ -303,18 +298,18 @@ int ZMQDirectIOClientConnection::sendServiceData(chaos::common::direct_io::Direc service_identity, MOVE(data_pack), synchronous_answer); - if(err > 0 /*resource not available*/) { - //change id ofr socket - if((ZMQBaseClass::setAndReturnID(socket_service, - service_identity))) { - ERR << "Error configuring new id for socker :" << service_endpoint; - } else { - ZMQBaseClass::resetOutputQueue(socket_service, - default_configuration, - chaos::GlobalConfiguration::getInstance()->getDirectIOClientImplKVParam()); - - } - } +// if(err > 0 /*resource not available*/) { +// //change id ofr socket +// if((ZMQBaseClass::setAndReturnID(socket_service, +// service_identity))) { +// ERR << "Error configuring new id for socker :" << service_endpoint; +// } else { +// ZMQBaseClass::resetOutputQueue(socket_service, +// default_configuration, +// chaos::GlobalConfiguration::getInstance()->getDirectIOClientImplKVParam()); +// +// } +// } return err; } @@ -329,7 +324,7 @@ bool ZMQDirectIOClientConnection::ensureSocket() { //send data with zmq tech int ZMQDirectIOClientConnection::writeToSocket(void *socket, std::string& identity, - chaos::common::direct_io::DirectIODataPackSPtr data_pack) { + chaos::common::direct_io::DirectIODataPackSPtr data_pack) { CHAOS_ASSERT(socket && data_pack); CHAOS_ASSERT(data_pack->header.dispatcher_header.fields.synchronous_answer == false); int err = 0; @@ -343,7 +338,7 @@ int ZMQDirectIOClientConnection::writeToSocket(void *socket, int ZMQDirectIOClientConnection::writeToSocket(void *socket, std::string& identity, chaos::common::direct_io::DirectIODataPackSPtr data_pack, - chaos::common::direct_io::DirectIODataPackSPtr& synchronous_answer) { + chaos::common::direct_io::DirectIODataPackSPtr& synchronous_answer) { CHAOS_ASSERT(socket && data_pack); CHAOS_ASSERT(data_pack->header.dispatcher_header.fields.synchronous_answer); uint16_t current_counter = data_pack->header.dispatcher_header.fields.counter = message_counter++; diff --git a/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp b/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp index a7b3b288aca82ac9c8f91cf4796642e7e3ed61b2..6ce5e04a90007a38159d61f94fe8487602938b52 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOServer.cpp @@ -85,7 +85,7 @@ void ZMQDirectIOServer::start() { MapZMQConfiguration default_context_configuration; default_context_configuration["ZMQ_IO_THREADS"] = "1"; - direct_io_thread_number = 2; + direct_io_thread_number = 1; DirectIOServer::start(); run_server = true; @@ -185,7 +185,7 @@ void ZMQDirectIOServer::poller(const std::string& public_url, default_socket_configuration["ZMQ_RCVTIMEO"] = "-1"; default_socket_configuration["ZMQ_SNDTIMEO"] = "1000"; - proxy_socket_configuration["ZMQ_LINGER"] = "0"; + proxy_socket_configuration["ZMQ_LINGER"] = "500"; //keep space for 2 compelte direct io message(3 message part) for every working thread proxy_socket_configuration["ZMQ_RCVHWM"] = "1000";//boost::lexical_cast<std::string>((direct_io_thread_number*3)*2); proxy_socket_configuration["ZMQ_SNDHWM"] = "1000"; @@ -260,7 +260,7 @@ void ZMQDirectIOServer::worker(unsigned int w_type, MapZMQConfiguration worker_empty_default_configuration; MapZMQConfiguration worker_socket_configuration; - worker_socket_configuration["ZMQ_LINGER"] = "0"; + worker_socket_configuration["ZMQ_LINGER"] = "500"; worker_socket_configuration["ZMQ_RCVHWM"] = "1000"; worker_socket_configuration["ZMQ_SNDHWM"] = "1000"; worker_socket_configuration["ZMQ_RCVTIMEO"] = "-1"; diff --git a/chaos/common/rpc/zmq/ZMQClient.cpp b/chaos/common/rpc/zmq/ZMQClient.cpp index 67becaad251ea0ccd411949b4f89af98597fd751..c22320b60d07061bcad53a8d892bd863535b41ab 100644 --- a/chaos/common/rpc/zmq/ZMQClient.cpp +++ b/chaos/common/rpc/zmq/ZMQClient.cpp @@ -158,6 +158,7 @@ ZMQSocketPool::ResourceSlot *ZMQClient::getSocketForNFI(NetworkForwardInfo *nfi) if(it != map_socket.end()){ return it->second->getNewResource(); } else { + ChaosSharedPtr< ZMQSocketPool > socket_pool(new ZMQSocketPool(nfi->destinationAddr, this)); map_socket.insert(make_pair(nfi->destinationAddr, socket_pool)); return socket_pool->getNewResource(); @@ -180,49 +181,55 @@ void ZMQClient::deleteSocket(ZMQSocketPool::ResourceSlot *socket_slot_to_release } //----resource pool handler----- -void* ZMQClient::allocateResource(const std::string& pool_identification, - uint32_t& alive_for_ms) { +ZMQSocketPoolDef* ZMQClient::allocateResource(const std::string& pool_identification, + uint32_t& alive_for_ms) { int err = 0; int linger = 0; int water_mark = 2; - + std::string new_id; + ChaosUniquePtr<ZMQSocketPoolDef> socket_def(new ZMQSocketPoolDef()); //set the alive time to one minute alive_for_ms = ZMQ_SOCKET_LIFETIME_TIMEOUT; - //create zmq socket - void *new_socket = zmq_socket (zmq_context, ZMQ_REQ); - if(!new_socket) { + socket_def->socket = zmq_socket (zmq_context, ZMQ_DEALER); + if(!socket_def->socket) { return NULL; - } else if ((err = zmq_setsockopt(new_socket, ZMQ_LINGER, &linger, sizeof(int)))) { - } else if ((err = zmq_setsockopt(new_socket, ZMQ_RCVHWM, &water_mark, sizeof(int)))) { - } else if ((err = zmq_setsockopt(new_socket, ZMQ_SNDHWM, &water_mark, sizeof(int)))) { - } else if ((err = zmq_setsockopt(new_socket, ZMQ_SNDTIMEO, &zmq_timeout, sizeof(int)))) { - } else if ((err = zmq_setsockopt(new_socket, ZMQ_RCVTIMEO, &zmq_timeout, sizeof(int)))) { + } else if ((err = zmq_setsockopt(socket_def->socket, ZMQ_LINGER, &linger, sizeof(int)))) { + } else if ((err = zmq_setsockopt(socket_def->socket, ZMQ_RCVHWM, &water_mark, sizeof(int)))) { + } else if ((err = zmq_setsockopt(socket_def->socket, ZMQ_SNDHWM, &water_mark, sizeof(int)))) { + } else if ((err = zmq_setsockopt(socket_def->socket, ZMQ_SNDTIMEO, &zmq_timeout, sizeof(int)))) { + } else if ((err = zmq_setsockopt(socket_def->socket, ZMQ_RCVTIMEO, &zmq_timeout, sizeof(int)))) { } else { string url = "tcp://"; url.append(pool_identification); - if((err = zmq_connect(new_socket, url.c_str()))) { + if((err = zmq_connect(socket_def->socket, url.c_str()))) { ZMQC_LERR << "Error "<< err <<" connecting socket to " <<pool_identification; } else { DEBUG_CODE(ZMQC_LAPP << "New socket for "<<pool_identification;) + socket_def->identity = common::utility::UUIDUtil::generateUUIDLite(); + if((err = zmq_setsockopt(socket_def->socket, ZMQ_IDENTITY, socket_def->identity.c_str(), socket_def->identity.size())) != 0){ + DEBUG_CODE(ZMQC_LAPP << "Error setting socket identity for "<<pool_identification;) + } } } if(err) { - if(new_socket) { + if(socket_def->socket) { ZMQC_LERR << "Error during configuration of the socket for "<<pool_identification; - zmq_close(new_socket); + zmq_close(socket_def->socket); //reset socket - new_socket = NULL; + socket_def->socket = NULL; } + socket_def.reset(); } - return new_socket; + return socket_def.release(); } -void ZMQClient::deallocateResource(const std::string& pool_identification, void* resource_to_deallocate) { +void ZMQClient::deallocateResource(const std::string& pool_identification, ZMQSocketPoolDef* resource_to_deallocate) { CHAOS_ASSERT(resource_to_deallocate) DEBUG_CODE(ZMQC_LAPP << "delete socket for "<<pool_identification;) - zmq_close(resource_to_deallocate); + zmq_close(resource_to_deallocate->socket); + delete(resource_to_deallocate); } //-----timer handler------ @@ -239,6 +246,37 @@ void ZMQClient::timeout() { } } +int ZMQClient::sendMessage(void *socket, + void *message_data, + size_t message_size, + bool more_to_send) { + int err = 0; + zmq_msg_t message; + + if((err = zmq_msg_init_size(&message, + message_size)) == -1){ + //error creating the message + err = zmq_errno(); + ZMQC_LERR << "Error initializing message with error:" << zmq_strerror(err); + } else { + //copy content into message + memcpy(zmq_msg_data(&message), + message_data, + message_size); + //send data + if((err = zmq_msg_send(&message, socket, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ + err = zmq_errno(); + ZMQC_LERR << "Error sending message with error:" << zmq_strerror(err); + } else { + //reset the error + err = 0; + } + //close the message + zmq_msg_close(&message); + } + return err; +} + /* process the element action to be executed */ @@ -277,7 +315,12 @@ void ZMQClient::processBufferElement(NFISharedPtr messageInfo) { return; } - if((err = zmq_msg_init_data(&message, (void*)message_data->getBSONRawData(), message_data->getBSONRawSize(), my_free, new MemoryManagement(message_data))) == -1) { + //send data + if((err = zmq_msg_init_data(&message, + (void*)message_data->getBSONRawData(), + message_data->getBSONRawSize(), + my_free, + new MemoryManagement(message_data))) == -1) { int32_t sent_error = zmq_errno(); std::string error_message =zmq_strerror(sent_error); ZMQC_LERR << "Error allocating zmq messagecode:" << sent_error << " message:" <<error_message; @@ -287,10 +330,11 @@ void ZMQClient::processBufferElement(NFISharedPtr messageInfo) { "Error initializiend rcp message", __PRETTY_FUNCTION__); } - //err = 0; } else { ZMQC_LDBG << "Try to send message seq_id:"<<loc_seq_id; - err = zmq_sendmsg(socket_info->resource_pooled, &message, ZMQ_DONTWAIT); + err = zmq_msg_send(&message, + socket_info->resource_pooled->socket, + ZMQ_DONTWAIT); if(err == -1) { int32_t sent_error = zmq_errno(); std::string error_message = zmq_strerror(sent_error); @@ -308,7 +352,9 @@ void ZMQClient::processBufferElement(NFISharedPtr messageInfo) { }else{ ZMQC_LDBG << "Message seq_id:"<<loc_seq_id<<" sent now wait for ack"; //ok get the answer - err = zmq_recvmsg(socket_info->resource_pooled, &reply, 0); + err = zmq_msg_recv(&reply, + socket_info->resource_pooled->socket, + 0); if(err == -1) { int32_t sent_error = zmq_errno(); std::string error_message = zmq_strerror(sent_error); diff --git a/chaos/common/rpc/zmq/ZMQClient.h b/chaos/common/rpc/zmq/ZMQClient.h index 8d8df022d2549bf4f02b22e50058ec8cdac33226..dad330441cc1c81ee21dfe9abd1394403e4d0194 100644 --- a/chaos/common/rpc/zmq/ZMQClient.h +++ b/chaos/common/rpc/zmq/ZMQClient.h @@ -33,8 +33,13 @@ namespace chaos { class ZMQClient; class SocketEndpointPool; - - typedef chaos::common::pool::ResourcePool<void> ZMQSocketPool; + + struct ZMQSocketPoolDef{ + void * socket; + std::string identity; + }; + + typedef chaos::common::pool::ResourcePool<ZMQSocketPoolDef> ZMQSocketPool; //define the pool my for every endpoint CHAOS_DEFINE_MAP_FOR_TYPE(std::string, ChaosSharedPtr< ZMQSocketPool >, SocketMap) @@ -43,7 +48,7 @@ namespace chaos { Class that implemnt !CHAOS RPC messaggin gusing ZMQ driver parameter: - key:zmq_timeout value is a stirng that represent the integer used as timeout + key:zmq_timeout value is a stirng that represent the integer used as timeout */ DECLARE_CLASS_FACTORY(ZMQClient, RpcClient), public ZMQSocketPool::ResourcePoolHelper, @@ -64,14 +69,19 @@ namespace chaos { inline void deleteSocket(ZMQSocketPool::ResourceSlot *socket_slot_to_release); //resource pool handler - void* allocateResource(const std::string& pool_identification, - uint32_t& alive_for_ms); + ZMQSocketPoolDef* allocateResource(const std::string& pool_identification, + uint32_t& alive_for_ms); void deallocateResource(const std::string& pool_identification, - void* resource_to_deallocate); + ZMQSocketPoolDef* resource_to_deallocate); //timer handler void timeout(); - + + int sendMessage(void *socket, + void *message_data, + size_t message_size, + bool more_to_send); + public: /* diff --git a/chaos/common/rpc/zmq/ZMQServer.cpp b/chaos/common/rpc/zmq/ZMQServer.cpp index fc56d022496ba1a5e525b3ae7bd1ad6b1708f210..2f9182fe17bdbcf80bba7ab41a189577c96b61c9 100644 --- a/chaos/common/rpc/zmq/ZMQServer.cpp +++ b/chaos/common/rpc/zmq/ZMQServer.cpp @@ -118,10 +118,10 @@ void ZMQServer::deinit() { ZMQS_LAPP << "Stopping thread"; //wiath all thread zmq_ctx_shutdown(zmq_context); - + thread_group.join_all(); zmq_ctx_destroy(zmq_context); - + ZMQS_LAPP << "Thread stopped"; } #define ZMQ_DO_AGAIN(x) do{x}while(err == EAGAIN); @@ -168,7 +168,7 @@ void ZMQServer::worker() { int linger = 500; int water_mark = 10; - void *receiver = zmq_socket (zmq_context, ZMQ_REP); + void *receiver = zmq_socket (zmq_context, ZMQ_DEALER); if(!receiver) return; //err = zmq_bind(receiver, bind_str.str().c_str()); @@ -201,20 +201,30 @@ void ZMQServer::worker() { } while (run_server) { try { + bool has_more; + std::string identity; zmq_msg_t request; zmq_msg_t response; - err = zmq_msg_init(&request); + if((err = readMessage(receiver, + identity, + has_more) != 0)) { + continue; + } else if(has_more == false) { + ZMQS_LERR << "Identity without message"; + continue; + } - DEBUG_CODE(ZMQS_LDBG << "Wait for message";); - err = zmq_recvmsg(receiver, &request, 0); + //read message + err = zmq_msg_init(&request); + ZMQS_LDBG << "Wait for message"; + err = zmq_msg_recv(&request, receiver, 0); if(run_server==0){ - // no error should be issued on normal exit + // no error should be issued on normal exit ZMQS_LDBG << "exiting from worker.."; - - continue; + continue; } - + if(err == -1 ) { int32_t sent_error = zmq_errno(); std::string error_message = zmq_strerror(sent_error); @@ -227,8 +237,8 @@ void ZMQServer::worker() { //dispatch the command if(message_data->hasKey("seq_id")){ seq_id=message_data->getInt64Value("seq_id"); - } - DEBUG_CODE(ZMQS_LDBG << "Message Received seq_id:"<<seq_id;); + } + ZMQS_LDBG << "Message Received seq_id:"<<seq_id; const std::string msg_desc = message_data->getCompliantJSONString(); if(message_data->hasKey("syncrhonous_call") && message_data->getBoolValue("syncrhonous_call")) { @@ -236,10 +246,13 @@ void ZMQServer::worker() { } else { result_data_pack = command_handler->dispatchCommand(MOVE(message_data)); } + + //send identity + err = sendMessage(receiver, (void*)identity.c_str(), identity.size(), true); //create zmq message if(result_data_pack.get()==NULL){ ZMQS_LERR << "ERROR:"<<msg_desc; - + } result_data_pack->addInt64Value("seq_id",seq_id); err = zmq_msg_init_data(&response, @@ -254,8 +267,8 @@ void ZMQServer::worker() { ZMQS_LERR << "Error initializing the response message with code:" << sent_error << " message:" <<error_message; } else { //no error on create message - // ZMQS_LDBG << "Send ack"; - err = zmq_sendmsg(receiver, &response, ZMQ_NOBLOCK); + // ZMQS_LDBG << "Send ack"; + err = zmq_msg_send(&response, receiver, ZMQ_NOBLOCK); if(err == -1) { int32_t sent_error = zmq_errno(); std::string error_message = zmq_strerror(sent_error); @@ -278,3 +291,78 @@ void ZMQServer::worker() { zmq_close(receiver); ZMQS_LAPP << CHAOS_FORMAT("Leaving worker for %1%", %bind_str.str()); } + +int ZMQServer::sendMessage(void *socket, + void *message_data, + size_t message_size, + bool more_to_send) { + int err = 0; + zmq_msg_t message; + + if((err = zmq_msg_init_size(&message, + message_size)) == -1){ + //error creating the message + err = zmq_errno(); + ZMQS_LERR << "Error initializing message with error:" << zmq_strerror(err); + } else { + //copy content into message + memcpy(zmq_msg_data(&message), + message_data, + message_size); + //send data + if((err = zmq_msg_send(&message, socket, more_to_send?ZMQ_SNDMORE:ZMQ_DONTWAIT)) == -1){ + err = zmq_errno(); + ZMQS_LERR << "Error sending message with error:" << zmq_strerror(err); + } else { + //reset the error + err = 0; + } + //close the message + zmq_msg_close(&message); + } + return err; +} + +int ZMQServer::readMessage(void *socket, + std::string& buffer, + bool& has_next, + std::string *peer_ip) { + int err = 0; + has_next = false; + zmq_msg_t message; + if((err = zmq_msg_init(&message))) { + err = zmq_errno(); + ZMQS_LERR << "Error initilizing message" << zmq_strerror(err); + return err; + } + + /* Block until a message is available to be received from socket */ + if((err = zmq_msg_recv(&message, socket, 0)) <= 0) { + err = zmq_errno(); + ZMQS_LERR << "Error receiving message" << zmq_strerror(err); + } else { + err = 0; + size_t msg_size = zmq_msg_size(&message); + buffer.assign((const char *)zmq_msg_data(&message), msg_size); + + if(peer_ip) { + if(zmq_has("draft")) { + const char * ip = zmq_msg_gets(&message, "Peer-Address"); + if(ip) { + (*peer_ip) = ip; + } + } else { + peer_ip->assign("no draft zmq support"); + } + } + + //check if we have other message + has_next = zmq_msg_more(&message); + if((err = zmq_msg_close(&message)) != 0) { + err = zmq_errno(); + ZMQS_LERR << "Error closing message" << zmq_strerror(err); + } + err = 0; + } + return err; +} diff --git a/chaos/common/rpc/zmq/ZMQServer.h b/chaos/common/rpc/zmq/ZMQServer.h index 3caf1c6b16450d916c737115ee8a3de1d7167cee..559d69291586aa4e266ecc464d6535adf1d5fecc 100644 --- a/chaos/common/rpc/zmq/ZMQServer.h +++ b/chaos/common/rpc/zmq/ZMQServer.h @@ -48,8 +48,18 @@ namespace chaos { virtual ~ZMQServer(); //worker that process request in a separate thread void worker(); + + int sendMessage(void *socket, + void *message_data, + size_t message_size, + bool more_to_send); + + int readMessage(void *socket, + std::string& buffer, + bool& has_next, + std::string *peer_ip = NULL); public: - + /* init the rpc adapter */ @@ -67,7 +77,7 @@ namespace chaos { */ void deinit(); - //server worker thread + //server worker thread /*! Thread where data is received and managed */ diff --git a/config/CMakeChaos.txt b/config/CMakeChaos.txt index e554cf2929edc0336cde4ecea4392dc92a3c87d1..e6b190485140a84f78ebb246cc215c5ca7d94e8e 100644 --- a/config/CMakeChaos.txt +++ b/config/CMakeChaos.txt @@ -336,7 +336,7 @@ SET( CMAKE_CHAOS $ENV{CHAOS_CMAKE_FLAGS}) #separate_arguments(FrameworkLib) INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR} ${CMAKE_INSTALL_PREFIX}/include) -LINK_DIRECTORIES(${CMAKE_LIBRARY_OUTPUT_DIRECTORY} ${CMAKE_INSTALL_PREFIX}/lib) +LINK_DIRECTORIES(${CMAKE_LIBRARY_OUTPUT_DIRECTORY} ${CMAKE_INSTALL_PREFIX}/lib ${CMAKE_INSTALL_PREFIX}/lib64) # foreach(cu ${CHAOS_CUS}) # string (REPLACE ".cpp" ".h" cuh ${cu}) diff --git a/config/localhost/MDSConfig-full.json b/config/localhost/MDSConfig-full.json index 8ec5acf8c2c1a2f0903a1058211725e8be71b2b0..8a6ce10923905b4c5c0c6a1b256a854ccb84b8fe 100644 --- a/config/localhost/MDSConfig-full.json +++ b/config/localhost/MDSConfig-full.json @@ -4125,7 +4125,7 @@ "cu_id": "ACCUMULATOR/BPM/ORBIT_E", "cu_type": "::driver::data_import::RTChaos2Memcache", "cu_param": "ACCUMULATOR/BPM/BPMSYNC/BPBA1001/X ACCUMULATOR/BPM/BPMSYNC/BPSA1001/X ACCUMULATOR/BPM/BPMSYNC/BPBA1002/X ACCUMULATOR/BPM/BPMSYNC/BPBA2001/X ACCUMULATOR/BPM/BPMSYNC/BPSA2001/X ACCUMULATOR/BPM/BPMSYNC/BPBA2002/X ACCUMULATOR/BPM/BPMSYNC/BPSA3001/X ACCUMULATOR/BPM/BPMSYNC/BPBA3001/X ACCUMULATOR/BPM/BPMSYNC/BPBA3002/X ACCUMULATOR/BPM/BPMSYNC/BPBA4001/X ACCUMULATOR/BPM/BPMSYNC/BPBA4002/X ACCUMULATOR/BPM/BPMSYNC/BPSA4001/X ACCUMULATOR/BPM/BPMSYNC/BPBA1001/Y ACCUMULATOR/BPM/BPMSYNC/BPSA1001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA1002/Y ACCUMULATOR/BPM/BPMSYNC/BPBA2001/Y ACCUMULATOR/BPM/BPMSYNC/BPSA2001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA2002/Y ACCUMULATOR/BPM/BPMSYNC/BPSA3001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA3001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA3002/Y ACCUMULATOR/BPM/BPMSYNC/BPBA4001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA4002/Y ACCUMULATOR/BPM/BPMSYNC/BPSA4001/Y", - "auto_load": true, + "auto_load": false, "auto_start": false, "auto_init": false, "storage_type": 2, @@ -4152,7 +4152,7 @@ "cu_id": "ACCUMULATOR/BPM/ORBIT_P", "cu_type": "::driver::data_import::RTChaos2Memcache", "cu_param": "ACCUMULATOR/BPM/BPMSYNC/BPBA2002/X ACCUMULATOR/BPM/BPMSYNC/BPSA2001/X ACCUMULATOR/BPM/BPMSYNC/BPBA2001/X ACCUMULATOR/BPM/BPMSYNC/BPBA1002/X ACCUMULATOR/BPM/BPMSYNC/BPSA1001/X ACCUMULATOR/BPM/BPMSYNC/BPBA1001/X ACCUMULATOR/BPM/BPMSYNC/BPSA4001/X ACCUMULATOR/BPM/BPMSYNC/BPBA4002/X ACCUMULATOR/BPM/BPMSYNC/BPBA4001/X ACCUMULATOR/BPM/BPMSYNC/BPBA3002/X ACCUMULATOR/BPM/BPMSYNC/BPBA3001/X ACCUMULATOR/BPM/BPMSYNC/BPSA3001/X ACCUMULATOR/BPM/BPMSYNC/BPBA2002/Y ACCUMULATOR/BPM/BPMSYNC/BPSA2001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA2001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA1002/Y ACCUMULATOR/BPM/BPMSYNC/BPSA1001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA1001/Y ACCUMULATOR/BPM/BPMSYNC/BPSA4001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA4002/Y ACCUMULATOR/BPM/BPMSYNC/BPBA4001/Y ACCUMULATOR/BPM/BPMSYNC/BPBA3002/Y ACCUMULATOR/BPM/BPMSYNC/BPBA3001/Y ACCUMULATOR/BPM/BPMSYNC/BPSA3001/Y", - "auto_load": true, + "auto_load": false, "auto_start": false, "auto_init": false, "storage_type": 2,