From b5612e4f272be88e99fd65d4e1655290c512ca89 Mon Sep 17 00:00:00 2001
From: Claudio Bisegni <Claudio.Bisegni@lnf.infn.it>
Date: Fri, 21 Nov 2014 19:14:20 +0100
Subject: [PATCH] The snapshot creation has been done - a new utility
 ChaosSnapshotUtility as been created (now permit only to create a snapshot on
 all or a subset of producer id)

---
 ChaosDataService/ChaosDataService.cpp         |  2 +-
 ChaosDataService/QueryDataConsumer.cpp        | 46 +++++++++++--------
 ChaosDataService/QueryDataConsumer.h          | 10 ++--
 ChaosDataService/db_system/MongoDBDriver.cpp  |  9 +++-
 .../worker/SnapshotCreationWorker.cpp         | 32 +++++++------
 .../worker/SnapshotCreationWorker.h           | 11 +++--
 ChaosSnapshotUtility/main.cpp                 | 24 +++++++++-
 chaos/common/chaos_constants.h                |  8 ++--
 .../channel/DirectIODeviceChannelGlobal.h     |  8 +---
 .../channel/DirectIODeviceClientChannel.cpp   |  2 +-
 .../DirectIOSystemAPIClientChannel.cpp        |  8 ++--
 .../DirectIOSystemAPIServerChannel.cpp        | 24 +++-------
 .../channel/DirectIOSystemAPIServerChannel.h  |  7 +--
 13 files changed, 109 insertions(+), 82 deletions(-)

diff --git a/ChaosDataService/ChaosDataService.cpp b/ChaosDataService/ChaosDataService.cpp
index 4bbcd6b27..0f530d1fb 100644
--- a/ChaosDataService/ChaosDataService.cpp
+++ b/ChaosDataService/ChaosDataService.cpp
@@ -178,7 +178,7 @@ void ChaosDataService::init(void *init_data)  throw(CException) {
 		if(run_mode == QUERY ||
 		   run_mode == BOTH) {
 			CDSLAPP_ << "Allocate the Query Data Consumer";
-			data_consumer.reset(new QueryDataConsumer(vfs_file_manager.get()), "QueryDataConsumer");
+			data_consumer.reset(new QueryDataConsumer(vfs_file_manager.get(), db_driver_ptr), "QueryDataConsumer");
 			if(!data_consumer.get()) throw chaos::CException(-1, "Error instantiating data consumer", __PRETTY_FUNCTION__);
 			data_consumer->settings = &settings;
 			data_consumer->network_broker = network_broker.get();
diff --git a/ChaosDataService/QueryDataConsumer.cpp b/ChaosDataService/QueryDataConsumer.cpp
index b2381a131..a218a148f 100644
--- a/ChaosDataService/QueryDataConsumer.cpp
+++ b/ChaosDataService/QueryDataConsumer.cpp
@@ -38,15 +38,13 @@ using namespace chaos::common::direct_io::channel;
 #define QDCDBG_ LDBG_ << QueryDataConsumer_LOG_HEAD << __FUNCTION__ << " - "
 #define QDCERR_ LERR_ << QueryDataConsumer_LOG_HEAD << __FUNCTION__ << "(" << __LINE__ << ") - "
 
-QueryDataConsumer::QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance):
+QueryDataConsumer::QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance,
+									 db_system::DBDriver *_db_driver):
 vfs_manager_instance(_vfs_manager_instance),
-query_engine(NULL){
-	
-}
+db_driver(_db_driver),
+query_engine(NULL){}
 
-QueryDataConsumer::~QueryDataConsumer() {
-	
-}
+QueryDataConsumer::~QueryDataConsumer() {}
 
 void QueryDataConsumer::init(void *init_data) throw (chaos::CException) {
 	if(!settings)  throw chaos::CException(-1, "No setting provided", __FUNCTION__);
@@ -109,7 +107,7 @@ void QueryDataConsumer::init(void *init_data) throw (chaos::CException) {
 	QDCAPP_ << "Allocating Snapshoot worker";
 	if(!settings->cache_only) {
 		snapshot_data_worker = new chaos::data_service::worker::SnapshotCreationWorker(cache_impl_name,
-																					   db_impl_name,
+																					   db_driver,
 																					   network_broker);
 		if(!snapshot_data_worker) throw chaos::CException(-5, "Error allocating snapshot worker", __FUNCTION__);
 		chaos::utility::StartableService::initImplementation(snapshot_data_worker, init_data, "SnapshotCreationWorker", __PRETTY_FUNCTION__);
@@ -251,21 +249,25 @@ int QueryDataConsumer::consumeGetEvent(DirectIODeviceChannelHeaderGetOpcode *hea
 									   void *channel_data,
 									   uint32_t channel_data_len,
 									   DirectIOSynchronousAnswerPtr synchronous_answer) {
-	return cache_driver->getData(channel_data,
+	int err = cache_driver->getData(channel_data,
 								 channel_data_len,
 								 &synchronous_answer->answer_data,
 								 synchronous_answer->answer_size);
+	if(channel_data) free(channel_data);
+	if(header) free(header);
+	return err;
 }
 
 #pragma mark DirectIOSystemAPIServerChannelHandler
 int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header,
-											   const std::vector<std::string>& snapped_producer_key,
-											   DirectIOSystemAPINewSnapshootResult& api_result) {
+											   void *concatenated_unique_id_memory,
+											   uint32_t concatenated_unique_id_memory_size,
+											   DirectIOSystemAPINewSnapshootResult *api_result) {
 	//check if we can work
-	if(!settings->cache_only) {
+	if(settings->cache_only) {
 		//data service is in cache only mode throw the error
-		api_result.result_field.error = -1000;
-		std::strcpy(api_result.result_field.error_message, "Chaos Data Service is in cache only");
+		api_result->error = -1000;
+		std::strcpy(api_result->error_message, "Chaos Data Service is in cache only");
 		//delete header
 		if(header) free(header);
 		return 0;
@@ -276,17 +278,21 @@ int QueryDataConsumer::consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPI
 	//copy snapshot name
 	job->snapshot_name = header->field.snap_name;
 	//! copy the vector in job configuration
-	job->produceter_unique_id_set = snapped_producer_key;
-	
+	if(concatenated_unique_id_memory_size && concatenated_unique_id_memory) {
+		job->concatenated_unique_id_memory = (char*)concatenated_unique_id_memory;
+		job->concatenated_unique_id_memory_size = concatenated_unique_id_memory_size;
+	}
 	if(!snapshot_data_worker->submitJobInfo(job)) {
 		DEBUG_CODE(QDCDBG_ << "error pushing snapshot creation job " << job->snapshot_name << " in queue");
-		api_result.result_field.error = -1001;
-		std::strcpy(api_result.result_field.error_message, "error pushing snapshot creation job in queue");
+		api_result->error = -1001;
+		std::strcpy(api_result->error_message, "error pushing snapshot creation job in queue");
+		if(concatenated_unique_id_memory) free(concatenated_unique_id_memory);
 		delete job;
 	} else {
-		api_result.result_field.error = 0;
-		std::strcpy(api_result.result_field.error_message, "Creation submitted");
+		api_result->error = 0;
+		std::strcpy(api_result->error_message, "Creation submitted");
 	}
+	if(header) free(header);
 	return 0;
 }
 
diff --git a/ChaosDataService/QueryDataConsumer.h b/ChaosDataService/QueryDataConsumer.h
index ca61630f4..ca79ab192 100644
--- a/ChaosDataService/QueryDataConsumer.h
+++ b/ChaosDataService/QueryDataConsumer.h
@@ -26,6 +26,7 @@
 #include "vfs/VFSManager.h"
 #include "worker/DataWorker.h"
 #include "cache_system/cache_system.h"
+#include "db_system/db_system.h"
 #include "query_engine/QueryEngine.h"
 
 #include <chaos/common/utility/ObjectSlot.h>
@@ -68,7 +69,7 @@ namespace chaos{
 			DirectIOSystemAPIServerChannel			*system_api_channel;
 			
 			cache_system::CacheDriver				*cache_driver;
-			
+			db_system::DBDriver						*db_driver;
 			vfs::VFSManager *vfs_manager_instance;
 			boost::atomic<uint16_t> device_data_worker_index;
 			chaos::data_service::worker::DataWorker	**device_data_worker;
@@ -94,12 +95,13 @@ namespace chaos{
 			
 			//---------------- DirectIOSystemAPIServerChannelHandler -----------------------
 			int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header,
-										const std::vector<std::string>& snapped_producer_key,
-										DirectIOSystemAPINewSnapshootResult& api_result);
+										void *concatenated_unique_id_memory,
+										uint32_t concatenated_unique_id_memory_size,
+										DirectIOSystemAPINewSnapshootResult *api_result);
 			//async central timer hook
 			void timeout();
         public:
-			QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance);
+			QueryDataConsumer(vfs::VFSManager *_vfs_manager_instance, db_system::DBDriver *_db_driver);
             ~QueryDataConsumer();
             void init(void *init_data) throw (chaos::CException);
             void start() throw (chaos::CException);
diff --git a/ChaosDataService/db_system/MongoDBDriver.cpp b/ChaosDataService/db_system/MongoDBDriver.cpp
index 29eb9c1c8..ed92a1f22 100644
--- a/ChaosDataService/db_system/MongoDBDriver.cpp
+++ b/ChaosDataService/db_system/MongoDBDriver.cpp
@@ -845,6 +845,10 @@ int MongoDBDriver::snapshotCreateNewWithName(const std::string& snapshot_name) {
 		DEBUG_CODE(MDBID_LDBG_ << "snapshotCreateNewWithName insert ---------------------------------------------";)
 		
 		err = ha_connection_pool->insert(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT), q);
+		if(err == 11000) {
+			//already exis a snapshot with taht name so no error need to be throw
+			err = 0;
+		}
 	} catch( const mongo::DBException &e ) {
 		MDBID_LERR_ << e.what();
 		err = -1;
@@ -862,7 +866,7 @@ int MongoDBDriver::snapshotAddElementToSnapshot(const std::string& snapshot_name
 	mongo::BSONObjBuilder	new_dataset;
 	mongo::BSONObjBuilder	search_snapshot;
 	try{
-		new_dataset << dataset_type << mongo::BSONObj((const char *)data);
+		new_dataset << "$set"<< BSON(dataset_type << mongo::BSONObj((const char *)data));
 		
 		//search for snapshot name and producer unique key
 		search_snapshot << MONGO_DB_FIELD_SNAPSHOT_DATA_SNAPSHOT_NAME << snapshot_name;
@@ -875,7 +879,8 @@ int MongoDBDriver::snapshotAddElementToSnapshot(const std::string& snapshot_name
 		DEBUG_CODE(MDBID_LDBG_ << "condition" << q;)
 		DEBUG_CODE(MDBID_LDBG_ << "snapshotCreateNewWithName insert ---------------------------------------------";)
 		
-		err = ha_connection_pool->insert(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA), q);
+		//update and waith until the data is on the server
+		err = ha_connection_pool->update(MONGO_DB_COLLECTION_NAME(db_name, MONGO_DB_COLLECTION_SNAPSHOT_DATA), q, u, true, false, &mongo::WriteConcern::acknowledged);
 	} catch( const mongo::DBException &e ) {
 		MDBID_LERR_ << e.what();
 		err = -1;
diff --git a/ChaosDataService/worker/SnapshotCreationWorker.cpp b/ChaosDataService/worker/SnapshotCreationWorker.cpp
index 518001cf3..16f94d853 100644
--- a/ChaosDataService/worker/SnapshotCreationWorker.cpp
+++ b/ChaosDataService/worker/SnapshotCreationWorker.cpp
@@ -24,6 +24,7 @@
 #include <chaos/common/utility/ObjectFactoryRegister.h>
 
 #include <boost/lexical_cast.hpp>
+#include <boost/algorithm/string.hpp>
 
 using namespace chaos::common::network;
 using namespace chaos::data_service::worker;
@@ -37,10 +38,10 @@ using namespace chaos::data_service::worker;
 //------------------------------------------------------------------------------------------------------------------------
 
 SnapshotCreationWorker::SnapshotCreationWorker(const std::string& _cache_impl_name,
-											   const std::string& _db_impl_name,
+											   db_system::DBDriver	*_db_driver_ptr,
 											   NetworkBroker	*_network_broker):
 cache_impl_name(_cache_impl_name),
-db_impl_name(_db_impl_name),
+db_driver_ptr(_db_driver_ptr),
 network_broker(_network_broker),
 mds_channel(NULL){}
 
@@ -57,16 +58,12 @@ void SnapshotCreationWorker::init(void *init_data) throw (chaos::CException) {
 		
 	SCW_LAPP_ << "allocating cache driver";
 	cache_driver_ptr = chaos::ObjectFactoryRegister<cache_system::CacheDriver>::getInstance()->getNewInstanceByName(cache_impl_name);
-	
-	SCW_LAPP_ << "allocating cache driver";
-	db_driver_ptr = chaos::ObjectFactoryRegister<db_system::DBDriver>::getInstance()->getNewInstanceByName(db_impl_name);
 }
 
 void SnapshotCreationWorker::deinit() throw (chaos::CException) {
 	SCW_LAPP_ << "deallocating cache driver";
 	if(cache_driver_ptr) delete(cache_driver_ptr);
 	SCW_LAPP_ << "deallocating db driver";
-	if(db_driver_ptr) delete(db_driver_ptr);
 	
 	if(mds_channel && network_broker) {
 		network_broker->disposeMessageChannel(mds_channel);
@@ -82,14 +79,14 @@ int SnapshotCreationWorker::storeDatasetTypeInSnapsnot(const std::string& snapsh
 	void *data = NULL;
 	uint32_t data_len = 0;
 	std::string dataset_to_fetch = unique_id + dataset_type;
-	SCW_LDBG_ << "Get live data for " << dataset_to_fetch << " in channel ";
+	SCW_LDBG_ << "Get live data for " << dataset_to_fetch << " in channel";
 	if((err = cache_driver_ptr->getData((void*)dataset_to_fetch.c_str(), dataset_to_fetch.size(), &data, data_len))) {
 		SCW_LERR_<< "Error retrieving live data for " << dataset_to_fetch << " with error: " << err;
 	} else if(data) {
 		SCW_LDBG_ << "Store data on snapshot for " << dataset_to_fetch;
 		if((err = db_driver_ptr->snapshotAddElementToSnapshot(snapshot_name,
 															  unique_id,
-															  dataset_type,
+															  dataset_to_fetch,
 															  data,
 															  data_len))) {
 			SCW_LERR_<< "Error storign dataset type "<< dataset_type <<" for " << unique_id << " in snapshot " << snapshot_name << " with error: " << err;
@@ -105,9 +102,16 @@ int SnapshotCreationWorker::storeDatasetTypeInSnapsnot(const std::string& snapsh
 
 void SnapshotCreationWorker::executeJob(WorkerJobPtr job_info, void* cookie) {
 	int err = 0;
-
-	
+	std::vector<std::string> snapped_producer_keys;
 	SnapshotCreationJob *job_ptr = reinterpret_cast<SnapshotCreationJob*>(job_info);
+	//recreate the array of producer key set
+	if(job_ptr->concatenated_unique_id_memory_size) {
+		std::string concatenated_keys((const char*)job_ptr->concatenated_unique_id_memory, job_ptr->concatenated_unique_id_memory_size);
+		//split the concatenated string
+		boost::split( snapped_producer_keys, concatenated_keys, is_any_of(","), token_compress_on );
+		free(job_ptr->concatenated_unique_id_memory);
+	}
+	
 	//check what kind of push we have
 	//read lock on mantainance mutex
 	SCW_LDBG_ << "Start snapshot creation for name" << job_ptr->snapshot_name;
@@ -118,16 +122,16 @@ void SnapshotCreationWorker::executeJob(WorkerJobPtr job_info, void* cookie) {
 		SCW_LERR_<< "Error creating snapshot "<< job_ptr->snapshot_name <<" on database with error: " << err;
 	}else {
 		//get the unique id to snap
-		if(job_ptr->produceter_unique_id_set.size()) {
+		if(snapped_producer_keys.size()) {
 			SCW_LDBG_ << "make snapshot on the user producer'id set";
 		} else {
 			SCW_LDBG_ << "make snapshot on all producer key";
-			mds_channel->getAllDeviceID(job_ptr->produceter_unique_id_set);
+			mds_channel->getAllDeviceID(snapped_producer_keys);
 		}
 		
 		//scann all id
-		for (std::vector<std::string>::iterator it = job_ptr->produceter_unique_id_set.begin();
-			 it != job_ptr->produceter_unique_id_set.end();
+		for (std::vector<std::string>::iterator it = snapped_producer_keys.begin();
+			 it != snapped_producer_keys.end();
 			 it++) {
 			
 			//snap output channel
diff --git a/ChaosDataService/worker/SnapshotCreationWorker.h b/ChaosDataService/worker/SnapshotCreationWorker.h
index 9b9e6930a..dc586e526 100644
--- a/ChaosDataService/worker/SnapshotCreationWorker.h
+++ b/ChaosDataService/worker/SnapshotCreationWorker.h
@@ -42,10 +42,13 @@ namespace chaos{
 			struct SnapshotCreationJob :
 			public WorkerJob {
 				//the name of the new snapshot
-				std::string					snapshot_name;
+				std::string		snapshot_name;
 				
-				//! if empty all key need to be insert into the snapshot
-				std::vector<std::string>	produceter_unique_id_set;
+				//! buffer fileld with concatenated unique id
+				char *			concatenated_unique_id_memory;
+				
+				//the length of the buffer
+				uint32_t		concatenated_unique_id_memory_size;
 			};
 			
 			//! worker for create the snapshoot in async way
@@ -71,7 +74,7 @@ namespace chaos{
 				void executeJob(WorkerJobPtr job_info, void* cookie);
 			public:
 				SnapshotCreationWorker(const std::string& _cache_impl_name,
-									   const std::string& _db_impl_name,
+									   db_system::DBDriver	*_db_driver_ptr,
 									   chaos_network::NetworkBroker	*_network_broker);
 				~SnapshotCreationWorker();
 				void init(void *init_data) throw (chaos::CException);
diff --git a/ChaosSnapshotUtility/main.cpp b/ChaosSnapshotUtility/main.cpp
index 01e100745..cb33f5652 100644
--- a/ChaosSnapshotUtility/main.cpp
+++ b/ChaosSnapshotUtility/main.cpp
@@ -34,13 +34,14 @@ using namespace chaos;
 using namespace chaos::ui;
 
 int main(int argc, char * argv[]) {
-	std::string device_id;
+	int64_t err = 0;
+	std::vector<std::string> device_id_list;
 	std::string snap_name;
 	std::string cds_addr;
 	unsigned int operation;
 	try{
 		ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_CDS_ADDRESS, "CDS address", &cds_addr);
-		ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_CU_ID, "The identification string of the device to snapshuot", &device_id);
+		ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption< std::vector<std::string> >(OPT_CU_ID, "The identification string of the device to snapshuot", &device_id_list);
 		ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<std::string>(OPT_SNAP_NAME, "The name of the snapshot", &snap_name);
 		ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<unsigned int>(OPT_SNAPSHOT_OP, "Operation on snapshot [create(0), delete(1)]", 0, &operation);
 		
@@ -62,6 +63,25 @@ int main(int argc, char * argv[]) {
 		if(!snap_name.size()) throw CException(-3, "Snapshot name can't zero-length", "check param");
 		
 		SystemApiChannel *system_api_channel = LLRpcApi::getInstance()->getSystemApiClientChannel(cds_addr);
+		
+		chaos::common::direct_io::channel::opcode_headers::DirectIOSystemAPINewSnapshootResultPtr system_api_result = NULL;
+		
+		//!make snap on device
+		if(!(err = system_api_channel->system_api_channel->makeNewDatasetSnapshot(snap_name,
+																				  device_id_list,
+																				  &system_api_result))){
+			if(system_api_result) {
+				std::cout << "Snapshot creation report: " << std::endl;
+				std::cout << "Error code:" << system_api_result->error << std::endl;
+				std::cout << "Error message:" << system_api_result->error_message << std::endl;
+				free(system_api_result);
+			} else {
+				std::cout << "no result received" << std::endl;
+			}
+		} else {
+			std::cout << "Error executing directio call" << std::endl;
+		}
+		
 		if(system_api_channel) {
 			LLRpcApi::getInstance()->releaseSystemApyChannel(system_api_channel);
 		}
diff --git a/chaos/common/chaos_constants.h b/chaos/common/chaos_constants.h
index 4300bdc5f..c6913241f 100644
--- a/chaos/common/chaos_constants.h
+++ b/chaos/common/chaos_constants.h
@@ -534,13 +534,13 @@ namespace chaos {
 	 */
 	namespace DataPackCommonKey {
 		//!define the device unique key, this represent the primary key of the producer[string]
-		static const char * const DPCK_DEVICE_ID                       = "dpck.device_id";
+		static const char * const DPCK_DEVICE_ID                       = "dpck_device_id";
 		
 		//!this define the acquisition timestamp of the data represented by the dataset[uint64_t]
-		static const char * const DPCK_TIMESTAMP                       = "dpck.ts";
+		static const char * const DPCK_TIMESTAMP                       = "dpck_ts";
 		
 		//!define the type of the dataset [output(0) - input(1) - custom(2) - system(3) int32_t]
-		static const char * const DPCK_DATASET_TYPE                    = "dpck.ds_type";
+		static const char * const DPCK_DATASET_TYPE                    = "dpck_ds_type";
 		//! the constant that represent the output dataset type
 		static const unsigned int DPCK_DATASET_TYPE_OUTPUT             = 0;
 		//! the constant that represent the input dataset type
@@ -560,7 +560,7 @@ namespace chaos {
     //! Namespace for standard constant used for output attribute of a producer
     namespace DataPackOutputKey {
         //!this define key associated to the trigger
-        static const char * const DPOK_TRIGGER_CODE                   = "dpok.trigger_key";
+        static const char * const DPOK_TRIGGER_CODE                   = "dpok_trigger_key";
     }
     /** @} */ // end of DataPackKey
 	
diff --git a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h
index 20072d280..617cf7459 100644
--- a/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h
+++ b/chaos/common/direct_io/channel/DirectIODeviceChannelGlobal.h
@@ -247,15 +247,11 @@ namespace chaos {
 					*DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr;
 					
 					//!result
-					typedef union DirectIOSystemAPINewSnapshootResult {
-						//raw data representation of the header
-						char raw_data[256+4];
-						struct ResultFiled {
+					typedef  struct DirectIOSystemAPINewSnapshootResult {
 							int32_t		error;
 							char		error_message[256];
-						} result_field;
 					}DirectIOSystemAPINewSnapshootResult,
-					DirectIOSystemAPINewSnapshootResultPtr;
+					*DirectIOSystemAPINewSnapshootResultPtr;
                 }
 			}
 		}
diff --git a/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp b/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp
index d0d9b35d6..7a4a88901 100644
--- a/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp
+++ b/chaos/common/direct_io/channel/DirectIODeviceClientChannel.cpp
@@ -90,7 +90,7 @@ int64_t DirectIODeviceClientChannel::storeAndCacheDataOutputChannel(const std::s
 int64_t DirectIODeviceClientChannel::requestLastOutputData(const std::string& key,
 														   void **result,
 														   uint32_t &size) {
-	uint64_t err = 0;
+	int64_t err = 0;
 	DirectIOSynchronousAnswer *answer = NULL;
 	DirectIODataPack *data_pack = (DirectIODataPack*)calloc(sizeof(DirectIODataPack), 1);
 	
diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp
index 36b0b48c0..19dcb5393 100644
--- a/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp
+++ b/chaos/common/direct_io/channel/DirectIOSystemAPIClientChannel.cpp
@@ -68,11 +68,11 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string
 		//we have also a set of producer key so senti it in the data part of message
 		std::string producer_key_concatenation;
 		for(std::vector<std::string>::const_iterator it = producer_keys.begin();
-			it != producer_keys.end();
-			it++) {
+			it != producer_keys.end();) {
 			//add key
 			producer_key_concatenation.append(*it);
-			if(it != producer_keys.end()) {
+			
+			if((++it) != producer_keys.end()) {
 				producer_key_concatenation.append(",");
 			}
 		}
@@ -93,7 +93,7 @@ int64_t DirectIOSystemAPIClientChannel::makeNewDatasetSnapshot(const std::string
 		//we got answer
 		if(answer && answer->answer_size == sizeof(DirectIOSystemAPINewSnapshootResult)) {
 			*api_result_handle  = static_cast<DirectIOSystemAPINewSnapshootResult*>(answer->answer_data);
-			(*api_result_handle)->result_field.error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->result_field.error);
+			(*api_result_handle)->error = FROM_LITTLE_ENDNS_NUM(int32_t, (*api_result_handle)->error);
 		} else {
 			*api_result_handle = NULL;
 		}
diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp
index 3448f1bc4..76712f5a2 100644
--- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp
+++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.cpp
@@ -21,7 +21,6 @@
 #include <chaos/common/utility/endianess.h>
 #include <chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h>
 
-#include <boost/algorithm/string.hpp>
 
 namespace chaos_data = chaos::common::data;
 using namespace chaos::common::direct_io;
@@ -49,30 +48,21 @@ int DirectIOSystemAPIServerChannel::consumeDataPack(DirectIODataPack *dataPack,
 	
 	switch (channel_opcode) {
 		case opcode::SystemAPIChannelOpcodeNewNewSnapshootDataset: {
-			//passed unique produce key vector
-			std::vector<std::string> snapped_producer_keys;
-			
-			//the result of the api
-			DirectIOSystemAPINewSnapshootResult *api_result = (DirectIOSystemAPINewSnapshootResult*)std::calloc(sizeof(DirectIOSystemAPINewSnapshootResult), 1) ;
-			
 			//set the answer pointer
-			synchronous_answer->answer_data = api_result;
+			synchronous_answer->answer_data = std::calloc(sizeof(DirectIOSystemAPINewSnapshootResult), 1);
 			synchronous_answer->answer_size = sizeof(DirectIOSystemAPINewSnapshootResult);
 			
 			//get the header
 			opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr header = reinterpret_cast< opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeaderPtr >(dataPack->channel_header_data);
 			header->field.producer_key_set_len = FROM_LITTLE_ENDNS_NUM(uint32_t, header->field.producer_key_set_len);
-			
-			if(header->field.producer_key_set_len) {
-				//recreate the array of producer key set
-				std::string concatenated_keys((const char*)dataPack->channel_data, header->field.producer_key_set_len);
-				//split the concatenated string
-				boost::split( snapped_producer_keys, concatenated_keys, is_any_of(","), token_compress_on );
-			}
+
 			//call the handler
-			handler->consumeNewSnapshotEvent(header, snapped_producer_keys, *api_result);
+			handler->consumeNewSnapshotEvent(header,
+											 dataPack->channel_data,
+											 dataPack->header.channel_data_size,
+											 (DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data);
 			//fix endianes into api result
-			api_result->result_field.error = TO_LITTE_ENDNS_NUM(int32_t, api_result->result_field.error);
+			((DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data)->error = TO_LITTE_ENDNS_NUM(int32_t, ((DirectIOSystemAPINewSnapshootResult*)synchronous_answer->answer_data)->error);
 			break;
 		}
 			
diff --git a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h
index 51ef97b80..772789343 100644
--- a/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h
+++ b/chaos/common/direct_io/channel/DirectIOSystemAPIServerChannel.h
@@ -54,9 +54,10 @@ namespace chaos {
 						unique key, to include into the snaphsoot
 						 */
 						virtual int consumeNewSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNewSnapshootHeader *header,
-															const std::vector<std::string>& snapped_producer_key,
-															DirectIOSystemAPINewSnapshootResult& api_result)
-						{DELETE_HEADER(header) return 0;};
+															void *concatenated_unique_id_memory,
+															uint32_t concatenated_unique_id_memory_size,
+															DirectIOSystemAPINewSnapshootResult *api_result)
+						{DELETE_HEADER_DATA(header, concatenated_unique_id_memory) return 0;};
 						
 					} DirectIOSystemAPIServerChannelHandler;
 					
-- 
GitLab