From 7a6c2630277456817b55db76d100c24619dcbc84 Mon Sep 17 00:00:00 2001
From: amichelo <andrea.michelotti@lnf.infn.it>
Date: Mon, 25 Oct 2021 12:45:54 +0200
Subject: [PATCH] adding kafka into netbroker

---
 ChaosMetadataService/ChaosMetadataService.h   |   1 +
 .../QueryDataMsgPSConsumer.cpp                |   2 +-
 chaos/common/ChaosCommon.h                    |   2 +
 chaos/common/message/MessagePSDriver.cpp      | 179 +++++++++---------
 chaos/common/message/MessagePSDriver.h        |   2 +
 chaos/common/network/NetworkBroker.cpp        |  23 ++-
 chaos/common/network/NetworkBroker.h          |   5 +
 7 files changed, 121 insertions(+), 93 deletions(-)

diff --git a/ChaosMetadataService/ChaosMetadataService.h b/ChaosMetadataService/ChaosMetadataService.h
index baef5670a..121181bf3 100644
--- a/ChaosMetadataService/ChaosMetadataService.h
+++ b/ChaosMetadataService/ChaosMetadataService.h
@@ -108,6 +108,7 @@ namespace chaos {
              */
             void updateLiveCache(const std::string& name,int64_t te);
 
+
             /**
              * @brief remove storage data to from
              * 
diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
index b5ce96d27..c1e7c91bb 100644
--- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
+++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
@@ -280,7 +280,7 @@ int QueryDataMsgPSConsumer::consumeHealthDataEvent(const std::string&
     }
   }*/
   if(channel_data.get()==NULL || channel_data->data()==NULL){
-    DBG<<"Empty health for:\""<<key<<"\" registration pack";
+   // DBG<<"Empty health for:\""<<key<<"\" registration pack";
     if(alive_map.find(key)==alive_map.end()){
         if (cons->subscribe(key) != 0) {
               ERR <<"] cannot subscribe to :" << key<<" err:"<<cons->getLastError();
diff --git a/chaos/common/ChaosCommon.h b/chaos/common/ChaosCommon.h
index e4c6a0221..f00a20f3c 100644
--- a/chaos/common/ChaosCommon.h
+++ b/chaos/common/ChaosCommon.h
@@ -67,6 +67,8 @@ namespace chaos {
         chaos::common::data::CDWUniquePtr _registrationAck(chaos::common::data::CDWUniquePtr data);
 
     public:
+    
+        std::string nodeuid;
         //! Constructor Method
         /*!
          This method call the \ref GlobalConfiguration::preParseStartupParameters method, starting the
diff --git a/chaos/common/message/MessagePSDriver.cpp b/chaos/common/message/MessagePSDriver.cpp
index 41a3e01d9..dd1dc6652 100644
--- a/chaos/common/message/MessagePSDriver.cpp
+++ b/chaos/common/message/MessagePSDriver.cpp
@@ -9,106 +9,107 @@
 
 #endif
 #ifdef KAFKA_ASIO_ENABLE
-#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
 #include "impl/kafka/asio/MessagePSKafkaAsioConsumer.h"
+#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
 
 #endif
 #include <chaos/common/global.h>
 
 namespace chaos {
-    namespace common {
-        namespace message {
-
-   boost::mutex MessagePSDriver::io;
-   std::map<std::string,producer_uptr_t> MessagePSDriver::producer_drv_m;
-   std::map<std::string,consumer_uptr_t> MessagePSDriver::consumer_drv_m;
-      
-    producer_uptr_t MessagePSDriver::getProducerDriver(const std::string&drvname,const std::string& k){
-    boost::mutex::scoped_lock ll(io);
-    std::map<std::string,producer_uptr_t>::iterator i=producer_drv_m.find(drvname);
-    if(i!=producer_drv_m.end()){
-        MRDDBG_<<drvname<<"] returning allocated producer:"<<std::hex<<i->second.get();
-        return i->second;
-    }
-producer_uptr_t ret;
+namespace common {
+namespace message {
+
+boost::mutex                           MessagePSDriver::io;
+std::map<std::string, producer_uptr_t> MessagePSDriver::producer_drv_m;
+std::map<std::string, consumer_uptr_t> MessagePSDriver::consumer_drv_m;
+producer_uptr_t                        MessagePSDriver::getNewProducerDriver(const std::string& drvname, const std::string& k) {
+  producer_uptr_t ret;
 #ifdef KAFKA_RDK_ENABLE
 
-                if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
-                    ret.reset(new kafka::rdk::MessagePSKafkaProducer());
-                    producer_drv_m["kafka-rdk"]=ret;
-                }
+  if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
+    ret.reset(new kafka::rdk::MessagePSKafkaProducer());
+    producer_drv_m["kafka-rdk"] = ret;
+  }
 #endif
 #ifdef KAFKA_ASIO_ENABLE
-                if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
-                    ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
-                    producer_drv_m["kafka-asio"]=ret;
-
-                }
+  if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
+    ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
+    producer_drv_m["kafka-asio"] = ret;
+  }
 #endif
-            if(ret.get()==NULL){
-                throw chaos::CException(-5,"cannot find a producer driver for:"+drvname,__PRETTY_FUNCTION__);
-
-            }
-            MRDDBG_<<drvname<<"] created producer:"<<std::hex<<ret.get();
-
-            if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)){
-                     std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
-                    std::map<std::string,std::string> kv;
-                    GlobalConfiguration::fillKVParameter(kv ,opt,"");
-                    for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
-                        ret->setOption(i->first,i->second);
-                    }
-
-            }
-            return ret;
-
-                
+  if (ret.get() == NULL) {
+    throw chaos::CException(-5, "cannot find a producer driver for:" + drvname, __PRETTY_FUNCTION__);
+  }
+  MRDDBG_ << drvname << "] created producer:" << std::hex << ret.get();
+
+  if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)) {
+    std::vector<std::string>           opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
+    std::map<std::string, std::string> kv;
+    GlobalConfiguration::fillKVParameter(kv, opt, "");
+    for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
+      ret->setOption(i->first, i->second);
     }
-     consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k){
-             std::map<std::string,consumer_uptr_t>::iterator i=consumer_drv_m.find(drvname);
-
-         consumer_uptr_t ret;
-          if(i!=consumer_drv_m.end()){
-                MRDDBG_<<drvname<<"] returning allocated consumer:"<<std::hex<<i->second.get();
-
-                return i->second;
-        }
-   #ifdef KAFKA_RDK_ENABLE
-
-                if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
-                    ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid,k));
-                    consumer_drv_m["kafka-rdk"]=ret;
-                }
-    #endif
-    #ifdef KAFKA_ASIO_ENABLE
-                if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
-                    ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid,k));
-                    consumer_drv_m["kafka-asio"]=ret;
-                }
-    #endif
+  }
+  return ret;
+}
+
+producer_uptr_t MessagePSDriver::getProducerDriver(const std::string& drvname, const std::string& k) {
+  producer_uptr_t ret;
+
+  boost::mutex::scoped_lock                        ll(io);
+  std::map<std::string, producer_uptr_t>::iterator i = producer_drv_m.find(drvname);
+  if (i != producer_drv_m.end()) {
+    MRDDBG_ << drvname << "] returning allocated producer:" << std::hex << i->second.get();
+    return i->second;
+  }
+  ret                     = getNewProducerDriver(drvname, k);
+  producer_drv_m[drvname] = ret;
+  return ret;
+}
+consumer_uptr_t MessagePSDriver::getNewConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
+  consumer_uptr_t ret;
 
-            if(ret.get()==NULL){
-                throw chaos::CException(-5,"cannot find a consumer driver for:"+drvname,__PRETTY_FUNCTION__);
-
-            }
-            MRDDBG_<<drvname<<"] created consumer:"<<std::hex<<ret.get();
-
-            if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)){
-                     std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
-                    std::map<std::string,std::string> kv;
-                    GlobalConfiguration::fillKVParameter(kv ,opt,"");
-                    for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
-
-                        ret->setOption(i->first,i->second);
-                    }
-
-            }
-            return ret;
-
-     }
-
-
-        }
-        }
-        }
+#ifdef KAFKA_RDK_ENABLE
 
+  if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
+    ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid, k));
+  }
+#endif
+#ifdef KAFKA_ASIO_ENABLE
+  if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
+    ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid, k));
+  }
+#endif
+  if (ret.get() == NULL) {
+    throw chaos::CException(-5, "cannot find a consumer driver for:" + drvname, __PRETTY_FUNCTION__);
+  }
+  MRDDBG_ << drvname << "] created consumer:" << std::hex << ret.get();
+
+  if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)) {
+    std::vector<std::string>           opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
+    std::map<std::string, std::string> kv;
+    GlobalConfiguration::fillKVParameter(kv, opt, "");
+    for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
+      ret->setOption(i->first, i->second);
+    }
+  }
+  return ret;
+}
+
+consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
+  std::map<std::string, consumer_uptr_t>::iterator i = consumer_drv_m.find(drvname);
+
+  consumer_uptr_t ret;
+  if (i != consumer_drv_m.end()) {
+    MRDDBG_ << drvname << "] returning allocated consumer:" << std::hex << i->second.get();
+
+    return i->second;
+  }
+  ret                     = MessagePSDriver::getNewConsumerDriver(drvname, gid, k);
+  consumer_drv_m[drvname] = ret;
+  return ret;
+}
+
+}  // namespace message
+}  // namespace common
+}  // namespace chaos
diff --git a/chaos/common/message/MessagePSDriver.h b/chaos/common/message/MessagePSDriver.h
index a046df5fe..f1fbce39f 100644
--- a/chaos/common/message/MessagePSDriver.h
+++ b/chaos/common/message/MessagePSDriver.h
@@ -16,6 +16,8 @@ class MessagePSDriver {
     static std::map<std::string,consumer_uptr_t> consumer_drv_m;
 
     public:
+    static producer_uptr_t getNewProducerDriver(const std::string&drvname,const std::string& k="");
+    static consumer_uptr_t getNewConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
     static producer_uptr_t getProducerDriver(const std::string&drvname,const std::string& k="");
     static consumer_uptr_t getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
 
diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp
index f7218d2e8..ccd8481e3 100644
--- a/chaos/common/network/NetworkBroker.cpp
+++ b/chaos/common/network/NetworkBroker.cpp
@@ -90,10 +90,27 @@ void NetworkBroker::init(void *initData) {
     
     
     if(!globalConfiguration) {
-        throw CException(-1, "No global configuraiton found", __PRETTY_FUNCTION__);
+        throw CException(-1, "No global configuration found", __PRETTY_FUNCTION__);
     }
     MB_LAPP << "Configuration:"<<globalConfiguration->getCompliantJSONString();
-    
+    if (GlobalConfiguration::getInstance()->getConfiguration()->hasKey(InitOption::OPT_MSG_BROKER_SERVER)) {
+        std::string msgbrokerdrv = "kafka-rdk";
+        msgbrokerdrv = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_DRIVER);
+
+        std::string msgbroker = GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::OPT_MSG_BROKER_SERVER);
+        MB_LAPP << "Initializing producer/consumer based on "<<msgbroker;
+
+        prod=chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv);
+        prod->addServer(msgbroker);
+
+        if (prod->applyConfiguration() != 0) {
+        throw chaos::CException(-1, "cannot initialize Publish Subscribe Producer:" + prod->getLastError(), __PRETTY_FUNCTION__);
+        }
+        prod->start();
+        cons=chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv,"");
+        cons->addServer(msgbroker);
+
+  }
     //---------------------------- D I R E C T I/O ----------------------------
     if(globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) {
         MB_LAPP  << "Setup DirectIO sublayer";
@@ -101,7 +118,7 @@ void NetworkBroker::init(void *initData) {
         //construct the rpc server and client name
         string direct_io_server_impl = direct_io_impl+"DirectIOServer";
         direct_io_client_impl = direct_io_impl + "DirectIOClient";
-        MB_LAPP  << "Trying to initilize DirectIO Server: " << direct_io_server_impl;
+        MB_LAPP  << "Trying to initialize DirectIO Server: " << direct_io_server_impl;
         direct_io_server = ObjectFactoryRegister<common::direct_io::DirectIOServer>::getInstance()->getNewInstanceByName(direct_io_server_impl);
         if(!direct_io_server) throw CException(-2, "Error creating direct io server implementation:"+direct_io_server_impl, __PRETTY_FUNCTION__);
         
diff --git a/chaos/common/network/NetworkBroker.h b/chaos/common/network/NetworkBroker.h
index fa35b9f18..dfbfa3467 100644
--- a/chaos/common/network/NetworkBroker.h
+++ b/chaos/common/network/NetworkBroker.h
@@ -31,6 +31,7 @@
 //#include <chaos/common/network/PerformanceManagment.h>
 #include <chaos/common/utility/StartableService.h>
 #include <chaos/common/network/CNodeNetworkAddress.h>
+#include <chaos/common/message/MessagePSDriver.h>
 
 namespace chaos {
 	
@@ -120,6 +121,10 @@ namespace chaos {
 				//! Rpc server for message listening
                 chaos::RpcServer *rpc_server;
                 
+				// publish subscribe
+				chaos::common::message::producer_uptr_t prod;
+              	chaos::common::message::consumer_uptr_t cons;
+
 				//rpc action dispatcher
 				AbstractCommandDispatcher *rpc_dispatcher;
 				
-- 
GitLab