diff --git a/ChaosDataService/ChaosDataService.cpp b/ChaosDataService/ChaosDataService.cpp index 4bbcd6b275a7434896650af45c0f1366b60d91a0..0f530d1fbf4ba9e555d199100d350a16ddd3ee32 100644 --- a/ChaosDataService/ChaosDataService.cpp +++ b/ChaosDataService/ChaosDataService.cpp @@ -178,7 +178,7 @@ void ChaosDataService::init(void *init_data) throw(CException) { if(run_mode == QUERY || run_mode == BOTH) { CDSLAPP_ << "Allocate the Query Data Consumer"; - data_consumer.reset(new QueryDataConsumer(vfs_file_manager.get()), "QueryDataConsumer"); + data_consumer.reset(new QueryDataConsumer(vfs_file_manager.get(), db_driver_ptr), "QueryDataConsumer"); if(!data_consumer.get()) throw chaos::CException(-1, "Error instantiating data consumer", __PRETTY_FUNCTION__); data_consumer->settings = &settings; data_consumer->network_broker = network_broker.get(); diff --git a/ChaosDataService/QueryDataConsumer.cpp b/ChaosDataService/QueryDataConsumer.cpp index b2381a13108e96121a5b0e59f07624f8672acb0c..a218a148f1592593706233947694bbf238fdc3c6 100644 --- a/ChaosDataService/QueryDataConsumer.cpp +++ b/ChaosDataService/QueryDataConsumer.cpp @@ -38,15 +38,13 @@ using namespace chaos::common::direct_io::channel; #define QDCDBG_ LDBG_ << QueryDataConsumer_LOG_HEAD << __FUNCTION__ << " - " #define QDCERR_ LERR_ << QueryDataConsumer_LOG_HEAD << __FUNCTION__ << "(" << __LINE__ << ") - " -QueryDataConsumer::QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance): +QueryDataConsumer::QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance, + db_system::DBDriver *_db_driver): vfs_manager_instance(_vfs_manager_instance), -query_engine(NULL){ - -} +db_driver(_db_driver), +query_engine(NULL){} -QueryDataConsumer::~QueryDataConsumer() { - -} +QueryDataConsumer::~QueryDataConsumer() {} void QueryDataConsumer::init(void *init_data) throw (chaos::CException) { if(!settings) throw chaos::CException(-1, "No setting provided", __FUNCTION__); @@ -109,7 +107,7 @@ void QueryDataConsumer::init(void *init_data) throw (chaos::CException) { QDCAPP_ << "Allocating Snapshoot worker"; if(!settings->cache_only) { snapshot_data_worker = new chaos::data_service::worker::SnapshotCreationWorker(cache_impl_name, - db_impl_name, + db_driver, network_broker); if(!snapshot_data_worker) throw chaos::CException(-5, "Error allocating snapshot worker", __FUNCTION__); chaos::utility::StartableService::initImplementation(snapshot_data_worker, init_data, "SnapshotCreationWorker", __PRETTY_FUNCTION__); @@ -251,21 +249,25 @@ int QueryDataConsumer::consumeGetEvent(DirectIODeviceChannelHeaderGetOpcode *hea void *channel_data, uint32_t channel_data_len, DirectIOSynchronousAnswerPtr synchronous_answer) { - return cache_driver->getData(channel_data, + int err = cache_driver->getData(channel_data, channel_data_len, &synchronous_answer->answer_data, synchronous_answer->answer_size); + if(channel_data) free(channel_data); + if(header) free(header); + return err; } #pragma mark DirectIOSystemAPIServerChannelHandler int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header, - const std::vector<std::string>& snapped_producer_key, - DirectIOSystemAPINewSnapshootResult& api_result) { + void *concatenated_unique_id_memory, + uint32_t concatenated_unique_id_memory_size, + DirectIOSystemAPINewSnapshootResult *api_result) { //check if we can work - if(!settings->cache_only) { + if(settings->cache_only) { //data service is in cache only mode throw the error - api_result.result_field.error = -1000; - std::strcpy(api_result.result_field.error_message, "Chaos Data Service is in cache only"); + api_result->error = -1000; + std::strcpy(api_result->error_message, "Chaos Data Service is in cache only"); //delete header if(header) free(header); return 0; @@ -276,17 +278,21 @@ int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPI //copy snapshot name job->snapshot_name = header->field.snap_name; //! copy the vector in job configuration - job->produceter_unique_id_set = snapped_producer_key; - + if(concatenated_unique_id_memory_size && concatenated_unique_id_memory) { + job->concatenated_unique_id_memory = (char*)concatenated_unique_id_memory; + job->concatenated_unique_id_memory_size = concatenated_unique_id_memory_size; + } if(!snapshot_data_worker->submitJobInfo(job)) { DEBUG_CODE(QDCDBG_ << "error pushing snapshot creation job " << job->snapshot_name << " in queue"); - api_result.result_field.error = -1001; - std::strcpy(api_result.result_field.error_message, "error pushing snapshot creation job in queue"); + api_result->error = -1001; + std::strcpy(api_result->error_message, "error pushing snapshot creation job in queue"); + if(concatenated_unique_id_memory) free(concatenated_unique_id_memory); delete job; } else { - api_result.result_field.error = 0; - std::strcpy(api_result.result_field.error_message, "Creation submitted"); + api_result->error = 0; + std::strcpy(api_result->error_message, "Creation submitted"); } + if(header) free(header); return 0; } diff --git a/ChaosDataService/QueryDataConsumer.h b/ChaosDataService/QueryDataConsumer.h index ca61630f4e95d8478fd1305c81589bc24b700468..ca79ab1928aab91f396166c12bbcf0a5c3770b97 100644 --- a/ChaosDataService/QueryDataConsumer.h +++ b/ChaosDataService/QueryDataConsumer.h @@ -26,6 +26,7 @@ #include "vfs/VFSManager.h" #include "worker/DataWorker.h" #include "cache_system/cache_system.h" +#include "db_system/db_system.h" #include "query_engine/QueryEngine.h" #include <chaos/common/utility/ObjectSlot.h> @@ -68,7 +69,7 @@ namespace chaos{ DirectIOSystemAPIServerChannel *system_api_channel; cache_system::CacheDriver *cache_driver; - + db_system::DBDriver *db_driver; vfs::VFSManager *vfs_manager_instance; boost::atomic<uint16_t> device_data_worker_index; chaos::data_service::worker::DataWorker **device_data_worker; @@ -94,12 +95,13 @@ namespace chaos{ //---------------- DirectIOSystemAPIServerChannelHandler ----------------------- int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header, - const std::vector<std::string>& snapped_producer_key, - DirectIOSystemAPINewSnapshootResult& api_result); + void *concatenated_unique_id_memory, + uint32_t concatenated_unique_id_memory_size, + DirectIOSystemAPINewSnapshootResult *api_result); //async central timer hook void timeout(); public: - QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance); + QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance, db_system::DBDriver *_db_driver); ~QueryDataConsumer(); void init(void *init_data) throw (chaos::CException); void start() throw (chaos::CException); diff --git a/ChaosDataService/db_system/MongoDBDriver.cpp b/ChaosDataService/db_system/MongoDBDriver.cpp index 29eb9c1c8ef28fc52bc8faa51e30bbfffa2e3cfb..ed92a1f22dae054afb87f5ab921267b7523d8c13 100644 --- a/ChaosDataService/db_system/MongoDBDriver.cpp +++ b/ChaosDataService/db_system/MongoDBDriver.cpp @@ -845,6 +845,10 @@ int MongoDBDriver::snapshotCreateNewWithName(const std::string& snapshot_name) { DEBUG_CODE(MDBID_LDBG_ << "snapshotCreateNewWithName insert ---------------------------------------------";) err = ha_connection_pool->insert(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT), q); + if(err == 11000) { + //already exis a snapshot with taht name so no error need to be throw + err = 0; + } } catch( const mongo::DBException &e ) { MDBID_LERR_ << e.what(); err = -1; @@ -862,7 +866,7 @@ int MongoDBDriver::snapshotAddElementToSnapshot(const std::string& snapshot_name mongo::BSONObjBuilder new_dataset; mongo::BSONObjBuilder search_snapshot; try{ - new_dataset << dataset_type << mongo::BSONObj((const char *)data); + new_dataset << "$set"<< BSON(dataset_type << mongo::BSONObj((const char *)data)); //search for snapshot name and producer unique key search_snapshot << MONGO_DB_FIELD_SNAPSHOT_DATA_SNAPSHOT_NAME << snapshot_name; @@ -875,7 +879,8 @@ int MongoDBDriver::snapshotAddElementToSnapshot(const std::string& snapshot_name DEBUG_CODE(MDBID_LDBG_ << "condition" << q;) DEBUG_CODE(MDBID_LDBG_ << "snapshotCreateNewWithName insert ---------------------------------------------";) - err = ha_connection_pool->insert(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA), q); + //update and waith until the data is on the server + err = ha_connection_pool->update(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA), q, u, true, false, &mongo::WriteConcern::acknowledged); } catch( const mongo::DBException &e ) { MDBID_LERR_ << e.what(); err = -1; diff --git a/ChaosDataService/worker/SnapshotCreationWorker.cpp b/ChaosDataService/worker/SnapshotCreationWorker.cpp index 518001cf33207da90201a0edc0f016c3d54cc3b2..16f94d8539a235c016ca41e3dfbbefacd35018e8 100644 --- a/ChaosDataService/worker/SnapshotCreationWorker.cpp +++ b/ChaosDataService/worker/SnapshotCreationWorker.cpp @@ -24,6 +24,7 @@ #include <chaos/common/utility/ObjectFactoryRegister.h> #include <boost/lexical_cast.hpp> +#include <boost/algorithm/string.hpp> using namespace chaos::common::network; using namespace chaos::data_service::worker; @@ -37,10 +38,10 @@ using namespace chaos::data_service::worker; //------------------------------------------------------------------------------------------------------------------------ SnapshotCreationWorker::SnapshotCreationWorker(const std::string& _cache_impl_name, - const std::string& _db_impl_name, + db_system::DBDriver *_db_driver_ptr, NetworkBroker *_network_broker): cache_impl_name(_cache_impl_name), -db_impl_name(_db_impl_name), +db_driver_ptr(_db_driver_ptr), network_broker(_network_broker), mds_channel(NULL){} @@ -57,16 +58,12 @@ void SnapshotCreationWorker::init(void *init_data) throw (chaos::CException) { SCW_LAPP_ << "allocating cache driver"; cache_driver_ptr = chaos::ObjectFactoryRegister<cache_system::CacheDriver>::getInstance()->getNewInstanceByName(cache_impl_name); - - SCW_LAPP_ << "allocating cache driver"; - db_driver_ptr = chaos::ObjectFactoryRegister<db_system::DBDriver>::getInstance()->getNewInstanceByName(db_impl_name); } void SnapshotCreationWorker::deinit() throw (chaos::CException) { SCW_LAPP_ << "deallocating cache driver"; if(cache_driver_ptr) delete(cache_driver_ptr); SCW_LAPP_ << "deallocating db driver"; - if(db_driver_ptr) delete(db_driver_ptr); if(mds_channel && network_broker) { network_broker->disposeMessageChannel(mds_channel); @@ -82,14 +79,14 @@ int SnapshotCreationWorker::storeDatasetTypeInSnapsnot(const std::string& snapsh void *data = NULL; uint32_t data_len = 0; std::string dataset_to_fetch = unique_id + dataset_type; - SCW_LDBG_ << "Get live data for " << dataset_to_fetch << " in channel "; + SCW_LDBG_ << "Get live data for " << dataset_to_fetch << " in channel"; if((err = cache_driver_ptr->getData((void*)dataset_to_fetch.c_str(), dataset_to_fetch.size(), &data, data_len))) { SCW_LERR_<< "Error retrieving live data for " << dataset_to_fetch << " with error: " << err; } else if(data) { SCW_LDBG_ << "Store data on snapshot for " << dataset_to_fetch; if((err = db_driver_ptr->snapshotAddElementToSnapshot(snapshot_name, unique_id, - dataset_type, + dataset_to_fetch, data, data_len))) { SCW_LERR_<< "Error storign dataset type "<< dataset_type <<" for " << unique_id << " in snapshot " << snapshot_name << " with error: " << err; @@ -105,9 +102,16 @@ int SnapshotCreationWorker::storeDatasetTypeInSnapsnot(const std::string& snapsh void SnapshotCreationWorker::executeJob(WorkerJobPtr job_info, void* cookie) { int err = 0; - - + std::vector<std::string> snapped_producer_keys; SnapshotCreationJob *job_ptr = reinterpret_cast<SnapshotCreationJob*>(job_info); + //recreate the array of producer key set + if(job_ptr->concatenated_unique_id_memory_size) { + std::string concatenated_keys((const char*)job_ptr->concatenated_unique_id_memory, job_ptr->concatenated_unique_id_memory_size); + //split the concatenated string + boost::split( snapped_producer_keys, concatenated_keys, is_any_of(","), token_compress_on ); + free(job_ptr->concatenated_unique_id_memory); + } + //check what kind of push we have //read lock on mantainance mutex SCW_LDBG_ << "Start snapshot creation for name" << job_ptr->snapshot_name; @@ -118,16 +122,16 @@ void SnapshotCreationWorker::executeJob(WorkerJobPtr job_info, void* cookie) { SCW_LERR_<< "Error creating snapshot "<< job_ptr->snapshot_name <<" on database with error: " << err; }else { //get the unique id to snap - if(job_ptr->produceter_unique_id_set.size()) { + if(snapped_producer_keys.size()) { SCW_LDBG_ << "make snapshot on the user producer'id set"; } else { SCW_LDBG_ << "make snapshot on all producer key"; - mds_channel->getAllDeviceID(job_ptr->produceter_unique_id_set); + mds_channel->getAllDeviceID(snapped_producer_keys); } //scann all id - for (std::vector<std::string>::iterator it = job_ptr->produceter_unique_id_set.begin(); - it != job_ptr->produceter_unique_id_set.end(); + for (std::vector<std::string>::iterator it = snapped_producer_keys.begin(); + it != snapped_producer_keys.end(); it++) { //snap output channel diff --git a/ChaosDataService/worker/SnapshotCreationWorker.h b/ChaosDataService/worker/SnapshotCreationWorker.h index 9b9e6930a05103145ea1f8991ffb8f2aa039eb7f..dc586e5268302d59213ef9553533afa816211f56 100644 --- a/ChaosDataService/worker/SnapshotCreationWorker.h +++ b/ChaosDataService/worker/SnapshotCreationWorker.h @@ -42,10 +42,13 @@ namespace chaos{ struct SnapshotCreationJob : public WorkerJob { //the name of the new snapshot - std::string snapshot_name; + std::string snapshot_name; - //! if empty all key need to be insert into the snapshot - std::vector<std::string> produceter_unique_id_set; + //! buffer fileld with concatenated unique id + char * concatenated_unique_id_memory; + + //the length of the buffer + uint32_t concatenated_unique_id_memory_size; }; //! worker for create the snapshoot in async way @@ -71,7 +74,7 @@ namespace chaos{ void executeJob(WorkerJobPtr job_info, void* cookie); public: SnapshotCreationWorker(const std::string& _cache_impl_name, - const std::string& _db_impl_name, + db_system::DBDriver *_db_driver_ptr, chaos_network::NetworkBroker *_network_broker); ~SnapshotCreationWorker(); void init(void *init_data) throw (chaos::CException); diff --git a/ChaosSnapshotUtility/main.cpp b/ChaosSnapshotUtility/main.cpp index 01e10074538e479765fd33028ec9e9803e9f0e22..cb33f5652c6dedc76575e8b341b946ea8b3be39f 100644 --- a/ChaosSnapshotUtility/main.cpp +++ b/ChaosSnapshotUtility/main.cpp @@ -34,13 +34,14 @@ using namespace chaos; using namespace chaos::ui; int main(int argc, char * argv[]) { - std::string device_id; + int64_t err = 0; + std::vector<std::string> device_id_list; std::string snap_name; std::string cds_addr; unsigned int operation; try{ ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_CDS_ADDRESS, "CDS address", &cds_addr); - ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_CU_ID, "The identification string of the device to snapshuot", &device_id); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption< std::vector<std::string> >(OPT_CU_ID, "The identification string of the device to snapshuot", &device_id_list); ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_SNAP_NAME, "The name of the snapshot", &snap_name); ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<unsigned int>(OPT_SNAPSHOT_OP, "Operation on snapshot [create(0), delete(1)]", 0, &operation); @@ -62,6 +63,25 @@ int main(int argc, char * argv[]) { if(!snap_name.size()) throw CException(-3, "Snapshot name can't zero-length", "check param"); SystemApiChannel *system_api_channel = LLRpcApi::getInstance()->getSystemApiClientChannel(cds_addr); + + chaos::common::direct_io::channel::opcode_headers::DirectIOSystemAPINewSnapshootResultPtr system_api_result = NULL; + + //!make snap on device + if(!(err = system_api_channel->system_api_channel->makeNewDatasetSnapshot(snap_name, + device_id_list, + &system_api_result))){ + if(system_api_result) { + std::cout << "Snapshot creation report: " << std::endl; + std::cout << "Error code:" << system_api_result->error << std::endl; + std::cout << "Error message:" << system_api_result->error_message << std::endl; + free(system_api_result); + } else { + std::cout << "no result received" << std::endl; + } + } else { + std::cout << "Error executing directio call" << std::endl; + } + if(system_api_channel) { LLRpcApi::getInstance()->releaseSystemApyChannel(system_api_channel); } diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h index 4300bdc5f19195fc928d4b878568f1fb13478a85..c6913241ffb734980948bbc6f34acd1133cbbe56 100644 --- a/chaos/common/chaos_constants.h +++ b/chaos/common/chaos_constants.h @@ -534,13 +534,13 @@ namespace chaos { */ namespace DataPackCommonKey { //!define the device unique key, this represent the primary key of the producer[string] - static const char * const DPCK_DEVICE_ID = "dpck.device_id"; + static const char * const DPCK_DEVICE_ID = "dpck_device_id"; //!this define the acquisition timestamp of the data represented by the dataset[uint64_t] - static const char * const DPCK_TIMESTAMP = "dpck.ts"; + static const char * const DPCK_TIMESTAMP = "dpck_ts"; //!define the type of the dataset [output(0) - input(1) - custom(2) - system(3) int32_t] - static const char * const DPCK_DATASET_TYPE = "dpck.ds_type"; + static const char * const DPCK_DATASET_TYPE = "dpck_ds_type"; //! the constant that represent the output dataset type static const unsigned int DPCK_DATASET_TYPE_OUTPUT = 0; //! the constant that represent the input dataset type @@ -560,7 +560,7 @@ namespace chaos { //! Namespace for standard constant used for output attribute of a producer namespace DataPackOutputKey { //!this define key associated to the trigger - static const char * const DPOK_TRIGGER_CODE = "dpok.trigger_key"; + static const char * const DPOK_TRIGGER_CODE = "dpok_trigger_key"; } /** @} */ // end of DataPackKey diff --git a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h index 20072d280106c8f54f75f11c0d52c245f10002d4..617cf7459d8796ac9c27aece612580c528d1ad61 100644 --- a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h +++ b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h @@ -247,15 +247,11 @@ namespace chaos { *DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr; //!result - typedef union DirectIOSystemAPINewSnapshootResult { - //raw data representation of the header - char raw_data[256+4]; - struct ResultFiled { + typedef struct DirectIOSystemAPINewSnapshootResult { int32_t error; char error_message[256]; - } result_field; }DirectIOSystemAPINewSnapshootResult, - DirectIOSystemAPINewSnapshootResultPtr; + *DirectIOSystemAPINewSnapshootResultPtr; } } } diff --git a/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp b/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp index d0d9b35d695e38866bceb7e80904b77c613d48bb..7a4a88901a8c2990d589941acafe7e5bc5c3f7ec 100644 --- a/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp @@ -90,7 +90,7 @@ int64_t DirectIODeviceClientChannel::storeAndCacheDataOutputChannel(const std::s int64_t DirectIODeviceClientChannel::requestLastOutputData(const std::string& key, void **result, uint32_t &size) { - uint64_t err = 0; + int64_t err = 0; DirectIOSynchronousAnswer *answer = NULL; DirectIODataPack *data_pack = (DirectIODataPack*)calloc(sizeof(DirectIODataPack), 1); diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp index 36b0b48c098fb699d99e931b1372db41f3e24f05..19dcb53934413293215c19b2cd288a1b556ea730 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp @@ -68,11 +68,11 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string //we have also a set of producer key so senti it in the data part of message std::string producer_key_concatenation; for(std::vector<std::string>::const_iterator it = producer_keys.begin(); - it != producer_keys.end(); - it++) { + it != producer_keys.end();) { //add key producer_key_concatenation.append(*it); - if(it != producer_keys.end()) { + + if((++it) != producer_keys.end()) { producer_key_concatenation.append(","); } } @@ -93,7 +93,7 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string //we got answer if(answer && answer->answer_size == sizeof(DirectIOSystemAPINewSnapshootResult)) { *api_result_handle = static_cast<DirectIOSystemAPINewSnapshootResult*>(answer->answer_data); - (*api_result_handle)->result_field.error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->result_field.error); + (*api_result_handle)->error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->error); } else { *api_result_handle = NULL; } diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp index 3448f1bc4b7f21ce6a552b2ee384160dd13bb463..76712f5a220cb015849bc03ae9449cccf3dcb7cc 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp @@ -21,7 +21,6 @@ #include <chaos/common/utility/endianess.h> #include <chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h> -#include <boost/algorithm/string.hpp> namespace chaos_data = chaos::common::data; using namespace chaos::common::direct_io; @@ -49,30 +48,21 @@ int DirectIOSystemAPIServerChannel::consumeDataPack(DirectIODataPack *dataPack, switch (channel_opcode) { case opcode::SystemAPIChannelOpcodeNewNewSnapshootDataset: { - //passed unique produce key vector - std::vector<std::string> snapped_producer_keys; - - //the result of the api - DirectIOSystemAPINewSnapshootResult *api_result = (DirectIOSystemAPINewSnapshootResult*)std::calloc(sizeof(DirectIOSystemAPINewSnapshootResult), 1) ; - //set the answer pointer - synchronous_answer->answer_data = api_result; + synchronous_answer->answer_data = std::calloc(sizeof(DirectIOSystemAPINewSnapshootResult), 1); synchronous_answer->answer_size = sizeof(DirectIOSystemAPINewSnapshootResult); //get the header opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr >(dataPack->channel_header_data); header->field.producer_key_set_len = FROM_LITTLE_ENDNS_NUM(uint32_t, header->field.producer_key_set_len); - - if(header->field.producer_key_set_len) { - //recreate the array of producer key set - std::string concatenated_keys((const char*)dataPack->channel_data, header->field.producer_key_set_len); - //split the concatenated string - boost::split( snapped_producer_keys, concatenated_keys, is_any_of(","), token_compress_on ); - } + //call the handler - handler->consumeNewSnapshotEvent(header, snapped_producer_keys, *api_result); + handler->consumeNewSnapshotEvent(header, + dataPack->channel_data, + dataPack->header.channel_data_size, + (DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data); //fix endianes into api result - api_result->result_field.error = TO_LITTE_ENDNS_NUM(int32_t, api_result->result_field.error); + ((DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data)->error = TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data)->error); break; } diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h index 51ef97b8097848a6e6faab690289e73250a942fd..77278934330296bfc535fdd5248287c128e521c0 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h @@ -54,9 +54,10 @@ namespace chaos { unique key, to include into the snaphsoot */ virtual int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header, - const std::vector<std::string>& snapped_producer_key, - DirectIOSystemAPINewSnapshootResult& api_result) - {DELETE_HEADER(header) return 0;}; + void *concatenated_unique_id_memory, + uint32_t concatenated_unique_id_memory_size, + DirectIOSystemAPINewSnapshootResult *api_result) + {DELETE_HEADER_DATA(header, concatenated_unique_id_memory) return 0;}; } DirectIOSystemAPIServerChannelHandler;