From f3b018a0bce6e17610f79df22fabfd84711a836a Mon Sep 17 00:00:00 2001
From: amichelo <andrea.michelotti@lnf.infn.it>
Date: Thu, 4 Nov 2021 23:48:22 +0100
Subject: [PATCH] fix and update

---
 ChaosMetadataService/QueryDataConsumer.cpp    |  56 ++-
 ChaosMetadataService/QueryDataConsumer.h      |   2 +-
 .../QueryDataMsgPSConsumer.cpp                | 469 ++++--------------
 .../kafka/rdk/MessagePSRDKafkaConsumer.cpp    |   4 +
 4 files changed, 140 insertions(+), 391 deletions(-)

diff --git a/ChaosMetadataService/QueryDataConsumer.cpp b/ChaosMetadataService/QueryDataConsumer.cpp
index d87b5b35d..ad96dd549 100644
--- a/ChaosMetadataService/QueryDataConsumer.cpp
+++ b/ChaosMetadataService/QueryDataConsumer.cpp
@@ -59,33 +59,36 @@ using namespace chaos::common::direct_io::channel;
 
 //constructor
 QueryDataConsumer::QueryDataConsumer()
-    : server_endpoint(NULL), device_channel(NULL), system_api_channel(NULL), device_data_worker_index(0), storage_queue_push_timeout(ChaosMetadataService::getInstance()->setting.worker_setting.queue_push_timeout) {}
+    : server_endpoint(NULL), device_channel(NULL), system_api_channel(NULL), archive_workers(0),device_data_worker_index(0), storage_queue_push_timeout(ChaosMetadataService::getInstance()->setting.worker_setting.queue_push_timeout) {}
 
 QueryDataConsumer::~QueryDataConsumer() {}
 
 void QueryDataConsumer::init(void* init_data) {
   //get new chaos direct io endpoint
   server_endpoint = NetworkBroker::getInstance()->getDirectIOServerEndpoint();
-  if(server_endpoint==NULL){
-      INFO << "DirectIO disabled";
-      return;
-  }
-//  if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__);
-  INFO << "QueryDataConsumer initialized with endpoint " << server_endpoint->getRouteIndex();
+  archive_workers=ChaosMetadataService::getInstance()->setting.worker_setting.instances;
+
+  if (server_endpoint != NULL) {
+    //  if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__);
+    INFO << "QueryDataConsumer initialized with endpoint " << server_endpoint->getRouteIndex();
 
-  INFO << "Allocating DirectIODeviceServerChannel";
-  device_channel = (DirectIODeviceServerChannel*)server_endpoint->getNewChannelInstance("DirectIODeviceServerChannel");
-  if (!device_channel) throw chaos::CException(-3, "Error allocating device server channel", __FUNCTION__);
-  device_channel->setHandler(this);
+    INFO << "Allocating DirectIODeviceServerChannel";
+    device_channel = (DirectIODeviceServerChannel*)server_endpoint->getNewChannelInstance("DirectIODeviceServerChannel");
+    if (!device_channel) throw chaos::CException(-3, "Error allocating device server channel", __FUNCTION__);
+    device_channel->setHandler(this);
 
-  INFO << "Allocating DirectIOSystemAPIServerChannel";
-  system_api_channel = (DirectIOSystemAPIServerChannel*)server_endpoint->getNewChannelInstance("DirectIOSystemAPIServerChannel");
-  if (!system_api_channel) throw chaos::CException(-4, "Error allocating system api server channel", __FUNCTION__);
-  system_api_channel->setHandler(this);
+    INFO << "Allocating DirectIOSystemAPIServerChannel";
+    system_api_channel = (DirectIOSystemAPIServerChannel*)server_endpoint->getNewChannelInstance("DirectIOSystemAPIServerChannel");
+    if (!system_api_channel) throw chaos::CException(-4, "Error allocating system api server channel", __FUNCTION__);
+    system_api_channel->setHandler(this);
+  } else {
+    INFO << "DirectIO disabled";
+  }
+  INFO << "Archive workers:"<<archive_workers;
 
   //device data worker instances
   for (int idx = 0;
-       idx < ChaosMetadataService::getInstance()->setting.worker_setting.instances;
+       idx < archive_workers;
        idx++) {
     DataWorkerSharedPtr tmp;
 #if CHAOS_PROMETHEUS
@@ -111,7 +114,7 @@ void QueryDataConsumer::deinit() {
   }
 
   INFO << "Deallocating device push data worker list";
-  for (int idx = 0; idx < ChaosMetadataService::getInstance()->setting.worker_setting.instances; idx++) {
+  for (int idx = 0; idx <archive_workers; idx++) {
     INFO << "Release device worker " << idx;
     device_data_worker[idx]->stop();
     device_data_worker[idx]->deinit();
@@ -122,17 +125,20 @@ int QueryDataConsumer::consumePutEvent(const std::string&                 key,
                                        const uint8_t                      hst_tag,
                                        const ChaosStringSetConstSPtr      meta_tag_set,
                                        chaos::common::data::CDataWrapper& data_pack) {
-  int      err = 0;
-
+  int err = 0;
 
-  
   //data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, now);
-  BufferSPtr channel_data_injected(data_pack.getBSONDataBuffer().release());
+  chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer();
+  if (ptr.get() == NULL) {
+    ERR << key << " empty packet";
+    return -1;
+  }
+  BufferSPtr channel_data_injected(ptr.release());
 
   DataServiceNodeDefinitionType::DSStorageType storage_type = static_cast<DataServiceNodeDefinitionType::DSStorageType>(hst_tag);
   //! if tag is == 1 the datapack is in liveonly
 
-  if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive) {
+  if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive && channel_data_injected.get()) {
     //protected access to cached driver
     CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv();
     err                     = cache_slot.putData(key,
@@ -150,10 +156,10 @@ int QueryDataConsumer::consumePutEvent(const std::string&                 key,
       ERR << "Error pushing datapack into logstorage driver";
     }
   }
-  if (!err &&
-      (storage_type & (DataServiceNodeDefinitionType::DSStorageTypeHistory | DataServiceNodeDefinitionType::DSStorageTypeFile))) {
+  if (!err && (storage_type & (DataServiceNodeDefinitionType::DSStorageTypeHistory | DataServiceNodeDefinitionType::DSStorageTypeFile))) {
     //compute the index to use for the data worker
-    uint32_t index_to_use = device_data_worker_index++ % ChaosMetadataService::getInstance()->setting.worker_setting.instances;
+    uint32_t index_to_use;
+    index_to_use = device_data_worker_index++ % archive_workers;
     CHAOS_ASSERT(device_data_worker[index_to_use].get())
     //create storage job information
     auto job       = ChaosMakeSharedPtr<DeviceSharedWorkerJob>();
diff --git a/ChaosMetadataService/QueryDataConsumer.h b/ChaosMetadataService/QueryDataConsumer.h
index a099dfcd3..4abb7fb68 100644
--- a/ChaosMetadataService/QueryDataConsumer.h
+++ b/ChaosMetadataService/QueryDataConsumer.h
@@ -64,7 +64,7 @@ namespace chaos{
             boost::atomic<uint16_t>                 device_data_worker_index;
             int64_t         storage_queue_push_timeout;
             DataWorkerVec	device_data_worker;
-            
+            int archive_workers;
             //---------------- DirectIODeviceServerChannelHandler -----------------------
             protected:
              int consumePutEvent(const std::string& key,
diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
index 39e99de79..1b4836e40 100644
--- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
+++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
@@ -48,8 +48,8 @@ namespace metadata_service {
 QueryDataMsgPSConsumer::~QueryDataMsgPSConsumer() {
 }
 */
-std::map<std::string, uint64_t>  QueryDataMsgPSConsumer::alive_map;
-boost::mutex QueryDataMsgPSConsumer::map_m;
+std::map<std::string, uint64_t> QueryDataMsgPSConsumer::alive_map;
+boost::mutex                    QueryDataMsgPSConsumer::map_m;
 QueryDataMsgPSConsumer::QueryDataMsgPSConsumer(const std::string& id)
     : groupid(id) {
   if (GlobalConfiguration::getInstance()->getConfiguration()->hasKey(InitOption::OPT_HA_ZONE_NAME)) {
@@ -60,107 +60,98 @@ QueryDataMsgPSConsumer::QueryDataMsgPSConsumer(const std::string& id)
 
   cons = chaos::common::message::MessagePSDriver::getConsumerDriver(msgbrokerdrv, groupid);
 }
-void QueryDataMsgPSConsumer::messageHandler( chaos::common::message::ele_t& data) {
+void QueryDataMsgPSConsumer::messageHandler(chaos::common::message::ele_t& data) {
+  
   try {
-  ChaosStringSetConstSPtr meta_tag_set;
-
+    
 
-  if (data.cd->hasKey(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)) {
-    ChaosStringSet* tag = new ChaosStringSet();
-    tag->insert(data.cd->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG));
-    meta_tag_set.reset(tag);
-  }
-  std::string kp ;//= data.key;
+    if (data.cd.get()&&data.cd->hasKey(DataPackCommonKey::DPCK_DATASET_TYPE) && data.cd->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)) {
+      uint64_t now = TimingUtil::getTimeStamp();
 
-  //std::replace(kp.begin(), kp.end(), '.', '/');
-  //DBG<<"data from:"<<kp<<" size:"<<data.cd->getBSONRawSize();
-  if(data.cd->hasKey(DataPackCommonKey::DPCK_DATASET_TYPE)&&data.cd->hasKey(NodeDefinitionKey::NODE_UNIQUE_ID)){
-    uint64_t now = TimingUtil::getTimeStamp();
+      int pktype = data.cd->getInt32Value(DataPackCommonKey::DPCK_DATASET_TYPE);
 
-    int pktype=data.cd->getInt32Value(DataPackCommonKey::DPCK_DATASET_TYPE);
-    
-    int64_t ts=0;
-    uint32_t                st=(uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive;
-    if(data.cd->hasKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE)){
-      st=data.cd->getInt32Value(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE);
-      if(pktype!=DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT){
-          st|=(uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive;
+      int64_t  ts = 0;
+      uint32_t st = (uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive;
+      if (data.cd->hasKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE)) {
+        st = data.cd->getInt32Value(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE);
+        if (pktype != DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT) {
+          st |= (uint32_t)DataServiceNodeDefinitionType::DSStorageTypeLive;
+        }
       }
-    }
-    
-    kp=data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID)+datasetTypeToPostfix(pktype);
-    int32_t lat=0;
-    if(pktype==DataPackCommonKey::DPCK_DATASET_TYPE_LOG){
-        if(data.cd->hasKey(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP)){
-          ts=data.cd->getInt64Value(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP);
-          lat=now-ts;
-          if(lat>SKIP_OLDER_THAN){
-              ERR<<kp<<" log too old: "<<lat<< " ms, skipping...";
-              return ;
-          }
-          data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF,lat );
 
+    //  kp          = data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype);
+      int32_t lat = 0;
+      if (pktype == DataPackCommonKey::DPCK_DATASET_TYPE_LOG) {
+        if (data.cd->hasKey(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP)) {
+          ts  = data.cd->getInt64Value(MetadataServerLoggingDefinitionKeyRPC::PARAM_NODE_LOGGING_LOG_TIMESTAMP);
+          lat = now - ts;
+          if (lat > SKIP_OLDER_THAN) {
+            ERR <<  data.key << " log too old: " << lat << " ms, skipping...";
+            return;
+          }
+          data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat);
         }
-    //  DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" LOG:"<<data.cd->getJSONString();
-        if(CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<MAX_LOG_QUEUE){
+        //  DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" LOG:"<<data.cd->getJSONString();
+        if (CObjectProcessingPriorityQueue<CDataWrapper>::queueSize() < MAX_LOG_QUEUE) {
           CDWShrdPtr ptr(data.cd.release());
-          CObjectProcessingPriorityQueue<CDataWrapper>::push(ptr,0);
+          CObjectProcessingPriorityQueue<CDataWrapper>::push(ptr, 0);
         } else {
-          ERR<<kp<<"] too many logs on queue for DB:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize();
+          ERR <<  data.key << "] too many logs on queue for DB:" << CObjectProcessingPriorityQueue<CDataWrapper>::queueSize();
           return;
         }
-    } else if(data.cd->hasKey(DataPackCommonKey::DPCK_TIMESTAMP)){
-        ts=data.cd->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP);
-        lat=(now-ts);
-        data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF,lat );
-        if((pktype==DataPackCommonKey::DPCK_DATASET_TYPE_HEALTH)){
-          if(lat>(chaos::common::constants::HBTimersTimeoutinMSec*2)){
-         // health too old
+      } else if (data.cd->hasKey(DataPackCommonKey::DPCK_TIMESTAMP)) {
+        ts  = data.cd->getInt64Value(DataPackCommonKey::DPCK_TIMESTAMP);
+        lat = (now - ts);
+        if ((pktype == DataPackCommonKey::DPCK_DATASET_TYPE_HEALTH)) {
+          if (lat > (chaos::common::constants::HBTimersTimeoutinMSec * 2)) {
+            // health too old
             return;
           }
-        } else if((pktype==DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT)||(pktype==DataPackCommonKey::DPCK_DATASET_TYPE_INPUT)){
-              if(((st==0) || (st==DataServiceNodeDefinitionType::DSStorageTypeLive))){
-              if(lat>SKIP_OLDER_THAN){
-                 ERR<<kp<<" too old: "<<lat<< " ms, skipping...";
+        } else if ((pktype == DataPackCommonKey::DPCK_DATASET_TYPE_OUTPUT) || (pktype == DataPackCommonKey::DPCK_DATASET_TYPE_INPUT)) {
+          if (((st == 0) || (st == DataServiceNodeDefinitionType::DSStorageTypeLive))) {
+            if (lat > SKIP_OLDER_THAN) {
+              ERR <<  data.key << " too old: " << lat << " ms, skipping...";
               // output too old
-                return;
-              }
-          data.cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE);
-          data.cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE);
-          
-        } 
-       
-   
+              return;
+            }
+            data.cd->removeKey(DataServiceNodeDefinitionKey::DS_STORAGE_TYPE);
+            data.cd->removeKey(DataPackCommonKey::DPCK_DATASET_TYPE);
+          }
+        }
+        data.cd->addInt32Value(DataPackCommonKey::NODE_MDS_TIMEDIFF, lat);
       }
-    }
-    
-    QueryDataConsumer::consumePutEvent(kp, (uint8_t)st, meta_tag_set, *(data.cd.get()));
-  }
-  } catch(const chaos::CException& e ){
-    ERR<<"Chaos Exception caught processing key:"<<data.key<<" ("<<data.off<<","<<data.par<<") error:"<<e.what();
-  } catch(...){
-    ERR<<"Unknown Exception caught processing key:"<<data.key<<" ("<<data.off<<","<<data.par<<")";
+      ChaosStringSetConstSPtr meta_tag_set;
 
+      if (data.cd->hasKey(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG)) {
+        ChaosStringSet* tag = new ChaosStringSet();
+        tag->insert(data.cd->getStringValue(ControlUnitNodeDefinitionKey::CONTROL_UNIT_DATASET_TAG));
+        meta_tag_set.reset(tag);
+      }
+      QueryDataConsumer::consumePutEvent(data.cd->getStringValue(NodeDefinitionKey::NODE_UNIQUE_ID) + datasetTypeToPostfix(pktype), (uint8_t)st, meta_tag_set, *(data.cd.get()));
+    }
+  } catch (const chaos::CException& e) {
+    ERR << "Chaos Exception caught processing key:" << data.key << " (" << data.off << "," << data.par << ") error:" << e.what();
+  } catch (...) {
+    ERR << "Unknown Exception caught processing key:" << data.key << " (" << data.off << "," << data.par << ")";
   }
 }
-  void QueryDataMsgPSConsumer::processBufferElement(chaos::common::data::CDWShrdPtr log_entry){
-      chaos::metadata_service::api::logging::SubmitEntryBase se;
+void QueryDataMsgPSConsumer::processBufferElement(chaos::common::data::CDWShrdPtr log_entry) {
+  chaos::metadata_service::api::logging::SubmitEntryBase se;
   //      DBG<<"Queue:"<<CObjectProcessingPriorityQueue<CDataWrapper>::queueSize()<<" WRITE ";
 
-      se.execute(log_entry->clone());        
-      
-  }
+  se.execute(log_entry->clone());
+}
 
-void QueryDataMsgPSConsumer::messageError( chaos::common::message::ele_t& data) {
-  ChaosStringSetConstSPtr meta_tag_set;
-      boost::mutex::scoped_lock ll(map_m);
-    std::string path=data.key;
-    std::replace(path.begin(), path.end(), '.', '/');
+void QueryDataMsgPSConsumer::messageError(chaos::common::message::ele_t& data) {
+  ChaosStringSetConstSPtr   meta_tag_set;
+  boost::mutex::scoped_lock ll(map_m);
+  std::string               path = data.key;
+  std::replace(path.begin(), path.end(), '.', '/');
 
   //  std::map<std::string, uint64_t>::iterator i=alive_map.find(path);
-    if(data.cd.get()&&data.cd->hasKey("msg")&&data.cd->hasKey("err")){
-      ERR<<"key:"<<data.key<<" ["<<path<<"] err msg:"<<data.cd->getStringValue("msg")<<" err:"<<data.cd->getInt32Value("err");
-    }
+  if (data.cd.get() && data.cd->hasKey("msg") && data.cd->hasKey("err")) {
+    ERR << "key:" << data.key << " [" << path << "] err msg:" << data.cd->getStringValue("msg") << " err:" << data.cd->getInt32Value("err");
+  }
   /*  if(i!=alive_map.end()){
       DBG<<" removing from alive list:"<<i->first;
       alive_map.erase(i);
@@ -168,14 +159,12 @@ void QueryDataMsgPSConsumer::messageError( chaos::common::message::ele_t& data)
       DBG<<path<<" is not in the alive list";
 
     }*/
-
 }
 
 void QueryDataMsgPSConsumer::init(void* init_data) {
-
   QueryDataConsumer::init(init_data);
   msgbroker = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_SERVER);
-  DBG<<"Initialize, my broker is:"<<msgbroker;
+  DBG << "Initialize, my broker is:" << msgbroker;
 
   CObjectProcessingPriorityQueue<CDataWrapper>::init(1);
 
@@ -184,7 +173,7 @@ void QueryDataMsgPSConsumer::init(void* init_data) {
   cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&QueryDataMsgPSConsumer::messageHandler, this, _1));
   cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&QueryDataMsgPSConsumer::messageError, this, _1));
 
-/*
+  /*
   if (cons->setOption("auto.offset.reset", "earliest") != 0) {
     throw chaos::CException(-1, "cannot set offset:" + cons->getLastError(), __PRETTY_FUNCTION__);
   }
@@ -196,43 +185,42 @@ void QueryDataMsgPSConsumer::init(void* init_data) {
     throw chaos::CException(-1, "cannot initialize Publish Subscribe:" + cons->getLastError(), __PRETTY_FUNCTION__);
   }
 }
-void QueryDataMsgPSConsumer::subscribeProcess(int attempt){
-DBG << "Starting SubscribeProcess";
+void QueryDataMsgPSConsumer::subscribeProcess(int attempt) {
+  DBG << "Starting SubscribeProcess";
 
-api::node::NodeSearch node;
-sleep(10);
+  api::node::NodeSearch node;
+  sleep(10);
 
-while(attempt--){
-  std::vector<std::string> nodes=node.search("",(chaos::NodeType::NodeSearchType)(((int)chaos::NodeType::node_type_ceu )| ((int)chaos::NodeType::node_type_agent)| ((int)chaos::NodeType::node_type_us))); // search CEU
+  while (attempt--) {
+    std::vector<std::string> nodes = node.search("", (chaos::NodeType::NodeSearchType)(((int)chaos::NodeType::node_type_ceu) | ((int)chaos::NodeType::node_type_agent) | ((int)chaos::NodeType::node_type_us)));  // search CEU
 
-  DBG <<"] Found " << nodes.size()<< " to subscribe";
+    DBG << "] Found " << nodes.size() << " to subscribe";
 
-  for(std::vector<std::string>::iterator i=nodes.begin();i!=nodes.end();i++){
-    if(i->size()){
-      DBG <<"] Subscribing to:" << *i;
+    for (std::vector<std::string>::iterator i = nodes.begin(); i != nodes.end(); i++) {
+      if (i->size()) {
+        DBG << "] Subscribing to:" << *i;
 
-      if (cons->subscribe(*i) != 0) {
-          ERR <<" cannot subscribe to :" << *i<<" err:"<<cons->getLastError();
-                  
-      } else {
-          DBG <<"] Subscribed to:" << *i;
-          
+        if (cons->subscribe(*i) != 0) {
+          ERR << " cannot subscribe to :" << *i << " err:" << cons->getLastError();
+
+        } else {
+          DBG << "] Subscribed to:" << *i;
+        }
       }
-    }  
+    }
   }
 }
-} 
 void QueryDataMsgPSConsumer::start() {
   DBG << "Starting Msg consumer";
 
   cons->start();
-  boost::thread(&QueryDataMsgPSConsumer::subscribeProcess, this,1);
+  boost::thread(&QueryDataMsgPSConsumer::subscribeProcess, this, 1);
 
- /* std::string keysub="CHAOS_LOG";
+  /* std::string keysub="CHAOS_LOG";
   if (cons->subscribe(keysub) != 0) {
       ERR <<" cannot subscribe to :" << keysub<<" err:"<<cons->getLastError();
               
-  }*/ 
+  }*/
 }
 
 void QueryDataMsgPSConsumer::stop() {
@@ -252,49 +240,24 @@ int QueryDataMsgPSConsumer::consumeHealthDataEvent(const std::string&
                                                    const ChaosStringSetConstSPtr meta_tag_set,
                                                    BufferSPtr                    channel_data) {
   int err = 0;
-  
-    boost::mutex::scoped_lock ll(map_m);
-    
-    
-/*    bool isACUEU=(channel_data.get()==NULL)||(health_data_pack.hasKey(chaos::ControlUnitHealtDefinitionValue::CU_HEALT_OUTPUT_DATASET_PUSH_RATE));
 
-    if(isACUEU){
-      DBG << "Received healt from:"<<key<<" is:"<<((channel_data.get()==NULL)?"registration":"normal");
+  boost::mutex::scoped_lock ll(map_m);
 
-      std::string rootname = key;
-      size_t      pos      = key.find(NodeHealtDefinitionKey::HEALT_KEY_POSTFIX);
-      if (pos != std::string::npos) {
-        rootname.erase(pos, strlen(NodeHealtDefinitionKey::HEALT_KEY_POSTFIX));
-      }
-      if(alive_map.find(rootname)==alive_map.end()){
-        if (cons->subscribe(rootname) != 0) {
-              ERR <<"] cannot subscribe to :" << rootname<<" err:"<<cons->getLastError();
-              
-            } else {
-              alive_map[rootname]= TimingUtil::getTimeStamp();
-
-              DBG <<"] Subscribed to:" << rootname<<" at:"<<alive_map[rootname];
-            }
-      }
+  if (channel_data.get() == NULL || channel_data->data() == NULL) {
+    // DBG<<"Empty health for:\""<<key<<"\" registration pack";
+    if (alive_map.find(key) == alive_map.end()) {
+      if (cons->subscribe(key) != 0) {
+        ERR << "] cannot subscribe to :" << key << " err:" << cons->getLastError();
 
-      
-    }
-  }*/
-  if(channel_data.get()==NULL || channel_data->data()==NULL){
-   // DBG<<"Empty health for:\""<<key<<"\" registration pack";
-    if(alive_map.find(key)==alive_map.end()){
-        if (cons->subscribe(key) != 0) {
-              ERR <<"] cannot subscribe to :" << key<<" err:"<<cons->getLastError();
-              
-            } else {
-              alive_map[key]= TimingUtil::getTimeStamp();
+      } else {
+        alive_map[key] = TimingUtil::getTimeStamp();
 
-              DBG <<"] Subscribed to:" << key<<" at:"<<alive_map[key];
-            }
+        DBG << "] Subscribed to:" << key << " at:" << alive_map[key];
       }
+    }
     return 0;
   }
-  CDataWrapper health_data_pack((char *)channel_data->data());
+  CDataWrapper health_data_pack((char*)channel_data->data());
 
   return QueryDataConsumer::consumeHealthDataEvent(key, hst_tag, meta_tag_set, channel_data);
 }
@@ -384,229 +347,5 @@ int QueryDataMsgPSConsumer::consumeLogEntries(const std::string&       node_name
   return QueryDataConsumer::consumeLogEntries(node_name, log_entries);
 }
 
-/*
-int QueryDataMsgPSConsumer::consumeDataCloudQuery(DirectIODeviceChannelHeaderOpcodeQueryDataCloud& query_header,
-                                             const std::string& search_key,
-                                             const ChaosStringSet& meta_tags,
-                                             const ChaosStringSet& projection_keys,
-                                             const uint64_t search_start_ts,
-                                             const uint64_t search_end_ts,
-                                             SearchSequence& last_element_found_seq,
-                                             QueryResultPage& page_element_found) {
-    
-    int err = 0;
-    //execute the query
-    ObjectStorageDataAccess *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<object_storage::abstraction::ObjectStorageDataAccess>();
-    if((err = obj_storage_da->findObject(search_key,
-                                         meta_tags,
-                                         projection_keys,
-                                         search_start_ts,
-                                         search_end_ts,
-                                         query_header.field.record_for_page,
-                                         page_element_found,
-                                         last_element_found_seq))) {
-        ERR << CHAOS_FORMAT("Error performing cloud query with code %1%", %err);
-    }
-    return err;
-}
-
-
-int QueryDataMsgPSConsumer::consumeDataIndexCloudQuery(opcode_headers::DirectIODeviceChannelHeaderOpcodeQueryDataCloud& query_header,
-                                                  const std::string& search_key,
-                                                  const ChaosStringSet& meta_tags,
-                                                  const ChaosStringSet& projection_keys,
-                                                  const uint64_t search_start_ts,
-                                                  const uint64_t search_end_ts,
-                                                  opcode_headers::SearchSequence& last_element_found_seq,
-                                                  opcode_headers::QueryResultPage& page_element_found) {
-    int err = 0;
-    //execute the query
-    DataSearch search;
-    search.key = search_key;
-    search.meta_tags = meta_tags;
-    search.projection_keys = projection_keys;
-    search.timestamp_from = search_start_ts;
-    search.timestamp_to = search_end_ts;
-    search.page_len = query_header.field.record_for_page;
-    ObjectStorageDataAccess *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<object_storage::abstraction::ObjectStorageDataAccess>();
-    if((err = obj_storage_da->findObjectIndex(search,
-                                              page_element_found,
-                                              last_element_found_seq))) {
-        ERR << CHAOS_FORMAT("Error performing cloud query with code %1%", %err);
-    }
-    return err;
-}
-
-int QueryDataMsgPSConsumer::consumeGetEvent(chaos::common::data::BufferSPtr key_data,
-                                       uint32_t key_len,
-                                       opcode_headers::DirectIODeviceChannelHeaderGetOpcodeResult& result_header,
-                                       chaos::common::data::BufferSPtr& result_value) {
-    int err = 0;
-    std::string key(key_data->data(),
-                    key_data->size());
-    //debug check
-    //protected access to cached driver
-    CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv();
-    
-    //get data
-    err = cache_slot.getData(key,
-                             result_value);
-    if((err == 0 )&&
-       result_value &&
-       result_value->size()) {
-        result_header.value_len = (uint32_t)result_value->size();
-    }
-    return err;
-}
-
-int QueryDataMsgPSConsumer::consumeGetEvent(opcode_headers::DirectIODeviceChannelHeaderMultiGetOpcode& header,
-                                       const ChaosStringVector& keys,
-                                       opcode_headers::DirectIODeviceChannelHeaderMultiGetOpcodeResult& result_header,
-                                       BufferSPtr& result_value,
-                                       uint32_t& result_value_len) {
-    int err = 0;
-    //debug check
-    //protected access to cached driver
-    CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv();
-    try{
-        //get data
-        DataBuffer data_buffer;
-        MultiCacheData multi_cached_data;
-        err = cache_slot.getData(keys,
-                                 multi_cached_data);
-        for(ChaosStringVectorConstIterator it = keys.begin(),
-            end = keys.end();
-            it != end;
-            it++) {
-            const CacheData& cached_element = multi_cached_data[*it];
-            if(!cached_element ||
-               cached_element->size() == 0) {
-                //element has not been found so we need to create and empty cdata wrapper
-                CDataWrapper tmp;
-                int size = 0;
-                const char * d_ptr = tmp.getBSONRawData(size);
-                data_buffer.writeByte(d_ptr,
-                                      size);
-            } else {
-                data_buffer.writeByte(cached_element->data(),
-                                      (int32_t)cached_element->size());
-            }
-        }
-        
-        result_header.number_of_result = (uint32_t)multi_cached_data.size();
-        result_value_len = data_buffer.getCursorLocation();
-        result_value = ChaosMakeSharedPtr<Buffer>(data_buffer.release(), result_value_len, result_value_len, true);
-        
-    } catch(...) {}
-    return err;
-}
-
-int QueryDataMsgPSConsumer::getDataByIndex(const chaos::common::data::VectorCDWShrdPtr& indexes,
-                                      chaos::common::data::VectorCDWShrdPtr& found_data) {
-    
-    ObjectStorageDataAccess *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<object_storage::abstraction::ObjectStorageDataAccess>();
-    std::for_each(indexes.begin(), indexes.end(), [&obj_storage_da, &found_data](const CDWShrdPtr& index){
-        int err = 0;
-        CDWShrdPtr data;
-        if((err = obj_storage_da->getObjectByIndex(index, data)) == 0) {
-            found_data.push_back(data);
-        } else {
-            ERR << CHAOS_FORMAT("Error fetching data using index(%1%) with code %2%", %data->getJSONString()%err);
-        }
-    });
-    return 0;
-}
-
-int QueryDataMsgPSConsumer::consumeDataCloudDelete(const std::string& search_key,
-                                              uint64_t start_ts,
-                                              uint64_t end_ts){
-    int err = 0;
-    VectorObject reuslt_object_found;
-    ObjectStorageDataAccess *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<object_storage::abstraction::ObjectStorageDataAccess>();
-    if((err = obj_storage_da->deleteObject(search_key,
-                                           start_ts,
-                                           end_ts))) {
-        ERR << CHAOS_FORMAT("Error performing cloud query with code %1%", %err);
-    }
-    return err;
-}
-int QueryDataMsgPSConsumer::countDataCloud(const std::string& search_key,
-                                       uint64_t start_ts,
-                                       uint64_t end_ts,
-                                       uint64_t& count){
-    int err = 0;
-    ObjectStorageDataAccess *obj_storage_da = DriverPoolManager::getInstance()->getObjectStorageDrv().getDataAccess<object_storage::abstraction::ObjectStorageDataAccess>();
-    if((err = obj_storage_da->countObject(search_key,
-                                           start_ts,
-                                           end_ts,count))) {
-        ERR << CHAOS_FORMAT("Error performing count cloud query with code %1%", %err);
-    }
-    return err;
-
-}
-            
-#pragma mark DirectIOSystemAPIServerChannelHandler
-// Return the dataset for a producerkey ona specific snapshot
-int QueryDataMsgPSConsumer::consumeGetDatasetSnapshotEvent(opcode_headers::DirectIOSystemAPIChannelOpcodeNDGSnapshotHeader& header,
-                                                      const std::string& producer_id,
-                                                      chaos::common::data::BufferSPtr& channel_found_data,
-                                                      DirectIOSystemAPISnapshotResultHeader &result_header) {
-    int err = 0;
-    std::string channel_type;
-    //CHAOS_ASSERT(api_result)
-    SnapshotDataAccess *s_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<SnapshotDataAccess>();
-    
-    //trduce int to postfix channel type
-    switch(header.field.channel_type) {
-        case 0:
-            channel_type = DataPackPrefixID::OUTPUT_DATASET_POSTFIX;
-            break;
-        case 1:
-            channel_type = DataPackPrefixID::INPUT_DATASET_POSTFIX;
-            break;
-        case 2:
-            channel_type = DataPackPrefixID::CUSTOM_DATASET_POSTFIX;
-            break;
-        case 3:
-            channel_type = DataPackPrefixID::SYSTEM_DATASET_POSTFIX;
-            break;
-            
-    }
-    
-    if((err = s_da->snapshotGetDatasetForProducerKey(header.field.snap_name,
-                                                     producer_id,
-                                                     channel_type,
-                                                     channel_found_data))) {
-        std::strcpy(result_header.error_message, "Error retriving the snapshoted dataaset for producer key");
-        ERR << result_header.error_message << "[" << header.field.snap_name << "/" << producer_id<<"]";
-    } else {
-        if(channel_found_data &&
-           channel_found_data->size()) {
-            result_header.error = 0;
-            std::strcpy(result_header.error_message, "Snapshot found");
-        } else {
-            result_header.error = -2;
-            std::strcpy(result_header.error_message, "Channel data not found in snapshot");
-            
-        }
-    }
-    return err;
-}
-
-int QueryDataMsgPSConsumer::consumeLogEntries(const std::string& node_name,
-                                         const ChaosStringVector& log_entries) {
-    int err = 0;
-    for(ChaosStringVectorConstIterator it = log_entries.begin(),
-        end = log_entries.end();
-        it != end;
-        it++) {
-        AgentDataAccess *a_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<AgentDataAccess>();
-        if((err = a_da->pushLogEntry(node_name, *it))){
-            ERR << CHAOS_FORMAT("Error push entry for node %1%", %node_name);
-        }
-    }
-    return err;
-}
-*/
 }  // namespace metadata_service
 }  // namespace chaos
\ No newline at end of file
diff --git a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp
index 3137ecd57..2f2a77f6e 100644
--- a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp
+++ b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp
@@ -244,6 +244,10 @@ void MessagePSRDKafkaConsumer::poll() {
       d.par = rkm->partition;
       try {
         d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len));
+        if(d.cd.get()==NULL){
+          MRDERR_<<" invalid bson "<<d.key;
+          return;
+        }
       } catch (chaos::CException& e) {
         stats.errs++;
         std::stringstream ss;
-- 
GitLab