diff --git a/ChaosMetadataService/ChaosMetadataService.cpp b/ChaosMetadataService/ChaosMetadataService.cpp index 72e97f06d39994f2015a6ef6d1e2ca0ddb2976ee..a4bcbc115b60fd02ef089d93eaa18d15b0dfeaad 100644 --- a/ChaosMetadataService/ChaosMetadataService.cpp +++ b/ChaosMetadataService/ChaosMetadataService.cpp @@ -228,10 +228,7 @@ void ChaosMetadataService::start() { try { ChaosCommon<ChaosMetadataService>::start(); StartableService::startImplementation(MDSBatchExecutor::getInstance(), "MDSBatchExecutor", __PRETTY_FUNCTION__); - #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) - message_consumer.start( __PRETTY_FUNCTION__); - - #endif + //start batch system data_consumer.start( __PRETTY_FUNCTION__); LAPP_ <<"\n----------------------------------------------------------------------"<< @@ -244,6 +241,8 @@ void ChaosMetadataService::start() { //register this process on persistence database persistence::data_access::DataServiceDataAccess *ds_da = DriverPoolManager::getInstance()->getPersistenceDataAccess<persistence::data_access::DataServiceDataAccess>(); std::string unique_uid=NetworkBroker::getInstance()->getRPCUrl(); + LCND_LDBG<<"-----REGISTERING ---"; + ds_da->registerNode(setting.ha_zone_name, unique_uid, NetworkBroker::getInstance()->getDirectIOUrl(), @@ -255,12 +254,21 @@ void ChaosMetadataService::start() { chaos::common::constants::HBTimersTimeoutinMSec); + StartableService::startImplementation(HealtManagerDirect::getInstance(), "HealtManagerDirect", __PRETTY_FUNCTION__); HealtManagerDirect::getInstance()->addNewNode(unique_uid); HealtManagerDirect::getInstance()->addNodeMetricValue(unique_uid, NodeHealtDefinitionKey::NODE_HEALT_STATUS, NodeHealtDefinitionValue::NODE_HEALT_STATUS_START); - + #if defined(KAFKA_RDK_ENABLE) || defined(KAFKA_ASIO_ENABLE) + + sleep(chaos::common::constants::HBTimersTimeoutinMSec/1000); + LCND_LDBG<<"-----SUBSCRIBING---"; + + message_consumer.start( __PRETTY_FUNCTION__); + + #endif + waitCloseSemaphore.wait(); } catch (CException& ex) { DECODE_CHAOS_EXCEPTION(ex)