Skip to content
Snippets Groups Projects
Commit 708dfd22 authored by Andrea Michelotti's avatar Andrea Michelotti
Browse files

different handlers

parent 6259324b
No related branches found
No related tags found
No related merge requests found
......@@ -36,7 +36,38 @@ namespace chaos {
}
return impl->applyConfiguration();
}*/
int MessagePublishSubscribeBase::addHandler(eventTypes ev,msgHandler cb,bool add){
if(add){
MRDDBG_<<"Register handler "<<ev<<" @"<<std::hex<<cb;
handlers[ev]=cb;
} else if(handlers.count(ev)){
MRDDBG_<<"UNRegister handler "<<ev;
handlers.erase(ev);
}
return 0;
}
int MessagePublishSubscribeBase::addHandler(const std::string& ev,msgHandler cb,bool add){
std::string key=ev;
if(key.size()==0){
return -1;
}
std::replace(key.begin(), key.end(), '/', '.');
std::replace(key.begin(), key.end(), ':', '.');
if(add){
topic_handlers[key]=cb;
MRDDBG_<<topic_handlers.size()<<" + Register handler on key:"<<ev<<" topic:"<<key<<" @"<<std::hex<<cb;
} else if(topic_handlers.count(key)){
topic_handlers.erase(key);
MRDDBG_<<topic_handlers.size()<<" - UNRegister handler on key:"<<ev<<" topic:"<<key;
}
return 0;
}
int MessagePublishSubscribeBase::applyConfiguration(){
MRDDBG_<<"NOT IMPLEMENTED";
......
......@@ -85,28 +85,8 @@ namespace chaos {
*
* @param ev
*/
int addHandler(eventTypes ev,msgHandler cb,bool add=true){
if(add){
handlers[ev]=cb;
} else if(handlers.count(ev)){
handlers.erase(ev);
}
return 0;
}
int addHandler(const std::string& ev,msgHandler cb,bool add=true){
std::string key=ev;
if(key.size()==0){
return -1;
}
std::replace(key.begin(), key.end(), '/', '.');
std::replace(key.begin(), key.end(), ':', '.');
if(add){
topic_handlers[key]=cb;
} else if(topic_handlers.count(key)){
topic_handlers.erase(key);
}
return 0;
}
int addHandler(eventTypes ev,msgHandler cb,bool add=true);
int addHandler(const std::string& ev,msgHandler cb,bool add=true);
/**
* @brief Enable synchronous if supported
*
......
......@@ -248,7 +248,7 @@ void MessagePSRDKafkaConsumer::poll() {
}
stats.counter++;
// MRDDBG_<<" message from:"<<rd_kafka_topic_name(rkm->rkt)<<" par:"<<rkm->partition<<" off:"<<rkm->offset;
// MRDDBG_<<" message from:"<<d.key<<" par:"<<d.par<<" off:"<<d.off <<" handler topic:"<<topic_handlers.count(d.key)<<" onarrive:"<<handlers[ONARRIVE];
/* Print the message key. */
/* if (rkm->key && is_printable(rkm->key, rkm->key_len))
......@@ -309,45 +309,11 @@ void MessagePSRDKafkaConsumer::poll() {
if(topic_handlers.count(d.key)){
topic_handlers[d.key](d);
} else if(handlers[ONARRIVE]){
handlers[ONARRIVE](d);
}
}
} /*else {
ele_t* ele = new ele_t();
ele->key = rd_kafka_topic_name(rkm->rkt);
ele->off = rkm->offset;
ele->par = rkm->partition;
try {
ele->cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len));
} catch (chaos::CException& e) {
stats.errs++;
std::stringstream ss;
ss<< rkm->offset << "," << rkm->partition << " invalid chaos packet from:" << rd_kafka_topic_name(rkm->rkt) << " len:" << rkm->len << " msg:" << e.what();
MRDERR_ <<ss.str();
//<<" string:"<<std::string((const char*)rkm->payload, rkm->len);
if (handlers[ONERROR]) {
ele_t d;
d.key = rd_kafka_topic_name(rkm->rkt);
d.off = rkm->offset;
d.par = rkm->partition;
d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper());
d.cd->addStringValue("msg",ss.str());
d.cd->addInt32Value("err",-1);
handlers[ONERROR](d);
}
rd_kafka_message_destroy(rkm);
return;
}
msgs.push(ele);
que_elem++;
stats.oks++;
data_ready = true;
cond.notify_all();
}*/
}
stats.oks++;
}
......
......@@ -85,7 +85,8 @@ void PSMServer::init(void *init_data) {
cons->addServer(msgbroker);
prod->addServer(msgbroker);
// subscribe to the queue of commands
cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONARRIVE, boost::bind(&PSMServer::messageHandler, this, _1));
cons->addHandler(nodeuid + "_cmd", boost::bind(&PSMServer::messageHandler, this, _1));
cons->addHandler(chaos::common::message::MessagePublishSubscribeBase::ONERROR, boost::bind(&PSMServer::messageError, this, _1));
cons->setOption("allow.auto.create.topics","true");
if (cons->applyConfiguration() != 0) {
......@@ -164,6 +165,8 @@ void PSMServer::start() {
if(cfg->hasKey("ismds")){
PSMS_LAPP << "Subscribing to " <<chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC;
cons->subscribe(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC);
cons->addHandler(chaos::common::constants::CHAOS_ADMIN_ADMIN_TOPIC, boost::bind(&PSMServer::messageHandler, this, _1));
}
PSMS_LAPP << "Subscribing to " << nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX;
cons->subscribe(nodeuid + chaos::DataPackPrefixID::COMMAND_DATASET_POSTFIX);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment