Skip to content
Snippets Groups Projects
Commit 7a6c2630 authored by Andrea Michelotti's avatar Andrea Michelotti
Browse files

adding kafka into netbroker

parent 83d42aa3
No related branches found
No related tags found
No related merge requests found
......@@ -108,6 +108,7 @@ namespace chaos {
*/
void updateLiveCache(const std::string& name,int64_t te);
/**
* @brief remove storage data to from
*
......
......@@ -280,7 +280,7 @@ int QueryDataMsgPSConsumer::consumeHealthDataEvent(const std::string&
}
}*/
if(channel_data.get()==NULL || channel_data->data()==NULL){
DBG<<"Empty health for:\""<<key<<"\" registration pack";
// DBG<<"Empty health for:\""<<key<<"\" registration pack";
if(alive_map.find(key)==alive_map.end()){
if (cons->subscribe(key) != 0) {
ERR <<"] cannot subscribe to :" << key<<" err:"<<cons->getLastError();
......
......@@ -67,6 +67,8 @@ namespace chaos {
chaos::common::data::CDWUniquePtr _registrationAck(chaos::common::data::CDWUniquePtr data);
public:
std::string nodeuid;
//! Constructor Method
/*!
This method call the \ref GlobalConfiguration::preParseStartupParameters method, starting the
......
......@@ -9,106 +9,107 @@
#endif
#ifdef KAFKA_ASIO_ENABLE
#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
#include "impl/kafka/asio/MessagePSKafkaAsioConsumer.h"
#include "impl/kafka/asio/MessagePSKafkaAsioProducer.h"
#endif
#include <chaos/common/global.h>
namespace chaos {
namespace common {
namespace message {
boost::mutex MessagePSDriver::io;
std::map<std::string,producer_uptr_t> MessagePSDriver::producer_drv_m;
std::map<std::string,consumer_uptr_t> MessagePSDriver::consumer_drv_m;
producer_uptr_t MessagePSDriver::getProducerDriver(const std::string&drvname,const std::string& k){
boost::mutex::scoped_lock ll(io);
std::map<std::string,producer_uptr_t>::iterator i=producer_drv_m.find(drvname);
if(i!=producer_drv_m.end()){
MRDDBG_<<drvname<<"] returning allocated producer:"<<std::hex<<i->second.get();
return i->second;
}
producer_uptr_t ret;
namespace common {
namespace message {
boost::mutex MessagePSDriver::io;
std::map<std::string, producer_uptr_t> MessagePSDriver::producer_drv_m;
std::map<std::string, consumer_uptr_t> MessagePSDriver::consumer_drv_m;
producer_uptr_t MessagePSDriver::getNewProducerDriver(const std::string& drvname, const std::string& k) {
producer_uptr_t ret;
#ifdef KAFKA_RDK_ENABLE
if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
ret.reset(new kafka::rdk::MessagePSKafkaProducer());
producer_drv_m["kafka-rdk"]=ret;
}
if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
ret.reset(new kafka::rdk::MessagePSKafkaProducer());
producer_drv_m["kafka-rdk"] = ret;
}
#endif
#ifdef KAFKA_ASIO_ENABLE
if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
producer_drv_m["kafka-asio"]=ret;
}
if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
ret.reset(new kafka::asio::MessagePSKafkaAsioProducer());
producer_drv_m["kafka-asio"] = ret;
}
#endif
if(ret.get()==NULL){
throw chaos::CException(-5,"cannot find a producer driver for:"+drvname,__PRETTY_FUNCTION__);
}
MRDDBG_<<drvname<<"] created producer:"<<std::hex<<ret.get();
if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)){
std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
std::map<std::string,std::string> kv;
GlobalConfiguration::fillKVParameter(kv ,opt,"");
for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
ret->setOption(i->first,i->second);
}
}
return ret;
if (ret.get() == NULL) {
throw chaos::CException(-5, "cannot find a producer driver for:" + drvname, __PRETTY_FUNCTION__);
}
MRDDBG_ << drvname << "] created producer:" << std::hex << ret.get();
if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_PRODUCER_KVP)) {
std::vector<std::string> opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_PRODUCER_KVP);
std::map<std::string, std::string> kv;
GlobalConfiguration::fillKVParameter(kv, opt, "");
for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
ret->setOption(i->first, i->second);
}
consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k){
std::map<std::string,consumer_uptr_t>::iterator i=consumer_drv_m.find(drvname);
consumer_uptr_t ret;
if(i!=consumer_drv_m.end()){
MRDDBG_<<drvname<<"] returning allocated consumer:"<<std::hex<<i->second.get();
return i->second;
}
#ifdef KAFKA_RDK_ENABLE
if((drvname=="KAFKA-RDK") || (drvname=="kafka-rdk")){
ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid,k));
consumer_drv_m["kafka-rdk"]=ret;
}
#endif
#ifdef KAFKA_ASIO_ENABLE
if((drvname=="KAFKA-ASIO")||(drvname=="kafka-asio")){
ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid,k));
consumer_drv_m["kafka-asio"]=ret;
}
#endif
}
return ret;
}
producer_uptr_t MessagePSDriver::getProducerDriver(const std::string& drvname, const std::string& k) {
producer_uptr_t ret;
boost::mutex::scoped_lock ll(io);
std::map<std::string, producer_uptr_t>::iterator i = producer_drv_m.find(drvname);
if (i != producer_drv_m.end()) {
MRDDBG_ << drvname << "] returning allocated producer:" << std::hex << i->second.get();
return i->second;
}
ret = getNewProducerDriver(drvname, k);
producer_drv_m[drvname] = ret;
return ret;
}
consumer_uptr_t MessagePSDriver::getNewConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
consumer_uptr_t ret;
if(ret.get()==NULL){
throw chaos::CException(-5,"cannot find a consumer driver for:"+drvname,__PRETTY_FUNCTION__);
}
MRDDBG_<<drvname<<"] created consumer:"<<std::hex<<ret.get();
if(GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)){
std::vector<std::string> opt=GlobalConfiguration::getInstance()->getOption< std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
std::map<std::string,std::string> kv;
GlobalConfiguration::fillKVParameter(kv ,opt,"");
for(std::map<std::string,std::string>::iterator i=kv.begin();i!=kv.end();i++){
ret->setOption(i->first,i->second);
}
}
return ret;
}
}
}
}
#ifdef KAFKA_RDK_ENABLE
if ((drvname == "KAFKA-RDK") || (drvname == "kafka-rdk")) {
ret.reset(new kafka::rdk::MessagePSRDKafkaConsumer(gid, k));
}
#endif
#ifdef KAFKA_ASIO_ENABLE
if ((drvname == "KAFKA-ASIO") || (drvname == "kafka-asio")) {
ret.reset(new kafka::asio::MessagePSKafkaAsioConsumer(gid, k));
}
#endif
if (ret.get() == NULL) {
throw chaos::CException(-5, "cannot find a consumer driver for:" + drvname, __PRETTY_FUNCTION__);
}
MRDDBG_ << drvname << "] created consumer:" << std::hex << ret.get();
if (GlobalConfiguration::getInstance()->hasOption(InitOption::OPT_MSG_CONSUMER_KVP)) {
std::vector<std::string> opt = GlobalConfiguration::getInstance()->getOption<std::vector<std::string> >(InitOption::OPT_MSG_CONSUMER_KVP);
std::map<std::string, std::string> kv;
GlobalConfiguration::fillKVParameter(kv, opt, "");
for (std::map<std::string, std::string>::iterator i = kv.begin(); i != kv.end(); i++) {
ret->setOption(i->first, i->second);
}
}
return ret;
}
consumer_uptr_t MessagePSDriver::getConsumerDriver(const std::string& drvname, const std::string& gid, const std::string& k) {
std::map<std::string, consumer_uptr_t>::iterator i = consumer_drv_m.find(drvname);
consumer_uptr_t ret;
if (i != consumer_drv_m.end()) {
MRDDBG_ << drvname << "] returning allocated consumer:" << std::hex << i->second.get();
return i->second;
}
ret = MessagePSDriver::getNewConsumerDriver(drvname, gid, k);
consumer_drv_m[drvname] = ret;
return ret;
}
} // namespace message
} // namespace common
} // namespace chaos
......@@ -16,6 +16,8 @@ class MessagePSDriver {
static std::map<std::string,consumer_uptr_t> consumer_drv_m;
public:
static producer_uptr_t getNewProducerDriver(const std::string&drvname,const std::string& k="");
static consumer_uptr_t getNewConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
static producer_uptr_t getProducerDriver(const std::string&drvname,const std::string& k="");
static consumer_uptr_t getConsumerDriver(const std::string&drvname,const std::string& gid,const std::string& k="");
......
......@@ -90,10 +90,27 @@ void NetworkBroker::init(void *initData) {
if(!globalConfiguration) {
throw CException(-1, "No global configuraiton found", __PRETTY_FUNCTION__);
throw CException(-1, "No global configuration found", __PRETTY_FUNCTION__);
}
MB_LAPP << "Configuration:"<<globalConfiguration->getCompliantJSONString();
if (GlobalConfiguration::getInstance()->getConfiguration()->hasKey(InitOption::OPT_MSG_BROKER_SERVER)) {
std::string msgbrokerdrv = "kafka-rdk";
msgbrokerdrv = GlobalConfiguration::getInstance()->getOption<std::string>(InitOption::OPT_MSG_BROKER_DRIVER);
std::string msgbroker = GlobalConfiguration::getInstance()->getConfiguration()->getStringValue(InitOption::OPT_MSG_BROKER_SERVER);
MB_LAPP << "Initializing producer/consumer based on "<<msgbroker;
prod=chaos::common::message::MessagePSDriver::getNewProducerDriver(msgbrokerdrv);
prod->addServer(msgbroker);
if (prod->applyConfiguration() != 0) {
throw chaos::CException(-1, "cannot initialize Publish Subscribe Producer:" + prod->getLastError(), __PRETTY_FUNCTION__);
}
prod->start();
cons=chaos::common::message::MessagePSDriver::getNewConsumerDriver(msgbrokerdrv,"");
cons->addServer(msgbroker);
}
//---------------------------- D I R E C T I/O ----------------------------
if(globalConfiguration->hasKey(common::direct_io::DirectIOConfigurationKey::DIRECT_IO_IMPL_TYPE)) {
MB_LAPP << "Setup DirectIO sublayer";
......@@ -101,7 +118,7 @@ void NetworkBroker::init(void *initData) {
//construct the rpc server and client name
string direct_io_server_impl = direct_io_impl+"DirectIOServer";
direct_io_client_impl = direct_io_impl + "DirectIOClient";
MB_LAPP << "Trying to initilize DirectIO Server: " << direct_io_server_impl;
MB_LAPP << "Trying to initialize DirectIO Server: " << direct_io_server_impl;
direct_io_server = ObjectFactoryRegister<common::direct_io::DirectIOServer>::getInstance()->getNewInstanceByName(direct_io_server_impl);
if(!direct_io_server) throw CException(-2, "Error creating direct io server implementation:"+direct_io_server_impl, __PRETTY_FUNCTION__);
......
......@@ -31,6 +31,7 @@
//#include <chaos/common/network/PerformanceManagment.h>
#include <chaos/common/utility/StartableService.h>
#include <chaos/common/network/CNodeNetworkAddress.h>
#include <chaos/common/message/MessagePSDriver.h>
namespace chaos {
......@@ -120,6 +121,10 @@ namespace chaos {
//! Rpc server for message listening
chaos::RpcServer *rpc_server;
// publish subscribe
chaos::common::message::producer_uptr_t prod;
chaos::common::message::consumer_uptr_t cons;
//rpc action dispatcher
AbstractCommandDispatcher *rpc_dispatcher;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment