From e44714d4a1b624f1f8deefe0975e27d2b94472e6 Mon Sep 17 00:00:00 2001 From: Claudio Bisegni <Claudio.Bisegni@lnf.infn.it> Date: Mon, 24 Nov 2014 18:35:41 +0100 Subject: [PATCH] CDS DBDriver has been update with the "delete snapshot api" and direct io system channel (client and server) has been finalized --- ChaosDataService/QueryDataConsumer.cpp | 15 ++- ChaosDataService/QueryDataConsumer.h | 12 +- ChaosDataService/db_system/DBDriver.h | 27 ++++- ChaosDataService/db_system/MongoDBDriver.cpp | 100 ++++++++++++---- ChaosDataService/db_system/MongoDBDriver.h | 10 ++ .../channel/DirectIODeviceChannelGlobal.h | 34 +++--- .../DirectIOSystemAPIClientChannel.cpp | 112 ++++++++++++++++-- .../channel/DirectIOSystemAPIClientChannel.h | 5 +- .../DirectIOSystemAPIServerChannel.cpp | 53 ++++++++- .../channel/DirectIOSystemAPIServerChannel.h | 31 ++++- 10 files changed, 335 insertions(+), 64 deletions(-) diff --git a/ChaosDataService/QueryDataConsumer.cpp b/ChaosDataService/QueryDataConsumer.cpp index ddc0b296c..386ae1b28 100644 --- a/ChaosDataService/QueryDataConsumer.cpp +++ b/ChaosDataService/QueryDataConsumer.cpp @@ -259,7 +259,8 @@ int QueryDataConsumer::consumeGetEvent(DirectIODeviceChannelHeaderGetOpcode *hea } #pragma mark DirectIOSystemAPIServerChannelHandler -int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshotHeader *header, +// Manage the creation of a snapshot +int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, void *concatenated_unique_id_memory, uint32_t concatenated_unique_id_memory_size, DirectIOSystemAPISnapshotResult *api_result) { @@ -308,3 +309,15 @@ int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPI return 0; } +// Manage the delete operation on an existing snapshot +int QueryDataConsumer::consumeDeleteSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + DirectIOSystemAPISnapshotResult *api_result) { + +} + +// Return the dataset for a producerkey ona specific snapshot +int QueryDataConsumer::consumeGetDatasetSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + const std::string& producer_id, + DirectIOSystemAPIGetDatasetSnapshotResult *api_result) { + +} diff --git a/ChaosDataService/QueryDataConsumer.h b/ChaosDataService/QueryDataConsumer.h index d675c2b12..945f57d85 100644 --- a/ChaosDataService/QueryDataConsumer.h +++ b/ChaosDataService/QueryDataConsumer.h @@ -94,10 +94,20 @@ namespace chaos{ DirectIOSynchronousAnswerPtr synchronous_answer); //---------------- DirectIOSystemAPIServerChannelHandler ----------------------- - int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshotHeader *header, + // Manage the creation of a snapshot + int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, void *concatenated_unique_id_memory, uint32_t concatenated_unique_id_memory_size, DirectIOSystemAPISnapshotResult *api_result); + + // Manage the delete operation on an existing snapshot + int consumeDeleteSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + DirectIOSystemAPISnapshotResult *api_result); + + // Return the dataset for a producerkey ona specific snapshot + int consumeGetDatasetSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + const std::string& producer_id, + DirectIOSystemAPIGetDatasetSnapshotResult *api_result); //async central timer hook void timeout(); public: diff --git a/ChaosDataService/db_system/DBDriver.h b/ChaosDataService/db_system/DBDriver.h index 5c5970e09..26e4cf0a2 100644 --- a/ChaosDataService/db_system/DBDriver.h +++ b/ChaosDataService/db_system/DBDriver.h @@ -208,7 +208,8 @@ namespace chaos { \param _query the query \param index_cursor paged cursor for retrieve the result */ - virtual int idxStartSearchDataPack(const chaos::data_service::db_system::DataPackIndexQuery& _query, DBIndexCursor **index_cursor) = 0; + virtual int idxStartSearchDataPack(const chaos::data_service::db_system::DataPackIndexQuery& _query, + DBIndexCursor **index_cursor) = 0; //! Create a new snapshot /*! @@ -247,6 +248,30 @@ namespace chaos { virtual int snapshotIncrementJobCounter(const std::string& working_job_unique_id, const std::string& snapshot_name, bool add) = 0; + + //! get the dataset from a snapshot + /*! + Return the dataset asociated to a prducer key from a determinated + snapshot + \param snapshot_name the name of the snapshot to delete + \param producer_unique_key the unique key of the producer + \param dataset_type the type of the dataset, refer to @DataPackCommonKey::DPCK_DATASET_TYPE field of the dataset + \param channel_data the data of the channel; + \param channel_data_size the size of the channel data + */ + virtual int snapshotGetDatasetForProducerKey(const std::string& snapshot_name, + const std::string& producer_unique_key, + const std::string& dataset_type, + void **channel_data, + uint32_t& channel_data_size) = 0; + + //! Delete a snapshot where no job is working + /*! + Delete the snapshot and all dataset associated to it + \param snapshot_name the name of the snapshot to delete + */ + virtual int snapshotDeleteWithName(const std::string& snapshot_name) = 0; + }; } } diff --git a/ChaosDataService/db_system/MongoDBDriver.cpp b/ChaosDataService/db_system/MongoDBDriver.cpp index 8fcea9764..a12df3f5d 100644 --- a/ChaosDataService/db_system/MongoDBDriver.cpp +++ b/ChaosDataService/db_system/MongoDBDriver.cpp @@ -81,13 +81,13 @@ void MongoDBDriver::init(void *init_data) throw (chaos::CException) { index_on_domain = BSON(MONGO_DB_FIELD_SNAPSHOT_NAME<< 1); err = ha_connection_pool->ensureIndex(db_name, MONGO_DB_COLLECTION_SNAPSHOT, index_on_domain, true, "", true); if(err) throw chaos::CException(-1, "Error creating snapshot collection index", __PRETTY_FUNCTION__); - + index_on_domain = BSON(MONGO_DB_FIELD_SNAPSHOT_DATA_SNAPSHOT_NAME << 1 << MONGO_DB_FIELD_JOB_WORK_UNIQUE_CODE << 1 << MONGO_DB_FIELD_SNAPSHOT_DATA_PRODUCER_ID << 1); err = ha_connection_pool->ensureIndex(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA, index_on_domain, true, "", true); if(err) throw chaos::CException(-1, "Error creating snapshot data collection index", __PRETTY_FUNCTION__); - + } //!deinit @@ -176,8 +176,8 @@ int MongoDBDriver::vfsDomainHeartBeat(vfs::VFSDomain domain) { //! Register a new data block wrote on stage area int MongoDBDriver::vfsAddNewDataBlock(chaos_vfs::VFSFile *vfs_file, - chaos_vfs::DataBlock *data_block, - vfs::data_block_state::DataBlockState new_block_state) { + chaos_vfs::DataBlock *data_block, + vfs::data_block_state::DataBlockState new_block_state) { int err = 0; bool f_exists = false; @@ -254,7 +254,7 @@ int MongoDBDriver::vfsAddNewDataBlock(chaos_vfs::VFSFile *vfs_file, //! Delete a virtual file datablock int MongoDBDriver::vfsDeleteDataBlock(chaos_vfs::VFSFile *vfs_file, - chaos_vfs::DataBlock *data_block) { + chaos_vfs::DataBlock *data_block) { int err = 0; mongo::BSONObjBuilder file_search; mongo::BSONObj file_search_result; @@ -291,8 +291,8 @@ int MongoDBDriver::vfsDeleteDataBlock(chaos_vfs::VFSFile *vfs_file, //! Set the state for a stage datablock int MongoDBDriver::vfsSetStateOnDataBlock(chaos_vfs::VFSFile *vfs_file, - chaos_vfs::DataBlock *data_block, - int state) { + chaos_vfs::DataBlock *data_block, + int state) { int err = 0; mongo::BSONObjBuilder bson_search; mongo::BSONObjBuilder bson_block_query; @@ -336,10 +336,10 @@ int MongoDBDriver::vfsSetStateOnDataBlock(chaos_vfs::VFSFile *vfs_file, //! Set the state for a stage datablock int MongoDBDriver::vfsSetStateOnDataBlock(chaos_vfs::VFSFile *vfs_file, - chaos_vfs::DataBlock *data_block, - int cur_state, - int new_state, - bool& success) { + chaos_vfs::DataBlock *data_block, + int cur_state, + int new_state, + bool& success) { int err = 0; mongo::BSONObjBuilder command; mongo::BSONObjBuilder query_master; @@ -402,7 +402,7 @@ int MongoDBDriver::vfsSetHeartbeatOnDatablock(chaos_vfs::VFSFile *vfs_file, DEBUG_CODE(MDBID_LDBG_ << "vfsSetHeartbeatOnDatablock query ---------------------------------------------";) DEBUG_CODE(MDBID_LDBG_ << "Query: " << q.jsonString();) DEBUG_CODE(MDBID_LDBG_ << "vfsSetHeartbeatOnDatablock query ---------------------------------------------";) - + if(err) { MDBID_LERR_ << "Error " << err << " updating state on datablock"; } @@ -485,7 +485,7 @@ int MongoDBDriver::vfsFindSinceTimeDataBlock(chaos_vfs::VFSFile *vfs_file, //search on file path, the datablock is always the end token of the path query_master << MONGO_DB_FIELD_DATA_BLOCK_VFS_PATH << BSON("$regex" << boost::str(boost::format("%1%%2%") % vfs_file->getVFSFileInfo()->vfs_fpath % ".*")); - + //search for state query_master << MONGO_DB_FIELD_DATA_BLOCK_STATE << state; //search on the timestamp @@ -495,7 +495,7 @@ int MongoDBDriver::vfsFindSinceTimeDataBlock(chaos_vfs::VFSFile *vfs_file, DEBUG_CODE(MDBID_LDBG_ << "vfsFindSinceTimeDataBlock query ---------------------------------------------";) DEBUG_CODE(MDBID_LDBG_ << "Query: " << q.jsonString();) DEBUG_CODE(MDBID_LDBG_ << "vfsFindSinceTimeDataBlock query ---------------------------------------------";) - + err = ha_connection_pool->findOne(result, MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_VFS_VBLOCK), q); if(err) { @@ -615,10 +615,10 @@ int MongoDBDriver::vfsGetFilePathForDomain(const std::string& vfs_domain, _prefix_filter.append(".*"); query_master << "$query" << BSON(MONGO_DB_FIELD_FILE_VFS_DOMAIN << vfs_domain << MONGO_DB_FIELD_FILE_VFS_PATH << BSON("$regex" << _prefix_filter)) - << "$orderby" << BSON(MONGO_DB_FIELD_DATA_BLOCK_CREATION_TS << 1); + << "$orderby" << BSON(MONGO_DB_FIELD_DATA_BLOCK_CREATION_TS << 1); //search for domain //query_master << MONGO_DB_FIELD_FILE_VFS_DOMAIN << vfs_domain; - + //query_master << MONGO_DB_FIELD_FILE_VFS_PATH << BSON("$regex" << _prefix_filter); @@ -721,7 +721,7 @@ int MongoDBDriver::idxSetDataPackIndexStateByDataBlock(const std::string& vfs_da if(err) { MDBID_LERR_ << "Error " << err << " updating state on all datablock index"; } - + } catch( const mongo::DBException &e ) { MDBID_LERR_ << e.what(); err = -1; @@ -751,7 +751,7 @@ int MongoDBDriver::idxSearchResultCountDataPack(const DataPackIndexQuery& data_p index_search_builder << MONGO_DB_FIELD_IDX_DATA_PACK_DID << data_pack_index_query.did; index_search_builder << MONGO_DB_FIELD_IDX_DATA_PACK_STATE << (int32_t)DataPackIndexQueryStateQuerable; //select only querable indexes index_search_builder << MONGO_DB_FIELD_IDX_DATA_PACK_ACQ_TS_NUMERIC << BSON("$gte" << (long long)data_pack_index_query.start_ts << - "$lte" << (long long)data_pack_index_query.end_ts); + "$lte" << (long long)data_pack_index_query.end_ts); mongo::BSONObj q = index_search_builder.obj(); DEBUG_CODE(MDBID_LDBG_ << "idxSearchResultCountDataPack insert ---------------------------------------------";) @@ -784,17 +784,17 @@ int MongoDBDriver::idxSearchDataPack(const DataPackIndexQuery& data_pack_index_q //set the field to return return_field << MONGO_DB_FIELD_IDX_DATA_PACK_DATA_BLOCK_DST_DOMAIN << 1 - << MONGO_DB_FIELD_IDX_DATA_PACK_DATA_BLOCK_DST_PATH << 1 - << MONGO_DB_FIELD_IDX_DATA_PACK_DATA_BLOCK_DST_OFFSET << 1 - << MONGO_DB_FIELD_IDX_DATA_PACK_SIZE << 1 - << MONGO_DB_FIELD_IDX_DATA_PACK_ACQ_TS_NUMERIC << 1; + << MONGO_DB_FIELD_IDX_DATA_PACK_DATA_BLOCK_DST_PATH << 1 + << MONGO_DB_FIELD_IDX_DATA_PACK_DATA_BLOCK_DST_OFFSET << 1 + << MONGO_DB_FIELD_IDX_DATA_PACK_SIZE << 1 + << MONGO_DB_FIELD_IDX_DATA_PACK_ACQ_TS_NUMERIC << 1; mongo::BSONObj q = index_search_builder.obj(); mongo::BSONObj r = return_field.obj(); DEBUG_CODE(MDBID_LDBG_ << "idxDeleteDataPackIndex insert ---------------------------------------------";) DEBUG_CODE(MDBID_LDBG_ << "query: " << q.jsonString()); DEBUG_CODE(MDBID_LDBG_ << "idxDeleteDataPackIndex insert ---------------------------------------------";) - + ha_connection_pool->findN(found_element, MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_IDX_DATA_PACK), q, limit_to, 0, &r); } catch( const mongo::DBException &e ) { MDBID_LERR_ << e.what(); @@ -815,13 +815,13 @@ int MongoDBDriver::idxMaxAndMInimumTimeStampForDataPack(const DataPackIndexQuery index_search_builder << MONGO_DB_FIELD_IDX_DATA_PACK_DID << data_pack_index_query.did; index_search_builder << MONGO_DB_FIELD_IDX_DATA_PACK_STATE << (int32_t)DataPackIndexQueryStateQuerable; //select only querable indexes - + mongo::BSONObj p = return_field.obj(); mongo::BSONObj q = index_search_builder.obj(); DEBUG_CODE(MDBID_LDBG_ << "idxMaxAndMInimumTimeStampForDataPack insert ---------------------------------------------";) DEBUG_CODE(MDBID_LDBG_ << "query: " << q.jsonString()); DEBUG_CODE(MDBID_LDBG_ << "idxMaxAndMInimumTimeStampForDataPack insert ---------------------------------------------";) - + if((err = ha_connection_pool->findOne(r_max, MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_IDX_DATA_PACK), mongo::Query(q).sort(MONGO_DB_FIELD_IDX_DATA_PACK_ACQ_TS_NUMERIC,-1), &p))){ MDBID_LERR_ << "Error getting maximum timestamp on index of " << data_pack_index_query.did; } else if((err = ha_connection_pool->findOne(r_min, MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_IDX_DATA_PACK), mongo::Query(q).sort(MONGO_DB_FIELD_IDX_DATA_PACK_ACQ_TS_NUMERIC,1), &p))) { @@ -887,7 +887,7 @@ int MongoDBDriver::snapshotAddElementToSnapshot(const std::string& working_job_u search_snapshot << MONGO_DB_FIELD_SNAPSHOT_DATA_SNAPSHOT_NAME << snapshot_name; search_snapshot << MONGO_DB_FIELD_SNAPSHOT_DATA_PRODUCER_ID << producer_unique_key; search_snapshot << MONGO_DB_FIELD_JOB_WORK_UNIQUE_CODE << working_job_unique_id; - + mongo::BSONObj u = new_dataset.obj(); mongo::BSONObj q = search_snapshot.obj(); DEBUG_CODE(MDBID_LDBG_ << "snapshotCreateNewWithName insert ---------------------------------------------";) @@ -931,4 +931,52 @@ int MongoDBDriver::snapshotIncrementJobCounter(const std::string& working_job_un err = -1; } return err; +} + +//! get the dataset from a snapshot +int MongoDBDriver::snapshotGetDatasetForProducerKey(const std::string& snapshot_name, + const std::string& producer_unique_key, + const std::string& dataset_type, + void **channel_data, + uint32_t& channel_data_size) { + +} + +//! Delete a snapshot where no job is working +int MongoDBDriver::snapshotDeleteWithName(const std::string& snapshot_name) { + int err = 0; + mongo::BSONObj result; + mongo::BSONObjBuilder search_snapshot; + try{ + //search for snapshot name and producer unique key + search_snapshot << MONGO_DB_FIELD_SNAPSHOT_DATA_SNAPSHOT_NAME << snapshot_name; + + mongo::BSONObj q = search_snapshot.obj(); + DEBUG_CODE(MDBID_LDBG_ << "snapshotDeleteWithName count ---------------------------------------------";) + + DEBUG_CODE(MDBID_LDBG_ << "condition" << q;) + DEBUG_CODE(MDBID_LDBG_ << "snapshotDeleteWithName count ---------------------------------------------";) + + //update and waith until the data is on the server + if((err = ha_connection_pool->findOne(result, MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT), q))) { + MDBID_LERR_ << "Errore finding the snapshot "<< snapshot_name << "with error "<<err; + return err; + } + + //the snapshot to delete is present, so we delete it + if((err = ha_connection_pool->remove(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT), q))){ + MDBID_LERR_ << "Errore deleting the snapshot "<< snapshot_name << "with error "<<err; + return err; + } + + + //no we need to delete all dataset associated to it + if((err = ha_connection_pool->remove(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA), q))){ + MDBID_LERR_ << "Errore deleting the snapshot data "<< snapshot_name << "with error "<<err; + } + } catch( const mongo::DBException &e ) { + MDBID_LERR_ << e.what(); + err = -1; + } + return err; } \ No newline at end of file diff --git a/ChaosDataService/db_system/MongoDBDriver.h b/ChaosDataService/db_system/MongoDBDriver.h index 6b61a8989..d57148a1c 100644 --- a/ChaosDataService/db_system/MongoDBDriver.h +++ b/ChaosDataService/db_system/MongoDBDriver.h @@ -155,6 +155,16 @@ namespace chaos { int snapshotIncrementJobCounter(const std::string& working_job_unique_id, const std::string& snapshot_name, bool add); + + //! get the dataset from a snapshot + virtual int snapshotGetDatasetForProducerKey(const std::string& snapshot_name, + const std::string& producer_unique_key, + const std::string& dataset_type, + void **channel_data, + uint32_t& channel_data_size); + + //! Delete a snapshot where no job is working + virtual int snapshotDeleteWithName(const std::string& snapshot_name); }; } } diff --git a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h index 1119794a6..661191acf 100644 --- a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h +++ b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h @@ -48,8 +48,9 @@ namespace chaos { and collect all API for system managment */ typedef enum SystemAPIChannelOpcode { - SystemAPIChannelOpcodeNewNewSnapshotDataset = 1, /**< start new datasets snapshot creation process*/ - SystemAPIChannelOpcodeNewDeleteSnapshotDataset = 2, /**< delete the snapshot associated to the input tag */ + SystemAPIChannelOpcodeNewSnapshotDataset = 1, /**< start new datasets snapshot creation process*/ + SystemAPIChannelOpcodeDeleteSnapshotDataset = 2, /**< delete the snapshot associated to the input tag */ + SystemAPIChannelOpcodeGetSnapshotDatasetForAKey = 3, /**< return the snapshoted datasets for a determinated producer key*/ } SystemAPIChannelOpcode; } @@ -224,14 +225,16 @@ namespace chaos { #pragma mark System Channel #define SYSTEM_API_CHANNEL_NEW_Snapshot 256+4 - //! Header for the snapshot system api managment + //! Header for the snapshot system api managment for new, delete and get managment /*! - this header is usedfor the managment of the creation - of a new snapshot + this header is used for the managment of the creation, deletion and retrieve + of a snapshot the opcode associated to this header is: - SystemAPIChannelOpcodeNewSnapshotDatasetNew + - SystemAPIChannelOpcodeNewNewSnapshotDataset + - SystemAPIChannelOpcodeNewDeleteSnapshotDataset + - SystemAPIChannelOpcodeGetSnapshotDatasetForAKey */ - typedef union DirectIOSystemAPIChannelOpcodeNewSnapshotHeader { + typedef union DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader { //raw data representation of the header char raw_data[SYSTEM_API_CHANNEL_NEW_Snapshot]; struct header { @@ -243,8 +246,8 @@ namespace chaos { //! passed into the data part of the direct io message uint32_t producer_key_set_len; } field; - } DirectIOSystemAPIChannelOpcodeNewSnapshotHeader, - *DirectIOSystemAPIChannelOpcodeNewSnapshotHeaderPtr; + } DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader, + *DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr; //!result of the new and delete api typedef struct DirectIOSystemAPISnapshotResult { @@ -258,11 +261,14 @@ namespace chaos { //! api result DirectIOSystemAPISnapshotResult api_result; - //channels - data::CDataWrapper *output_channel; - data::CDataWrapper *input_channel; - data::CDataWrapper *custom_channel; - data::CDataWrapper *system_channel; + //channels lenght + uint32_t output_channel_len; + uint32_t input_channel_len; + uint32_t custom_channel_len; + uint32_t system_channel_len; + + //!concatenated channels data in order [o,i,c,s] + void* channels_data; }DirectIOSystemAPIGetDatasetSnapshotResult, *DirectIOSystemAPIGetDatasetSnapshotResultPtr; } diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp index 5ab9f3e56..b1eed03c2 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp @@ -42,8 +42,8 @@ DirectIOSystemAPIClientChannel::~DirectIOSystemAPIClientChannel() { // start a new Snapshot creation int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string& snapshot_name, - const std::vector<std::string>& producer_keys, - DirectIOSystemAPISnapshotResult **api_result_handle) { + const std::vector<std::string>& producer_keys, + DirectIOSystemAPISnapshotResult **api_result_handle) { int64_t err = 0; DirectIOSynchronousAnswer *answer = NULL; if(snapshot_name.size() > 255) { @@ -54,16 +54,17 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string DirectIODataPack *data_pack = (DirectIODataPack*)calloc(sizeof(DirectIODataPack), 1); //allocate the header - DirectIOSystemAPIChannelOpcodeNewSnapshotHeaderPtr new_snapshot_opcode_header = (DirectIOSystemAPIChannelOpcodeNewSnapshotHeaderPtr)calloc(sizeof(DirectIOSystemAPIChannelOpcodeNewSnapshotHeader), 1); + DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr new_snapshot_opcode_header = + (DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr)calloc(sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader), 1); //set opcode - data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::SystemAPIChannelOpcodeNewNewSnapshotDataset); + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::SystemAPIChannelOpcodeNewSnapshotDataset); - //copy the snapshot name + //copy the snapshot name to the header std::memcpy(new_snapshot_opcode_header->field.snap_name, snapshot_name.c_str(), snapshot_name.size()); //set header - DIRECT_IO_SET_CHANNEL_HEADER(data_pack, new_snapshot_opcode_header, sizeof(DirectIOSystemAPIChannelOpcodeNewSnapshotHeader)) + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, new_snapshot_opcode_header, sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader)) if(producer_keys.size()) { //we have also a set of producer key so senti it in the data part of message std::string producer_key_concatenation; @@ -103,14 +104,98 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string } //! delete the snapshot identified by name -int64_t DirectIOSystemAPIClientChannel::deleteDatasetSnapshot(const std::string& snapshot_name) { - return 0; +int64_t DirectIOSystemAPIClientChannel::deleteDatasetSnapshot(const std::string& snapshot_name, + DirectIOSystemAPISnapshotResult **api_result_handle) { + int64_t err = 0; + DirectIOSynchronousAnswer *answer = NULL; + if(snapshot_name.size() > 255) { + //bad Snapshot name size + return -1000; + } + //allocate the datapack + DirectIODataPack *data_pack = (DirectIODataPack*)calloc(sizeof(DirectIODataPack), 1); + + //allocate the header + DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr new_snapshot_opcode_header = + (DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr)calloc(sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader), 1); + + //set opcode + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::SystemAPIChannelOpcodeDeleteSnapshotDataset); + + //copy the snapshot name to the header + std::memcpy(new_snapshot_opcode_header->field.snap_name, snapshot_name.c_str(), snapshot_name.size()); + + //set header + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, new_snapshot_opcode_header, sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader)) + + //send data with synchronous answer flag + if((err = (int)sendPriorityData(data_pack, &answer))) { + //error getting last value + if(answer && answer->answer_data) free(answer->answer_data); + } else { + //we got answer + if(answer && answer->answer_size == sizeof(DirectIOSystemAPISnapshotResult)) { + *api_result_handle = static_cast<DirectIOSystemAPISnapshotResult*>(answer->answer_data); + (*api_result_handle)->error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->error); + } else { + *api_result_handle = NULL; + } + } + if(answer) free(answer); + return err; } //! get the snapshot for one or more producer key int64_t DirectIOSystemAPIClientChannel::getDatasetSnapshotForProducerKey(const std::string& snapshot_name, - const std::string& producer_key, - DirectIOSystemAPIGetDatasetSnapshotResult **api_reuslt_handle) { + const std::string& producer_key, + DirectIOSystemAPIGetDatasetSnapshotResult **api_result_handle) { + int64_t err = 0; + DirectIOSynchronousAnswer *answer = NULL; + if(snapshot_name.size() > 255) { + //bad Snapshot name size + return -1000; + } + //allocate the datapack + DirectIODataPack *data_pack = (DirectIODataPack*)calloc(sizeof(DirectIODataPack), 1); + + //allocate the header + DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr get_snapshot_opcode_header = + (DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr)calloc(sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader), 1); + + //set opcode + data_pack->header.dispatcher_header.fields.channel_opcode = static_cast<uint8_t>(opcode::SystemAPIChannelOpcodeGetSnapshotDatasetForAKey); + + //copy the snapshot name to the header + std::memcpy(get_snapshot_opcode_header->field.snap_name, snapshot_name.c_str(), snapshot_name.size()); + + //set header + DIRECT_IO_SET_CHANNEL_HEADER(data_pack, get_snapshot_opcode_header, sizeof(DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader)) + if(producer_key.size()) { + + //set the header field for the producer concatenation string + get_snapshot_opcode_header->field.producer_key_set_len = TO_LITTE_ENDNS_NUM(uint32_t, (uint32_t)producer_key.size()); + + //copy the memory for forwarding buffer + void * producer_key_send_buffer = malloc(producer_key.size()); + std::memcpy(producer_key_send_buffer, producer_key.c_str(), producer_key.size()); + //set as data + DIRECT_IO_SET_CHANNEL_DATA(data_pack, producer_key_send_buffer, (uint32_t)producer_key.size()); + } + //send data with synchronous answer flag + if((err = (int)sendPriorityData(data_pack, &answer))) { + //error getting last value + if(answer && answer->answer_data) free(answer->answer_data); + } else { + //we got answer + if(answer && answer->answer_size == sizeof(DirectIOSystemAPISnapshotResult)) { + *api_result_handle = static_cast<DirectIOSystemAPIGetDatasetSnapshotResult*>(answer->answer_data); + (*api_result_handle)->api_result.error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->api_result.error); + } else { + *api_result_handle = NULL; + } + } + if(answer) free(answer); + return err; return 0; } @@ -120,7 +205,9 @@ void DirectIOSystemAPIClientChannel::DirectIOSystemAPIClientChannelDeallocator:: switch(free_info_ptr->sent_part) { case DisposeSentMemoryInfo::SentPartHeader:{ switch(static_cast<opcode::SystemAPIChannelOpcode>(free_info_ptr->sent_opcode)) { - case opcode::SystemAPIChannelOpcodeNewNewSnapshotDataset: + case opcode::SystemAPIChannelOpcodeNewSnapshotDataset: + case opcode::SystemAPIChannelOpcodeDeleteSnapshotDataset: + case opcode::SystemAPIChannelOpcodeGetSnapshotDatasetForAKey: free(sent_data_ptr); break; default: @@ -131,7 +218,8 @@ void DirectIOSystemAPIClientChannel::DirectIOSystemAPIClientChannelDeallocator:: case DisposeSentMemoryInfo::SentPartData: { switch(static_cast<opcode::SystemAPIChannelOpcode>(free_info_ptr->sent_opcode)) { - case opcode::SystemAPIChannelOpcodeNewNewSnapshotDataset: + case opcode::SystemAPIChannelOpcodeNewSnapshotDataset: + case opcode::SystemAPIChannelOpcodeGetSnapshotDatasetForAKey: free(sent_data_ptr); break; default: diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.h b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.h index 232c97808..87bc2ac4e 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.h +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.h @@ -70,7 +70,8 @@ namespace chaos { Delete the entry of the snapshot and all dataset associated to it \param snapshot_name the name of the snapshot to delete */ - int64_t deleteDatasetSnapshot(const std::string& snapshot_name); + int64_t deleteDatasetSnapshot(const std::string& snapshot_name, + DirectIOSystemAPISnapshotResult **api_result_handle); //! get the snapshot for one or more producer key /*! @@ -81,7 +82,7 @@ namespace chaos { */ int64_t getDatasetSnapshotForProducerKey(const std::string& snapshot_name, const std::string& producer_key, - DirectIOSystemAPIGetDatasetSnapshotResult **api_reuslt_handle); + DirectIOSystemAPIGetDatasetSnapshotResult **api_result_handle); }; } } diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp index 87feee1c3..5bea5ea04 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp @@ -47,13 +47,13 @@ int DirectIOSystemAPIServerChannel::consumeDataPack(DirectIODataPack *dataPack, opcode::SystemAPIChannelOpcode channel_opcode = static_cast<opcode::SystemAPIChannelOpcode>(dataPack->header.dispatcher_header.fields.channel_opcode); switch (channel_opcode) { - case opcode::SystemAPIChannelOpcodeNewNewSnapshotDataset: { + case opcode::SystemAPIChannelOpcodeNewSnapshotDataset: { //set the answer pointer synchronous_answer->answer_data = std::calloc(sizeof(DirectIOSystemAPISnapshotResult), 1); synchronous_answer->answer_size = sizeof(DirectIOSystemAPISnapshotResult); //get the header - opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshotHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshotHeaderPtr >(dataPack->channel_header_data); + opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr >(dataPack->channel_header_data); header->field.producer_key_set_len = FROM_LITTLE_ENDNS_NUM(uint32_t, header->field.producer_key_set_len); //call the handler @@ -62,10 +62,57 @@ int DirectIOSystemAPIServerChannel::consumeDataPack(DirectIODataPack *dataPack, dataPack->header.channel_data_size, (DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data); //fix endianes into api result - ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error = TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error); + ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error = + TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error); break; } + case opcode::SystemAPIChannelOpcodeDeleteSnapshotDataset: { + //set the answer pointer + synchronous_answer->answer_data = std::calloc(sizeof(DirectIOSystemAPISnapshotResult), 1); + synchronous_answer->answer_size = sizeof(DirectIOSystemAPISnapshotResult); + + //get the header + opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr >(dataPack->channel_header_data); + header->field.producer_key_set_len = FROM_LITTLE_ENDNS_NUM(uint32_t, header->field.producer_key_set_len); + + //call the handler + handler->consumeDeleteSnapshotEvent(header, + (DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data); + //fix endianes into api result + ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error = + TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPISnapshotResult*)synchronous_answer->answer_data)->error); + break; + } + + case opcode::SystemAPIChannelOpcodeGetSnapshotDatasetForAKey: { + std::string producer_key; + //set the answer pointer + synchronous_answer->answer_data = std::calloc(sizeof(DirectIOSystemAPIGetDatasetSnapshotResult), 1); + synchronous_answer->answer_size = sizeof(DirectIOSystemAPIGetDatasetSnapshotResult); + + //get the header + opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeaderPtr >(dataPack->channel_header_data); + header->field.producer_key_set_len = FROM_LITTLE_ENDNS_NUM(uint32_t, header->field.producer_key_set_len); + + //chec if a producere key has been forwarded + if(!dataPack->header.channel_data_size) { + //set error + producer_key.assign((const char*)dataPack->channel_data, dataPack->header.channel_data_size); + //delete the memory where is located producer key + free(dataPack->channel_data); + } + + + //call the handler + handler->consumeGetDatasetSnapshotEvent(header, + producer_key, + (DirectIOSystemAPIGetDatasetSnapshotResult*)synchronous_answer->answer_data); + //fix endianes into api result + ((DirectIOSystemAPIGetDatasetSnapshotResult*)synchronous_answer->answer_data)->api_result.error = + TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPIGetDatasetSnapshotResult*)synchronous_answer->answer_data)->api_result.error); + break; + } default: break; } diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h index 04474f9da..2f1d3101a 100644 --- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h +++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h @@ -44,21 +44,44 @@ namespace chaos { //! System API DirectIO server handler typedef class DirectIOSystemAPIServerChannelHandler { public: - //! Receive the SystemAPIChannelOpcodeNewSnapshotDatasetNew opcode + //! Manage the creation of a snapshot /*! The creation for a new snapshot has been requested, all information - on the live cache will be stored into database layer creating a + on the live cache will be stored into database layer creating a reference to this snapshot. \param header header of the new snapshot api \param snapped_producer_key the list of the producer, identfied the - unique key, to include into the snaphsoot + unique key, to include into the snaphsoot + \param api_result the result of the api + \return error on the forwading of the event */ - virtual int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshotHeader *header, + virtual int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, void *concatenated_unique_id_memory, uint32_t concatenated_unique_id_memory_size, DirectIOSystemAPISnapshotResult *api_result) {DELETE_HEADER_DATA(header, concatenated_unique_id_memory) return 0;}; + //! Manage the delete operation on an existing snapshot + /*! + Perform the delete operation on the snpashot and all dataset associated to it. + \param header of the snapshot to delete + \param api_result the result of the api + \return error on the forwading of the event + */ + virtual int consumeDeleteSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + DirectIOSystemAPISnapshotResult *api_result) + {DELETE_HEADER(header) return 0;}; + + //! Return the dataset for a producerkey ona specific snapshot + /*! + \param header of the snapshot where to fetch the dataasets + \param producer_id is the identification of the producre of the returning datasets + \return error on the forwading of the event + */ + virtual int consumeGetDatasetSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader *header, + const std::string& producer_id, + DirectIOSystemAPIGetDatasetSnapshotResult *api_result) + {DELETE_HEADER(header) return 0;}; } DirectIOSystemAPIServerChannelHandler; void setHandler(DirectIOSystemAPIServerChannelHandler *_handler); -- GitLab