From 426e28466795ae95c57cb0e4c77ada30e2b486ce Mon Sep 17 00:00:00 2001
From: amichelo <andrea.michelotti@lnf.infn.it>
Date: Wed, 20 Oct 2021 14:02:08 +0200
Subject: [PATCH] subscribe after other services

---
 ChaosMetadataService/ChaosMetadataService.cpp | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp
index 72e97f06d..a4bcbc115 100644
--- a/ChaosMetadataService/ChaosMetadataService.cpp
+++ b/ChaosMetadataService/ChaosMetadataService.cpp
@@ -228,10 +228,7 @@ void ChaosMetadataService::start()  {
     try {
         ChaosCommon<ChaosMetadataService>::start();
         StartableService::startImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__);
-        #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE)
-        message_consumer.start( __PRETTY_FUNCTION__);
-
-        #endif
+        
         //start batch system
         data_consumer.start( __PRETTY_FUNCTION__);
         LAPP_ <<"\n----------------------------------------------------------------------"<<
@@ -244,6 +241,8 @@ void ChaosMetadataService::start()  {
         //register this process on persistence database
         persistence::data_access::DataServiceDataAccess *ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>();
         std::string unique_uid=NetworkBroker::getInstance()->getRPCUrl();
+        LCND_LDBG<<"-----REGISTERING ---";
+
         ds_da->registerNode(setting.ha_zone_name,
                             unique_uid,
                             NetworkBroker::getInstance()->getDirectIOUrl(),
@@ -255,12 +254,21 @@ void ChaosMetadataService::start()  {
                                                                                    chaos::common::constants::HBTimersTimeoutinMSec);
 
 
+        
         StartableService::startImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__);
         HealtManagerDirect::getInstance()->addNewNode(unique_uid);
         HealtManagerDirect::getInstance()->addNodeMetricValue(unique_uid,
                                                             NodeHealtDefinitionKey::NODE_HEALT_STATUS,
                                                             NodeHealtDefinitionValue::NODE_HEALT_STATUS_START);
-            
+        #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE)
+
+        sleep(chaos::common::constants::HBTimersTimeoutinMSec/1000);
+        LCND_LDBG<<"-----SUBSCRIBING---";
+
+        message_consumer.start( __PRETTY_FUNCTION__);
+
+        #endif 
+        
         waitCloseSemaphore.wait();
     } catch (CException& ex) {
         DECODE_CHAOS_EXCEPTION(ex)
-- 
GitLab