diff --git a/ChaosMetadataService/ChaosMetadataService.h b/ChaosMetadataService/ChaosMetadataService.h
index baef5670a5b81562e299a37d2f159fcec1f222f5..121181bf3f71fca3f54de1c29c7a363d4e9e37ef 100644
--- a/ChaosMetadataService/ChaosMetadataService.h
+++ b/ChaosMetadataService/ChaosMetadataService.h
@@ -108,6 +108,7 @@ namespace chaos {
              */
             void updateLiveCache(const std::string& name,int64_t te);
 
+
             /**
              * @brief remove storage data to from
              * 
diff --git a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
index b5ce96d27d8b706e9df4124a38aae2855c9b0d42..c1e7c91bb32fb9f7a4524dc15939ed3c0a19a1c8 100644
--- a/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
+++ b/ChaosMetadataService/QueryDataMsgPSConsumer.cpp
@@ -280,7 +280,7 @@ int QueryDataMsgPSConsumer::consumeHealthDataEvent(const std::string&
     }
   }*/
   if(channel_data.get()==NULL || channel_data->data()==NULL){
-    DBG<<"Empty health for:\""<<key<<"\" registration pack";
+   // DBG<<"Empty health for:\""<<key<<"\" registration pack";
     if(alive_map.find(key)==alive_map.end()){
         if (cons->subscribe(key) != 0) {
               ERR <<"] cannot subscribe to :" << key<<" err:"<<cons->getLastError();
diff --git a/chaos/common/ChaosCommon.h b/chaos/common/ChaosCommon.h
index e4c6a0221d8703d71c2c6181e852766d838dc46e..f00a20f3c26fc5641f8d8191f7d23363bdc0ca19 100644
--- a/chaos/common/ChaosCommon.h
+++ b/chaos/common/ChaosCommon.h
@@ -67,6 +67,8 @@ namespace chaos {
         chaos::common::data::CDWUniquePtr _registrationAck(chaos::common::data::CDWUniquePtr data);
 
     public:
+    
+        std::string nodeuid;
         //! Constructor Method
         /*!
          This method call the \ref GlobalConfiguration::preParseStartupParameters method, starting the
diff --git a/chaos/common/message/MessagePSDriver.cpp b/chaos/common/message/MessagePSDriver.cpp
index 41a3e01d91350812854ab52e331e366dc025c00d..dd1dc665238dfbb888b85bb4dfe31260dfdcf2d3 100644
--- a/chaos/common/message/MessagePSDriver.cpp
+++ b/chaos/common/message/MessagePSDriver.cpp
@@ -9,106 +9,107 @@
 
 #endif
 #ifdef KAFKA_ASIO_ENABLE
-#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
 #include "impl/kafka/asio/MessagePSKafkaAsioConsumer.h"
+#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
 
 #endif
 #include <chaos/common/global.h>
 
 namespace chaos {
-    namespace common {
-        namespace message {
-
-   boost::mutex MessagePSDriver::io;
-   std::map<std::string,producer_uptr_t> MessagePSDriver::producer_drv_m;
-   std::map<std::string,consumer_uptr_t> MessagePSDriver::consumer_drv_m;
-      
-    producer_uptr_t MessagePSDriver::getProducerDriver(const std::string&drvname,const std::string& k){
-    boost::mutex::scoped_lock ll(io);
-    std::map<std::string,producer_uptr_t>::iterator i=producer_drv_m.find(drvname);
-    if(i!=producer_drv_m.end()){
-        MRDDBG_<<drvname<<"] returning allocated producer:"<<std::hex<<i->second.get();
-        return i->second;
-    }
-producer_uptr_t ret;
+namespace common {
+namespace message {
+
+boost::mutex                           MessagePSDriver::io;
+std::map<std::string, producer_uptr_t> MessagePSDriver::producer_drv_m;
+std::map<std::string, consumer_uptr_t> MessagePSDriver::consumer_drv_m;
+producer_uptr_t                        MessagePSDriver::getNewProducerDriver(const std::string& drvname, const std::string& k) {
+  producer_uptr_t ret;
 #ifdef KAFKA_RDK_ENABLE
 
-                if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
-                    ret.reset(new kafka::rdk::MessagePSKafkaProducer());
-                    producer_drv_m["kafka-rdk"]=ret;
-                }
+  if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
+    ret.reset(new kafka::rdk::MessagePSKafkaProducer());
+    producer_drv_m["kafka-rdk"] = ret;
+  }
 #endif
 #ifdef KAFKA_ASIO_ENABLE
-                if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
-                    ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
-                    producer_drv_m["kafka-asio"]=ret;
-
-                }
+  if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
+    ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
+    producer_drv_m["kafka-asio"] = ret;
+  }
 #endif
-            if(ret.get()==NULL){
-                throw chaos::CException(-5,"cannot find a producer driver for:"+drvname,__PRETTY_FUNCTION__);
-
-            }
-            MRDDBG_<<drvname<<"] created producer:"<<std::hex<<ret.get();
-
-            if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)){
-                     std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
-                    std::map<std::string,std::string> kv;
-                    GlobalConfiguration::fillKVParameter(kv ,opt,"");
-                    for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
-                        ret->setOption(i->first,i->second);
-                    }
-
-            }
-            return ret;
-
-                
+  if (ret.get() == NULL) {
+    throw chaos::CException(-5, "cannot find a producer driver for:" + drvname, __PRETTY_FUNCTION__);
+  }
+  MRDDBG_ << drvname << "] created producer:" << std::hex << ret.get();
+
+  if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)) {
+    std::vector<std::string>           opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
+    std::map<std::string, std::string> kv;
+    GlobalConfiguration::fillKVParameter(kv, opt, "");
+    for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
+      ret->setOption(i->first, i->second);
     }
-     consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k){
-             std::map<std::string,consumer_uptr_t>::iterator i=consumer_drv_m.find(drvname);
-
-         consumer_uptr_t ret;
-          if(i!=consumer_drv_m.end()){
-                MRDDBG_<<drvname<<"] returning allocated consumer:"<<std::hex<<i->second.get();
-
-                return i->second;
-        }
-   #ifdef KAFKA_RDK_ENABLE
-
-                if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
-                    ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid,k));
-                    consumer_drv_m["kafka-rdk"]=ret;
-                }
-    #endif
-    #ifdef KAFKA_ASIO_ENABLE
-                if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
-                    ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid,k));
-                    consumer_drv_m["kafka-asio"]=ret;
-                }
-    #endif
+  }
+  return ret;
+}
+
+producer_uptr_t MessagePSDriver::getProducerDriver(const std::string& drvname, const std::string& k) {
+  producer_uptr_t ret;
+
+  boost::mutex::scoped_lock                        ll(io);
+  std::map<std::string, producer_uptr_t>::iterator i = producer_drv_m.find(drvname);
+  if (i != producer_drv_m.end()) {
+    MRDDBG_ << drvname << "] returning allocated producer:" << std::hex << i->second.get();
+    return i->second;
+  }
+  ret                     = getNewProducerDriver(drvname, k);
+  producer_drv_m[drvname] = ret;
+  return ret;
+}
+consumer_uptr_t MessagePSDriver::getNewConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
+  consumer_uptr_t ret;
 
-            if(ret.get()==NULL){
-                throw chaos::CException(-5,"cannot find a consumer driver for:"+drvname,__PRETTY_FUNCTION__);
-
-            }
-            MRDDBG_<<drvname<<"] created consumer:"<<std::hex<<ret.get();
-
-            if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)){
-                     std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
-                    std::map<std::string,std::string> kv;
-                    GlobalConfiguration::fillKVParameter(kv ,opt,"");
-                    for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
-
-                        ret->setOption(i->first,i->second);
-                    }
-
-            }
-            return ret;
-
-     }
-
-
-        }
-        }
-        }
+#ifdef KAFKA_RDK_ENABLE
 
+  if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
+    ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid, k));
+  }
+#endif
+#ifdef KAFKA_ASIO_ENABLE
+  if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
+    ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid, k));
+  }
+#endif
+  if (ret.get() == NULL) {
+    throw chaos::CException(-5, "cannot find a consumer driver for:" + drvname, __PRETTY_FUNCTION__);
+  }
+  MRDDBG_ << drvname << "] created consumer:" << std::hex << ret.get();
+
+  if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)) {
+    std::vector<std::string>           opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
+    std::map<std::string, std::string> kv;
+    GlobalConfiguration::fillKVParameter(kv, opt, "");
+    for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
+      ret->setOption(i->first, i->second);
+    }
+  }
+  return ret;
+}
+
+consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
+  std::map<std::string, consumer_uptr_t>::iterator i = consumer_drv_m.find(drvname);
+
+  consumer_uptr_t ret;
+  if (i != consumer_drv_m.end()) {
+    MRDDBG_ << drvname << "] returning allocated consumer:" << std::hex << i->second.get();
+
+    return i->second;
+  }
+  ret                     = MessagePSDriver::getNewConsumerDriver(drvname, gid, k);
+  consumer_drv_m[drvname] = ret;
+  return ret;
+}
+
+}  // namespace message
+}  // namespace common
+}  // namespace chaos
diff --git a/chaos/common/message/MessagePSDriver.h b/chaos/common/message/MessagePSDriver.h
index a046df5febdda4d0eb85d89063cb47336ad37d29..f1fbce39f648baa593b2109a9b6ee0a8c2bf165d 100644
--- a/chaos/common/message/MessagePSDriver.h
+++ b/chaos/common/message/MessagePSDriver.h
@@ -16,6 +16,8 @@ class MessagePSDriver {
     static std::map<std::string,consumer_uptr_t> consumer_drv_m;
 
     public:
+    static producer_uptr_t getNewProducerDriver(const std::string&drvname,const std::string& k="");
+    static consumer_uptr_t getNewConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
     static producer_uptr_t getProducerDriver(const std::string&drvname,const std::string& k="");
     static consumer_uptr_t getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
 
diff --git a/chaos/common/network/NetworkBroker.cpp b/chaos/common/network/NetworkBroker.cpp
index f7218d2e8aec51a67cea893706b03f0ca8248acc..ccd8481e34cc41afe7be39d322a2e16bf47ffd32 100644
--- a/chaos/common/network/NetworkBroker.cpp
+++ b/chaos/common/network/NetworkBroker.cpp
@@ -90,10 +90,27 @@ void NetworkBroker::init(void *initData) {
     
     
     if(!globalConfiguration) {
-        throw CException(-1, "No global configuraiton found", __PRETTY_FUNCTION__);
+        throw CException(-1, "No global configuration found", __PRETTY_FUNCTION__);
     }
     MB_LAPP << "Configuration:"<<globalConfiguration->getCompliantJSONString();
-    
+    if (GlobalConfiguration::getInstance()->getConfiguration()->hasKey(InitOption::OPT_MSG_BROKER_SERVER)) {
+        std::string msgbrokerdrv = "kafka-rdk";
+        msgbrokerdrv = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_DRIVER);
+
+        std::string msgbroker = GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::OPT_MSG_BROKER_SERVER);
+        MB_LAPP << "Initializing producer/consumer based on "<<msgbroker;
+
+        prod=chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv);
+        prod->addServer(msgbroker);
+
+        if (prod->applyConfiguration() != 0) {
+        throw chaos::CException(-1, "cannot initialize Publish Subscribe Producer:" + prod->getLastError(), __PRETTY_FUNCTION__);
+        }
+        prod->start();
+        cons=chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv,"");
+        cons->addServer(msgbroker);
+
+  }
     //---------------------------- D I R E C T I/O ----------------------------
     if(globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) {
         MB_LAPP  << "Setup DirectIO sublayer";
@@ -101,7 +118,7 @@ void NetworkBroker::init(void *initData) {
         //construct the rpc server and client name
         string direct_io_server_impl = direct_io_impl+"DirectIOServer";
         direct_io_client_impl = direct_io_impl + "DirectIOClient";
-        MB_LAPP  << "Trying to initilize DirectIO Server: " << direct_io_server_impl;
+        MB_LAPP  << "Trying to initialize DirectIO Server: " << direct_io_server_impl;
         direct_io_server = ObjectFactoryRegister<common::direct_io::DirectIOServer>::getInstance()->getNewInstanceByName(direct_io_server_impl);
         if(!direct_io_server) throw CException(-2, "Error creating direct io server implementation:"+direct_io_server_impl, __PRETTY_FUNCTION__);
         
diff --git a/chaos/common/network/NetworkBroker.h b/chaos/common/network/NetworkBroker.h
index fa35b9f18e0111fe3b57d7dab7600f6704c05471..dfbfa3467d683105a1635a50957097156a72490c 100644
--- a/chaos/common/network/NetworkBroker.h
+++ b/chaos/common/network/NetworkBroker.h
@@ -31,6 +31,7 @@
 //#include <chaos/common/network/PerformanceManagment.h>
 #include <chaos/common/utility/StartableService.h>
 #include <chaos/common/network/CNodeNetworkAddress.h>
+#include <chaos/common/message/MessagePSDriver.h>
 
 namespace chaos {
 	
@@ -120,6 +121,10 @@ namespace chaos {
 				//! Rpc server for message listening
                 chaos::RpcServer *rpc_server;
                 
+				// publish subscribe
+				chaos::common::message::producer_uptr_t prod;
+              	chaos::common::message::consumer_uptr_t cons;
+
 				//rpc action dispatcher
 				AbstractCommandDispatcher *rpc_dispatcher;