From 708dfd22dc88d389d13ef726a7cd3454f4aec005 Mon Sep 17 00:00:00 2001 From: amichelo <andrea.michelotti@lnf.infn.it> Date: Wed, 1 Feb 2023 23:35:23 +0100 Subject: [PATCH] different handlers --- .../message/MessagePublishSubscribeBase.cpp | 31 ++++++++++++++ .../message/MessagePublishSubscribeBase.h | 24 +---------- .../kafka/rdk/MessagePSRDKafkaConsumer.cpp | 40 ++----------------- chaos/common/rpc/psm/PSMServer.cpp | 5 ++- 4 files changed, 40 insertions(+), 60 deletions(-) diff --git a/chaos/common/message/MessagePublishSubscribeBase.cpp b/chaos/common/message/MessagePublishSubscribeBase.cpp index efbfd5ef0..b714e578d 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.cpp +++ b/chaos/common/message/MessagePublishSubscribeBase.cpp @@ -36,7 +36,38 @@ namespace chaos { } return impl->applyConfiguration(); }*/ + int MessagePublishSubscribeBase::addHandler(eventTypes ev,msgHandler cb,bool add){ + if(add){ + MRDDBG_<<"Register handler "<<ev<<" @"<<std::hex<<cb; + handlers[ev]=cb; + } else if(handlers.count(ev)){ + MRDDBG_<<"UNRegister handler "<<ev; + + handlers.erase(ev); + } + return 0; + } + int MessagePublishSubscribeBase::addHandler(const std::string& ev,msgHandler cb,bool add){ + std::string key=ev; + if(key.size()==0){ + return -1; + } + std::replace(key.begin(), key.end(), '/', '.'); + std::replace(key.begin(), key.end(), ':', '.'); + if(add){ + + topic_handlers[key]=cb; + MRDDBG_<<topic_handlers.size()<<" + Register handler on key:"<<ev<<" topic:"<<key<<" @"<<std::hex<<cb; + + } else if(topic_handlers.count(key)){ + topic_handlers.erase(key); + + MRDDBG_<<topic_handlers.size()<<" - UNRegister handler on key:"<<ev<<" topic:"<<key; + + } + return 0; + } int MessagePublishSubscribeBase::applyConfiguration(){ MRDDBG_<<"NOT IMPLEMENTED"; diff --git a/chaos/common/message/MessagePublishSubscribeBase.h b/chaos/common/message/MessagePublishSubscribeBase.h index c03efbfa0..f476a7e8b 100644 --- a/chaos/common/message/MessagePublishSubscribeBase.h +++ b/chaos/common/message/MessagePublishSubscribeBase.h @@ -85,28 +85,8 @@ namespace chaos { * * @param ev */ - int addHandler(eventTypes ev,msgHandler cb,bool add=true){ - if(add){ - handlers[ev]=cb; - } else if(handlers.count(ev)){ - handlers.erase(ev); - } - return 0; - } - int addHandler(const std::string& ev,msgHandler cb,bool add=true){ - std::string key=ev; - if(key.size()==0){ - return -1; - } - std::replace(key.begin(), key.end(), '/', '.'); - std::replace(key.begin(), key.end(), ':', '.'); - if(add){ - topic_handlers[key]=cb; - } else if(topic_handlers.count(key)){ - topic_handlers.erase(key); - } - return 0; - } + int addHandler(eventTypes ev,msgHandler cb,bool add=true); + int addHandler(const std::string& ev,msgHandler cb,bool add=true); /** * @brief Enable synchronous if supported * diff --git a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp index db3f3850b..f3efedee9 100644 --- a/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp +++ b/chaos/common/message/impl/kafka/rdk/MessagePSRDKafkaConsumer.cpp @@ -248,7 +248,7 @@ void MessagePSRDKafkaConsumer::poll() { } stats.counter++; - // MRDDBG_<<" message from:"<<rd_kafka_topic_name(rkm->rkt)<<" par:"<<rkm->partition<<" off:"<<rkm->offset; + // MRDDBG_<<" message from:"<<d.key<<" par:"<<d.par<<" off:"<<d.off <<" handler topic:"<<topic_handlers.count(d.key)<<" onarrive:"<<handlers[ONARRIVE]; /* Print the message key. */ /* if (rkm->key && is_printable(rkm->key, rkm->key_len)) @@ -309,45 +309,11 @@ void MessagePSRDKafkaConsumer::poll() { if(topic_handlers.count(d.key)){ topic_handlers[d.key](d); } else if(handlers[ONARRIVE]){ - handlers[ONARRIVE](d); - } + } - } /*else { - ele_t* ele = new ele_t(); - ele->key = rd_kafka_topic_name(rkm->rkt); - ele->off = rkm->offset; - ele->par = rkm->partition; - - try { - ele->cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); - } catch (chaos::CException& e) { - stats.errs++; - std::stringstream ss; - ss<< rkm->offset << "," << rkm->partition << " invalid chaos packet from:" << rd_kafka_topic_name(rkm->rkt) << " len:" << rkm->len << " msg:" << e.what(); - MRDERR_ <<ss.str(); - //<<" string:"<<std::string((const char*)rkm->payload, rkm->len); - if (handlers[ONERROR]) { - ele_t d; - d.key = rd_kafka_topic_name(rkm->rkt); - d.off = rkm->offset; - d.par = rkm->partition; - d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper()); - d.cd->addStringValue("msg",ss.str()); - d.cd->addInt32Value("err",-1); - - handlers[ONERROR](d); - } - rd_kafka_message_destroy(rkm); - return; - } - msgs.push(ele); - que_elem++; - stats.oks++; - data_ready = true; - cond.notify_all(); - }*/ + } stats.oks++; } diff --git a/chaos/common/rpc/psm/PSMServer.cpp b/chaos/common/rpc/psm/PSMServer.cpp index 5e4174d21..cdec9e44b 100644 --- a/chaos/common/rpc/psm/PSMServer.cpp +++ b/chaos/common/rpc/psm/PSMServer.cpp @@ -85,7 +85,8 @@ void PSMServer::init(void *init_data) { cons->addServer(msgbroker); prod->addServer(msgbroker); // subscribe to the queue of commands - cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&PSMServer::messageHandler, this, _1)); + cons->addHandler(nodeuid + "_cmd", boost::bind(&PSMServer::messageHandler, this, _1)); + cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&PSMServer::messageError, this, _1)); cons->setOption("allow.auto.create.topics","true"); if (cons->applyConfiguration() != 0) { @@ -164,6 +165,8 @@ void PSMServer::start() { if(cfg->hasKey("ismds")){ PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC; cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC); + cons->addHandler(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC, boost::bind(&PSMServer::messageHandler, this, _1)); + } PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX; cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX); -- GitLab