diff --git a/chaos/common/message/MessagePublishSubscribeBase.cpp b/chaos/common/message/MessagePublishSubscribeBase.cpp index efbfd5ef01662513814d7999044abe93d0772b0a..b714e578deab7189cc02db606a870791d7ef782d 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.cpp +++ b/chaos/common/message/MessagePublishSubscribeBase.cpp @@ -36,7 +36,38 @@ namespace chaos { } return impl->applyConfiguration(); }*/ + int MessagePublishSubscribeBase::addHandler(eventTypes ev,msgHandler cb,bool add){ + if(add){ + MRDDBG_<<"Register handler "<<ev<<" @"<<std::hex<<cb; + handlers[ev]=cb; + } else if(handlers.count(ev)){ + MRDDBG_<<"UNRegister handler "<<ev; + + handlers.erase(ev); + } + return 0; + } + int MessagePublishSubscribeBase::addHandler(const std::string& ev,msgHandler cb,bool add){ + std::string key=ev; + if(key.size()==0){ + return -1; + } + std::replace(key.begin(), key.end(), '/', '.'); + std::replace(key.begin(), key.end(), ':', '.'); + if(add){ + + topic_handlers[key]=cb; + MRDDBG_<<topic_handlers.size()<<" + Register handler on key:"<<ev<<" topic:"<<key<<" @"<<std::hex<<cb; + + } else if(topic_handlers.count(key)){ + topic_handlers.erase(key); + + MRDDBG_<<topic_handlers.size()<<" - UNRegister handler on key:"<<ev<<" topic:"<<key; + + } + return 0; + } int MessagePublishSubscribeBase::applyConfiguration(){ MRDDBG_<<"NOT IMPLEMENTED"; diff --git a/chaos/common/message/MessagePublishSubscribeBase.h b/chaos/common/message/MessagePublishSubscribeBase.h index c03efbfa00a7e440ad9d1866754b2981c779f5f2..f476a7e8bf94f54fe26c226128c1e396986978c2 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.h +++ b/chaos/common/message/MessagePublishSubscribeBase.h @@ -85,28 +85,8 @@ namespace chaos { * * @param ev */ - int addHandler(eventTypes ev,msgHandler cb,bool add=true){ - if(add){ - handlers[ev]=cb; - } else if(handlers.count(ev)){ - handlers.erase(ev); - } - return 0; - } - int addHandler(const std::string& ev,msgHandler cb,bool add=true){ - std::string key=ev; - if(key.size()==0){ - return -1; - } - std::replace(key.begin(), key.end(), '/', '.'); - std::replace(key.begin(), key.end(), ':', '.'); - if(add){ - topic_handlers[key]=cb; - } else if(topic_handlers.count(key)){ - topic_handlers.erase(key); - } - return 0; - } + int addHandler(eventTypes ev,msgHandler cb,bool add=true); + int addHandler(const std::string& ev,msgHandler cb,bool add=true); /** * @brief Enable synchronous if supported * diff --git a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp index db3f3850b39a77464db0c28622f38c7319063886..f3efedee94e03c7ed33edaea0fb7a106d7c2b5bf 100644 --- a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp +++ b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp @@ -248,7 +248,7 @@ void MessagePSRDKafkaConsumer::poll() { } stats.counter++; - // MRDDBG_<<" message from:"<<rd_kafka_topic_name(rkm->rkt)<<" par:"<<rkm->partition<<" off:"<<rkm->offset; + // MRDDBG_<<" message from:"<<d.key<<" par:"<<d.par<<" off:"<<d.off <<" handler topic:"<<topic_handlers.count(d.key)<<" onarrive:"<<handlers[ONARRIVE]; /* Print the message key. */ /* if (rkm->key && is_printable(rkm->key, rkm->key_len)) @@ -309,45 +309,11 @@ void MessagePSRDKafkaConsumer::poll() { if(topic_handlers.count(d.key)){ topic_handlers[d.key](d); } else if(handlers[ONARRIVE]){ - handlers[ONARRIVE](d); - } + } - } /*else { - ele_t* ele = new ele_t(); - ele->key = rd_kafka_topic_name(rkm->rkt); - ele->off = rkm->offset; - ele->par = rkm->partition; - - try { - ele->cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); - } catch (chaos::CException& e) { - stats.errs++; - std::stringstream ss; - ss<< rkm->offset << "," << rkm->partition << " invalid chaos packet from:" << rd_kafka_topic_name(rkm->rkt) << " len:" << rkm->len << " msg:" << e.what(); - MRDERR_ <<ss.str(); - //<<" string:"<<std::string((const char*)rkm->payload, rkm->len); - if (handlers[ONERROR]) { - ele_t d; - d.key = rd_kafka_topic_name(rkm->rkt); - d.off = rkm->offset; - d.par = rkm->partition; - d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper()); - d.cd->addStringValue("msg",ss.str()); - d.cd->addInt32Value("err",-1); - - handlers[ONERROR](d); - } - rd_kafka_message_destroy(rkm); - return; - } - msgs.push(ele); - que_elem++; - stats.oks++; - data_ready = true; - cond.notify_all(); - }*/ + } stats.oks++; } diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index 5e4174d212697ca4c6e8f83f85356b953a7b88f1..cdec9e44b3383da8be2e997e1c8470555f52ba93 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -85,7 +85,8 @@ void PSMServer::init(void *init_data) { cons->addServer(msgbroker); prod->addServer(msgbroker); // subscribe to the queue of commands - cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&PSMServer::messageHandler, this, _1)); + cons->addHandler(nodeuid + "_cmd", boost::bind(&PSMServer::messageHandler, this, _1)); + cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&PSMServer::messageError, this, _1)); cons->setOption("allow.auto.create.topics","true"); if (cons->applyConfiguration() != 0) { @@ -164,6 +165,8 @@ void PSMServer::start() { if(cfg->hasKey("ismds")){ PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC; cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC); + cons->addHandler(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC, boost::bind(&PSMServer::messageHandler, this, _1)); + } PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX);