Skip to content
Snippets Groups Projects
Unverified Commit 1bd6f097 authored by Claudio Bisegni's avatar Claudio Bisegni
Browse files

create the Chaos Data Export tool for exporting data in binary and JSON

parent 5c77de69
No related branches found
No related tags found
No related merge requests found
......@@ -43,7 +43,10 @@ message(STATUS "Configure Chaos Node Directory Service")
ADD_SUBDIRECTORY(ChaosNodeDirectory bin/bin_cnd)
message(STATUS "Configure Chaos Data Service")
ADD_SUBDIRECTORY(ChaosDataService bin/bin_chst)
ADD_SUBDIRECTORY(ChaosDataService bin/bin_cds)
message(STATUS "Configure Chaos Data Export")
ADD_SUBDIRECTORY(ChaosDataExport bin/chaos_cde)
message(STATUS "Configure Chaos CLI")
ADD_SUBDIRECTORY(example/ChaosCLI bin/chaos_cli)
......
......@@ -9,17 +9,17 @@ IF( ($ENV{CHAOS32}) OR (BUILD_FORCE_32) )
ENDIF()
ADD_DEFINITIONS(-g -O2 -Wall)
SET(chaos_cli_src main.cpp)
SET(chaos_cde_src cde.cpp)
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../../usr/local/include /usr/local/include /usr/include ${PROJECT_SOURCE_DIR}/example/ChaosCLI/)
LINK_DIRECTORIES(${PROJECT_SOURCE_DIR}/../../usr/local/lib /usr/local/lib)
ADD_EXECUTABLE(ChaosCLI ${chaos_cli_src})
ADD_EXECUTABLE(ChaosDataExport ${chaos_cde_src})
SET(ChaosCLILib $ENV{CHAOS_LINK_LIBRARY})
separate_arguments(ChaosCLILib)
SET(ChaosDataExportLib $ENV{CHAOS_LINK_LIBRARY})
separate_arguments(ChaosDataExportLib)
TARGET_LINK_LIBRARIES(ChaosCLI chaos_uitoolkit chaos_common pthread dl ${ChaosCLILib})
TARGET_LINK_LIBRARIES(ChaosDataExport chaos_uitoolkit chaos_common pthread dl ${ChaosCLILib})
INSTALL_TARGETS(/bin ChaosCLI)
\ No newline at end of file
INSTALL_TARGETS(/bin ChaosDataExport)
\ No newline at end of file
......@@ -7,7 +7,7 @@
objects = {
/* Begin PBXBuildFile section */
328B4E8E19C9AEF9008D1331 /* main.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 328B4E8D19C9AEF9008D1331 /* main.cpp */; };
328B4E8E19C9AEF9008D1331 /* cde.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 328B4E8D19C9AEF9008D1331 /* cde.cpp */; };
/* End PBXBuildFile section */
/* Begin PBXCopyFilesBuildPhase section */
......@@ -25,7 +25,7 @@
/* Begin PBXFileReference section */
328B4E7F19C9AE35008D1331 /* ChaosDataExport */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = ChaosDataExport; sourceTree = BUILT_PRODUCTS_DIR; };
328B4E8C19C9AEF9008D1331 /* CMakeList.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = CMakeList.txt; sourceTree = SOURCE_ROOT; };
328B4E8D19C9AEF9008D1331 /* main.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = main.cpp; sourceTree = SOURCE_ROOT; };
328B4E8D19C9AEF9008D1331 /* cde.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = cde.cpp; sourceTree = SOURCE_ROOT; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
......@@ -59,7 +59,7 @@
isa = PBXGroup;
children = (
328B4E8C19C9AEF9008D1331 /* CMakeList.txt */,
328B4E8D19C9AEF9008D1331 /* main.cpp */,
328B4E8D19C9AEF9008D1331 /* cde.cpp */,
);
path = ChaosDataExport;
sourceTree = "<group>";
......@@ -115,7 +115,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
328B4E8E19C9AEF9008D1331 /* main.cpp in Sources */,
328B4E8E19C9AEF9008D1331 /* cde.cpp in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
......@@ -197,6 +197,7 @@
buildSettings = {
CLANG_CXX_LANGUAGE_STANDARD = "compiler-default";
CLANG_CXX_LIBRARY = "libstdc++";
CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin";
HEADER_SEARCH_PATHS = (
../../,
/usr/local/include,
......@@ -229,6 +230,7 @@
buildSettings = {
CLANG_CXX_LANGUAGE_STANDARD = "compiler-default";
CLANG_CXX_LIBRARY = "libstdc++";
CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin";
HEADER_SEARCH_PATHS = (
../../,
/usr/local/include,
......
/*
* cde.cpp
* !CHOAS
* Created by Bisegni Claudio.
*
* Copyright 2012 INFN, National Institute of Nuclear Physics
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <iostream>
#include <fstream>
#include <unistd.h>
#include <chaos/common/utility/TimingUtil.h>
#include <chaos/common/network/CNodeNetworkAddress.h>
#include <chaos/ui_toolkit/ChaosUIToolkit.h>
#include <chaos/ui_toolkit/LowLevelApi/LLRpcApi.h>
#include <chaos/ui_toolkit/HighLevelApi/HLDataApi.h>
#include <chaos/common/bson/bson.h>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/c_local_time_adjustor.hpp>
using namespace std;
using namespace chaos;
using namespace chaos::ui;
using namespace bson;
using namespace boost;
#define OPT_CU_ID "device_id"
#define OPT_TIMEOUT "timeout"
#define OPT_DST_FILE "dest_file"
#define OPT_DST_TYPE "dest_type"
#define OPT_START_TIME "start_time"
#define OPT_END_TIME "end_time"
void printPercendDone(int percend_done) {
std::cout << " " << setfill('0') << setw(4) << percend_done << "& " <<std::flush;
}
void printStep() {
std::cout << "." <<std::flush;
}
int computePercent(uint64_t done, uint64_t all) {
int result = ((double)done/(double)all)*100;
return result;
}
int main(int argc, char* argv[])
{
char buf[255];
uint32_t timeout;
string device_id;
string dst_file;
bool dest_type;
string start_time;
string end_time;
std::string err_str;
std::ostream *destination_stream = NULL;
std::ofstream destination_file;
uint64_t start_ts = 0;
uint64_t end_ts = 0;
int retry = 0;
uint32_t cicle_number = 0;
CDeviceNetworkAddress deviceNetworkAddress;
try{
//! [UIToolkit Attribute Init]
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_CU_ID, "The identification string of the device", &device_id);
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<uint32_t>(OPT_TIMEOUT, "Timeout for wait the answer in milliseconds", 2000, &timeout);
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_DST_FILE, "Destination file for save found datapack", &dst_file);
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<bool>(OPT_DST_TYPE, "Destination date type binary(true) or string(false)", true, &dest_type);
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_START_TIME, "Time for first datapack to find [format from %Y-%m-%dT%H:%M:%S.%f to %Y]", &start_time);
ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_END_TIME, "Time for last datapack to find [format from %Y-%m-%dT%H:%M:%S.%f to %Y]", &end_time);
//! [UIToolkit Attribute Init]
//! [UIToolkit Init]
ChaosUIToolkit::getInstance()->init(argc, argv);
//! [UIToolkit Init]
if(!ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_CU_ID)){
throw CException(-1, "invalid device identification string", "check param");
}
//get the timestamp for query boundary
if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_DST_FILE)){
if(!chaos::TimingUtil::dateWellFormat(start_time)) {
throw CException(-2, "Invalid star date format", "check date");
}
start_ts = chaos::TimingUtil::getTimestampFromString(start_time);
}
if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_START_TIME)){
if(!chaos::TimingUtil::dateWellFormat(end_time)) {
throw CException(-2, "Invalid end date format", "check date");
}
end_ts = chaos::TimingUtil::getTimestampFromString(end_time);
}
//get the timestamp for query boundary
if(!ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_DST_FILE)){
getcwd(buf, 255);
dst_file.assign(buf, strlen(buf));
dst_file += "/"+device_id+".exp";
}
std::basic_ios<char>::openmode dst_file_mode = ios_base::out;
if(dest_type) {
dst_file_mode |= ios_base::binary;
}
destination_file.open(dst_file.c_str(), dst_file_mode);
if(!destination_file.good()) {
err_str = "Error opening destination file ";
err_str.append(buf, strlen(buf));
throw CException(1, err_str, string("check param"));
}
destination_stream = &destination_file;
//we can allocate the channel
std::cout << "Acquiring controller" << std::endl;
DeviceController *controller = HLDataApi::getInstance()->getControllerForDeviceID(device_id, timeout);
if(!controller) throw CException(4, "Error allcoating decive controller", "device controller creation");
chaos::common::io::QueryFuture *query_future = NULL;
std::cout << "Start Query" << std::endl;
controller->executeTimeIntervallQuery(start_ts, end_ts, &query_future);
if(query_future) {
do {
auto_ptr<CDataWrapper> q_result(query_future->getDataPack(true, timeout));
if(q_result.get()) {
retry = 0;
//get serialization buffer by type
auto_ptr<chaos::common::data::SerializationBuffer> ser(dest_type?q_result->getBSONData():q_result->getJSONData());
//write data
destination_stream->write(ser->getBufferPtr(), ser->getBufferLen());
} else {
break;
}
cicle_number++;
if(!(cicle_number % 10)) {
printStep();
}
if(!(cicle_number % 100)) {
printPercendDone(computePercent(query_future->getCurrentElementIndex(), query_future->getTotalElementFound()));
}
} while((query_future->getCurrentElementIndex() < query_future->getTotalElementFound()) && retry < 3);
//print last percent
printPercendDone(computePercent(query_future->getCurrentElementIndex(), query_future->getTotalElementFound()));
//release the query
controller->releaseQuery(query_future);
}
} catch (CException& e) {
std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl;
} catch (...) {
std::cerr << "General error " << std::endl;
}
try {
//! [UIToolkit Deinit]
ChaosUIToolkit::getInstance()->deinit();
//! [UIToolkit Deinit]
} catch (CException& e) {
std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl;
}
std::cout << std::endl << "Export done"<< std::endl;
return 0;
}
//
// main.cpp
// ChaosDataExport
//
// Created by Claudio Bisegni on 17/09/14.
// Copyright (c) 2014 infn. All rights reserved.
//
#include <iostream>
int main(int argc, const char * argv[])
{
// insert code here...
std::cout << "Hello, World!\n";
return 0;
}
......@@ -265,6 +265,8 @@ int QueryEngine::sendDataToClient(DataCloudQuery *query,
//send data packet
if(!err) {
//previoous iteration has not failed
(*it)->delete_on_dispose = false;
err = (int)connection_info_ptr->channel->sendResultToQueryDataCloud(query->query_id,
query->vfs_query->getNumberOfElementFound(),
++query->total_data_pack_sent,
......@@ -278,7 +280,7 @@ int QueryEngine::sendDataToClient(DataCloudQuery *query,
err = -1;
} else {
//at tis point memory is managed by async direct io system
(*it)->delete_on_dispose = false;
}
}
//delete the datapack
......
......@@ -168,7 +168,7 @@ DirectIOClientConnection *ZMQDirectIOClient::getNewConnection(std::string server
int err = 0;
const int output_buffer_dim = 1;
const int linger_period = 500;
const int timeout = 200;
const int timeout = 1000;
const int min_reconnection_ivl = 100;
const int max_reconnection_ivl = 500;
......
......@@ -107,11 +107,15 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(void *socket,
DirectIOForwarder::freeSentData,
new DisposeSentMemoryInfo(header_deallocation_handler, DisposeSentMemoryInfo::SentPartHeader, sending_opcode));
err = zmq_sendmsg(socket, &msg_header_data, _send_no_wait_flag);
if(err > 0) {
err = 0; //keep error to default behaviour
}
zmq_msg_close(&msg_header_data);
break;
case DIRECT_IO_CHANNEL_PART_DATA_ONLY:
err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag);
if(err == -1) {
DirectIOForwarder::freeSentData(data_pack->channel_data, new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode));
delete (data_pack);
return err;
}
......@@ -121,15 +125,16 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(void *socket,
DirectIOForwarder::freeSentData,
new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode));
err = zmq_sendmsg(socket, &msg_data, _send_no_wait_flag);
if(err == -1) {
ZMQDIO_CONNECTION_LERR_ << "Error sending data only";
if(err > 0) {
err = 0; //keep error to default behaviour
}
err = zmq_msg_close(&msg_data);
zmq_msg_close(&msg_data);
break;
case DIRECT_IO_CHANNEL_PART_HEADER_DATA:
err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag);
if(err == -1) {
DirectIOForwarder::freeSentData(data_pack->channel_data, new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode));
delete (data_pack);
return err;
}
......
//
// endian.h
// CHAOSFramework
//
// Created by Claudio Bisegni on 3/25/13.
// Copyright (c) 2013 INFN. All rights reserved.
//
/*
* endian.h
* !CHOAS
* Created by Bisegni Claudio.
*
* Copyright 2012 INFN, National Institute of Nuclear Physics
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CHAOSFramework_endian_h
#define CHAOSFramework_endian_h
......
......@@ -36,11 +36,27 @@ void QueryFuture::pushDataPack(cc_data::CDataWrapper *received_datapack, uint64_
waith_for_get_data_Semaphore.unlock();
}
cc_data::CDataWrapper *QueryFuture::getDataPack(bool wait) {
cc_data::CDataWrapper *QueryFuture::getDataPack(bool wait, uint32_t timeout) {
cc_data::CDataWrapper *result = NULL;
while(!data_pack_queue.pop(result) && wait) {
waith_for_get_data_Semaphore.unlock();
if(timeout) {
//wait only once an the return
if(!data_pack_queue.pop(result) && wait) {
//we have no data wait the timeout to receck
waith_for_get_data_Semaphore.wait(timeout);
//try to get next
data_pack_queue.pop(result);
}
}else{
//cicle and waith untile we have a data
while(!data_pack_queue.pop(result) && wait) {
waith_for_get_data_Semaphore.wait();
}
}
waith_for_get_data_Semaphore.unlock();
return result;
}
......
......@@ -52,7 +52,7 @@ namespace chaos {
void pushDataPack(cc_data::CDataWrapper *received_datapack, uint64_t _total_found_element);
public:
cc_data::CDataWrapper *getDataPack(bool wait = true);
cc_data::CDataWrapper *getDataPack(bool wait = true, uint32_t timeout = 0);
const std::string& getQueryID();
......
/*
/*
* TimingUtil.h
* !CHOAS
* Created by Bisegni Claudio.
*
*
* Copyright 2012 INFN, National Institute of Nuclear Physics
*
* Licensed under the Apache License, Version 2.0 (the "License");
......@@ -25,6 +25,20 @@
namespace chaos {
using namespace boost::posix_time;
const std::locale formats[] = {
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M:%S.%f")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S.%f")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M:%S")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m")),
std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y"))};
const size_t formats_n = sizeof(formats)/sizeof(formats[0]);
/*
Class for give some method util for timing purpose
*/
......@@ -41,11 +55,34 @@ namespace chaos {
return (boost::posix_time::microsec_clock::universal_time()-EPOCH).total_milliseconds();
}
static bool dateWellFormat(const std::string& timestamp) {
boost::posix_time::ptime time;
size_t i=0;
for(; i<formats_n; ++i) {
std::istringstream is(timestamp);
is.imbue(formats[i]);
is >> time;
if(time != boost::posix_time::ptime()) break;
}
return i != formats_n;
}
//!convert string timestamp to uint64 ["2012-02-20T00:26:39Z"]
static inline uint64_t getTimestampFromString(const std::string& timestamp) {
boost::posix_time::ptime t = boost::posix_time::time_from_string(timestamp);
return (t-EPOCH).total_milliseconds();
boost::posix_time::ptime time;
size_t i=0;
for(; i<formats_n; ++i) {
std::istringstream is(timestamp);
is.imbue(formats[i]);
is >> time;
if(time != boost::posix_time::ptime()) break;
}
if(i != formats_n) {
return (time-EPOCH).total_milliseconds();
} else {
return 0;
}
}
};
};
}
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment