Skip to content
Snippets Groups Projects
Commit f3b018a0 authored by Andrea Michelotti's avatar Andrea Michelotti
Browse files

fix and update

parent 5d1af3bb
No related branches found
No related tags found
No related merge requests found
...@@ -59,33 +59,36 @@ using namespace chaos::common::direct_io::channel; ...@@ -59,33 +59,36 @@ using namespace chaos::common::direct_io::channel;
//constructor //constructor
QueryDataConsumer::QueryDataConsumer() QueryDataConsumer::QueryDataConsumer()
: server_endpoint(NULL), device_channel(NULL), system_api_channel(NULL), device_data_worker_index(0), storage_queue_push_timeout(ChaosMetadataService::getInstance()->setting.worker_setting.queue_push_timeout) {} : server_endpoint(NULL), device_channel(NULL), system_api_channel(NULL), archive_workers(0),device_data_worker_index(0), storage_queue_push_timeout(ChaosMetadataService::getInstance()->setting.worker_setting.queue_push_timeout) {}
QueryDataConsumer::~QueryDataConsumer() {} QueryDataConsumer::~QueryDataConsumer() {}
void QueryDataConsumer::init(void* init_data) { void QueryDataConsumer::init(void* init_data) {
//get new chaos direct io endpoint //get new chaos direct io endpoint
server_endpoint = NetworkBroker::getInstance()->getDirectIOServerEndpoint(); server_endpoint = NetworkBroker::getInstance()->getDirectIOServerEndpoint();
if(server_endpoint==NULL){ archive_workers=ChaosMetadataService::getInstance()->setting.worker_setting.instances;
INFO << "DirectIO disabled";
return; if (server_endpoint != NULL) {
} // if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__);
// if (!server_endpoint) throw chaos::CException(-2, "Invalid server endpoint", __FUNCTION__); INFO << "QueryDataConsumer initialized with endpoint " << server_endpoint->getRouteIndex();
INFO << "QueryDataConsumer initialized with endpoint " << server_endpoint->getRouteIndex();
INFO << "Allocating DirectIODeviceServerChannel"; INFO << "Allocating DirectIODeviceServerChannel";
device_channel = (DirectIODeviceServerChannel*)server_endpoint->getNewChannelInstance("DirectIODeviceServerChannel"); device_channel = (DirectIODeviceServerChannel*)server_endpoint->getNewChannelInstance("DirectIODeviceServerChannel");
if (!device_channel) throw chaos::CException(-3, "Error allocating device server channel", __FUNCTION__); if (!device_channel) throw chaos::CException(-3, "Error allocating device server channel", __FUNCTION__);
device_channel->setHandler(this); device_channel->setHandler(this);
INFO << "Allocating DirectIOSystemAPIServerChannel"; INFO << "Allocating DirectIOSystemAPIServerChannel";
system_api_channel = (DirectIOSystemAPIServerChannel*)server_endpoint->getNewChannelInstance("DirectIOSystemAPIServerChannel"); system_api_channel = (DirectIOSystemAPIServerChannel*)server_endpoint->getNewChannelInstance("DirectIOSystemAPIServerChannel");
if (!system_api_channel) throw chaos::CException(-4, "Error allocating system api server channel", __FUNCTION__); if (!system_api_channel) throw chaos::CException(-4, "Error allocating system api server channel", __FUNCTION__);
system_api_channel->setHandler(this); system_api_channel->setHandler(this);
} else {
INFO << "DirectIO disabled";
}
INFO << "Archive workers:"<<archive_workers;
//device data worker instances //device data worker instances
for (int idx = 0; for (int idx = 0;
idx < ChaosMetadataService::getInstance()->setting.worker_setting.instances; idx < archive_workers;
idx++) { idx++) {
DataWorkerSharedPtr tmp; DataWorkerSharedPtr tmp;
#if CHAOS_PROMETHEUS #if CHAOS_PROMETHEUS
...@@ -111,7 +114,7 @@ void QueryDataConsumer::deinit() { ...@@ -111,7 +114,7 @@ void QueryDataConsumer::deinit() {
} }
INFO << "Deallocating device push data worker list"; INFO << "Deallocating device push data worker list";
for (int idx = 0; idx < ChaosMetadataService::getInstance()->setting.worker_setting.instances; idx++) { for (int idx = 0; idx <archive_workers; idx++) {
INFO << "Release device worker " << idx; INFO << "Release device worker " << idx;
device_data_worker[idx]->stop(); device_data_worker[idx]->stop();
device_data_worker[idx]->deinit(); device_data_worker[idx]->deinit();
...@@ -122,17 +125,20 @@ int QueryDataConsumer::consumePutEvent(const std::string& key, ...@@ -122,17 +125,20 @@ int QueryDataConsumer::consumePutEvent(const std::string& key,
const uint8_t hst_tag, const uint8_t hst_tag,
const ChaosStringSetConstSPtr meta_tag_set, const ChaosStringSetConstSPtr meta_tag_set,
chaos::common::data::CDataWrapper& data_pack) { chaos::common::data::CDataWrapper& data_pack) {
int err = 0; int err = 0;
//data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, now); //data_pack.addInt64Value(NodeHealtDefinitionKey::NODE_HEALT_MDS_TIMESTAMP, now);
BufferSPtr channel_data_injected(data_pack.getBSONDataBuffer().release()); chaos::common::data::BufferUPtr ptr = data_pack.getBSONDataBuffer();
if (ptr.get() == NULL) {
ERR << key << " empty packet";
return -1;
}
BufferSPtr channel_data_injected(ptr.release());
DataServiceNodeDefinitionType::DSStorageType storage_type = static_cast<DataServiceNodeDefinitionType::DSStorageType>(hst_tag); DataServiceNodeDefinitionType::DSStorageType storage_type = static_cast<DataServiceNodeDefinitionType::DSStorageType>(hst_tag);
//! if tag is == 1 the datapack is in liveonly //! if tag is == 1 the datapack is in liveonly
if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive) { if (storage_type & DataServiceNodeDefinitionType::DSStorageTypeLive && channel_data_injected.get()) {
//protected access to cached driver //protected access to cached driver
CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv(); CacheDriver& cache_slot = DriverPoolManager::getInstance()->getCacheDrv();
err = cache_slot.putData(key, err = cache_slot.putData(key,
...@@ -150,10 +156,10 @@ int QueryDataConsumer::consumePutEvent(const std::string& key, ...@@ -150,10 +156,10 @@ int QueryDataConsumer::consumePutEvent(const std::string& key,
ERR << "Error pushing datapack into logstorage driver"; ERR << "Error pushing datapack into logstorage driver";
} }
} }
if (!err && if (!err && (storage_type & (DataServiceNodeDefinitionType::DSStorageTypeHistory | DataServiceNodeDefinitionType::DSStorageTypeFile))) {
(storage_type & (DataServiceNodeDefinitionType::DSStorageTypeHistory | DataServiceNodeDefinitionType::DSStorageTypeFile))) {
//compute the index to use for the data worker //compute the index to use for the data worker
uint32_t index_to_use = device_data_worker_index++ % ChaosMetadataService::getInstance()->setting.worker_setting.instances; uint32_t index_to_use;
index_to_use = device_data_worker_index++ % archive_workers;
CHAOS_ASSERT(device_data_worker[index_to_use].get()) CHAOS_ASSERT(device_data_worker[index_to_use].get())
//create storage job information //create storage job information
auto job = ChaosMakeSharedPtr<DeviceSharedWorkerJob>(); auto job = ChaosMakeSharedPtr<DeviceSharedWorkerJob>();
......
...@@ -64,7 +64,7 @@ namespace chaos{ ...@@ -64,7 +64,7 @@ namespace chaos{
boost::atomic<uint16_t> device_data_worker_index; boost::atomic<uint16_t> device_data_worker_index;
int64_t storage_queue_push_timeout; int64_t storage_queue_push_timeout;
DataWorkerVec device_data_worker; DataWorkerVec device_data_worker;
int archive_workers;
//---------------- DirectIODeviceServerChannelHandler ----------------------- //---------------- DirectIODeviceServerChannelHandler -----------------------
protected: protected:
int consumePutEvent(const std::string& key, int consumePutEvent(const std::string& key,
......
This diff is collapsed.
...@@ -244,6 +244,10 @@ void MessagePSRDKafkaConsumer::poll() { ...@@ -244,6 +244,10 @@ void MessagePSRDKafkaConsumer::poll() {
d.par = rkm->partition; d.par = rkm->partition;
try { try {
d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len)); d.cd = chaos::common::data::CDWUniquePtr(new chaos::common::data::CDataWrapper((const char*)rkm->payload, rkm->len));
if(d.cd.get()==NULL){
MRDERR_<<" invalid bson "<<d.key;
return;
}
} catch (chaos::CException& e) { } catch (chaos::CException& e) {
stats.errs++; stats.errs++;
std::stringstream ss; std::stringstream ss;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment