diff --git a/CMakeLists.txt b/CMakeLists.txt index f95aec44072b473dd79251a92d1fd19ddb68136a..363c7550b9e37b5e0bb91c848dc2d74f366e8c7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,10 @@ message(STATUS "Configure Chaos Node Directory Service") ADD_SUBDIRECTORY(ChaosNodeDirectory bin/bin_cnd) message(STATUS "Configure Chaos Data Service") -ADD_SUBDIRECTORY(ChaosDataService bin/bin_chst) +ADD_SUBDIRECTORY(ChaosDataService bin/bin_cds) + +message(STATUS "Configure Chaos Data Export") +ADD_SUBDIRECTORY(ChaosDataExport bin/chaos_cde) message(STATUS "Configure Chaos CLI") ADD_SUBDIRECTORY(example/ChaosCLI bin/chaos_cli) diff --git a/ChaosDataExport/CMakeList.txt b/ChaosDataExport/CMakeList.txt index 04695ae61ae9caf822787f59cf97e13b67d59d4a..01a244889f9bb784161cddf02f29581be91f2d08 100644 --- a/ChaosDataExport/CMakeList.txt +++ b/ChaosDataExport/CMakeList.txt @@ -9,17 +9,17 @@ IF( ($ENV{CHAOS32}) OR (BUILD_FORCE_32) ) ENDIF() ADD_DEFINITIONS(-g -O2 -Wall) -SET(chaos_cli_src main.cpp) +SET(chaos_cde_src cde.cpp) INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../../usr/local/include /usr/local/include /usr/include ${PROJECT_SOURCE_DIR}/example/ChaosCLI/) LINK_DIRECTORIES(${PROJECT_SOURCE_DIR}/../../usr/local/lib /usr/local/lib) -ADD_EXECUTABLE(ChaosCLI ${chaos_cli_src}) +ADD_EXECUTABLE(ChaosDataExport ${chaos_cde_src}) -SET(ChaosCLILib $ENV{CHAOS_LINK_LIBRARY}) -separate_arguments(ChaosCLILib) +SET(ChaosDataExportLib $ENV{CHAOS_LINK_LIBRARY}) +separate_arguments(ChaosDataExportLib) -TARGET_LINK_LIBRARIES(ChaosCLI chaos_uitoolkit chaos_common pthread dl ${ChaosCLILib}) +TARGET_LINK_LIBRARIES(ChaosDataExport chaos_uitoolkit chaos_common pthread dl ${ChaosCLILib}) -INSTALL_TARGETS(/bin ChaosCLI) \ No newline at end of file +INSTALL_TARGETS(/bin ChaosDataExport) \ No newline at end of file diff --git a/ChaosDataExport/ChaosDataExport.xcodeproj/project.pbxproj b/ChaosDataExport/ChaosDataExport.xcodeproj/project.pbxproj index 5498e9d11c714f988dd1895ef0f7a6ec46f6576b..995756be178d081fe29ad19719be9b9791a646c3 100644 --- a/ChaosDataExport/ChaosDataExport.xcodeproj/project.pbxproj +++ b/ChaosDataExport/ChaosDataExport.xcodeproj/project.pbxproj @@ -7,7 +7,7 @@ objects = { /* Begin PBXBuildFile section */ - 328B4E8E19C9AEF9008D1331 /* main.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 328B4E8D19C9AEF9008D1331 /* main.cpp */; }; + 328B4E8E19C9AEF9008D1331 /* cde.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 328B4E8D19C9AEF9008D1331 /* cde.cpp */; }; /* End PBXBuildFile section */ /* Begin PBXCopyFilesBuildPhase section */ @@ -25,7 +25,7 @@ /* Begin PBXFileReference section */ 328B4E7F19C9AE35008D1331 /* ChaosDataExport */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = ChaosDataExport; sourceTree = BUILT_PRODUCTS_DIR; }; 328B4E8C19C9AEF9008D1331 /* CMakeList.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = CMakeList.txt; sourceTree = SOURCE_ROOT; }; - 328B4E8D19C9AEF9008D1331 /* main.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = main.cpp; sourceTree = SOURCE_ROOT; }; + 328B4E8D19C9AEF9008D1331 /* cde.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = cde.cpp; sourceTree = SOURCE_ROOT; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -59,7 +59,7 @@ isa = PBXGroup; children = ( 328B4E8C19C9AEF9008D1331 /* CMakeList.txt */, - 328B4E8D19C9AEF9008D1331 /* main.cpp */, + 328B4E8D19C9AEF9008D1331 /* cde.cpp */, ); path = ChaosDataExport; sourceTree = "<group>"; @@ -115,7 +115,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( - 328B4E8E19C9AEF9008D1331 /* main.cpp in Sources */, + 328B4E8E19C9AEF9008D1331 /* cde.cpp in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -197,6 +197,7 @@ buildSettings = { CLANG_CXX_LANGUAGE_STANDARD = "compiler-default"; CLANG_CXX_LIBRARY = "libstdc++"; + CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin"; HEADER_SEARCH_PATHS = ( ../../, /usr/local/include, @@ -229,6 +230,7 @@ buildSettings = { CLANG_CXX_LANGUAGE_STANDARD = "compiler-default"; CLANG_CXX_LIBRARY = "libstdc++"; + CONFIGURATION_BUILD_DIR = "$(SRCROOT)/../../usr/local/bin"; HEADER_SEARCH_PATHS = ( ../../, /usr/local/include, diff --git a/ChaosDataExport/cde.cpp b/ChaosDataExport/cde.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e644915144dd4cec2ff4e44f0e1df206e59b3c78 --- /dev/null +++ b/ChaosDataExport/cde.cpp @@ -0,0 +1,194 @@ +/* + * cde.cpp + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <iostream> +#include <fstream> +#include <unistd.h> + +#include <chaos/common/utility/TimingUtil.h> +#include <chaos/common/network/CNodeNetworkAddress.h> +#include <chaos/ui_toolkit/ChaosUIToolkit.h> +#include <chaos/ui_toolkit/LowLevelApi/LLRpcApi.h> +#include <chaos/ui_toolkit/HighLevelApi/HLDataApi.h> + +#include <chaos/common/bson/bson.h> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/date_time/c_local_time_adjustor.hpp> + +using namespace std; +using namespace chaos; +using namespace chaos::ui; +using namespace bson; +using namespace boost; + +#define OPT_CU_ID "device_id" +#define OPT_TIMEOUT "timeout" +#define OPT_DST_FILE "dest_file" +#define OPT_DST_TYPE "dest_type" + +#define OPT_START_TIME "start_time" +#define OPT_END_TIME "end_time" + + +void printPercendDone(int percend_done) { + std::cout << " " << setfill('0') << setw(4) << percend_done << "& " <<std::flush; +} + +void printStep() { + std::cout << "." <<std::flush; +} + +int computePercent(uint64_t done, uint64_t all) { + int result = ((double)done/(double)all)*100; + return result; +} + +int main(int argc, char* argv[]) +{ + char buf[255]; + + uint32_t timeout; + string device_id; + string dst_file; + bool dest_type; + string start_time; + string end_time; + std::string err_str; + std::ostream *destination_stream = NULL; + std::ofstream destination_file; + + uint64_t start_ts = 0; + uint64_t end_ts = 0; + + int retry = 0; + uint32_t cicle_number = 0; + CDeviceNetworkAddress deviceNetworkAddress; + try{ + + //! [UIToolkit Attribute Init] + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_CU_ID, "The identification string of the device", &device_id); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<uint32_t>(OPT_TIMEOUT, "Timeout for wait the answer in milliseconds", 2000, &timeout); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_DST_FILE, "Destination file for save found datapack", &dst_file); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<bool>(OPT_DST_TYPE, "Destination date type binary(true) or string(false)", true, &dest_type); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_START_TIME, "Time for first datapack to find [format from %Y-%m-%dT%H:%M:%S.%f to %Y]", &start_time); + ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->addOption<string>(OPT_END_TIME, "Time for last datapack to find [format from %Y-%m-%dT%H:%M:%S.%f to %Y]", &end_time); + + //! [UIToolkit Attribute Init] + + //! [UIToolkit Init] + ChaosUIToolkit::getInstance()->init(argc, argv); + //! [UIToolkit Init] + + + if(!ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_CU_ID)){ + throw CException(-1, "invalid device identification string", "check param"); + } + + //get the timestamp for query boundary + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_DST_FILE)){ + if(!chaos::TimingUtil::dateWellFormat(start_time)) { + throw CException(-2, "Invalid star date format", "check date"); + } + start_ts = chaos::TimingUtil::getTimestampFromString(start_time); + } + + if(ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_START_TIME)){ + if(!chaos::TimingUtil::dateWellFormat(end_time)) { + throw CException(-2, "Invalid end date format", "check date"); + } + + end_ts = chaos::TimingUtil::getTimestampFromString(end_time); + } + + //get the timestamp for query boundary + if(!ChaosUIToolkit::getInstance()->getGlobalConfigurationInstance()->hasOption(OPT_DST_FILE)){ + getcwd(buf, 255); + dst_file.assign(buf, strlen(buf)); + dst_file += "/"+device_id+".exp"; + } + + std::basic_ios<char>::openmode dst_file_mode = ios_base::out; + if(dest_type) { + dst_file_mode |= ios_base::binary; + } + destination_file.open(dst_file.c_str(), dst_file_mode); + if(!destination_file.good()) { + err_str = "Error opening destination file "; + err_str.append(buf, strlen(buf)); + throw CException(1, err_str, string("check param")); + } + + destination_stream = &destination_file; + + //we can allocate the channel + std::cout << "Acquiring controller" << std::endl; + DeviceController *controller = HLDataApi::getInstance()->getControllerForDeviceID(device_id, timeout); + if(!controller) throw CException(4, "Error allcoating decive controller", "device controller creation"); + + chaos::common::io::QueryFuture *query_future = NULL; + std::cout << "Start Query" << std::endl; + controller->executeTimeIntervallQuery(start_ts, end_ts, &query_future); + if(query_future) { + do { + auto_ptr<CDataWrapper> q_result(query_future->getDataPack(true, timeout)); + if(q_result.get()) { + retry = 0; + //get serialization buffer by type + auto_ptr<chaos::common::data::SerializationBuffer> ser(dest_type?q_result->getBSONData():q_result->getJSONData()); + + //write data + destination_stream->write(ser->getBufferPtr(), ser->getBufferLen()); + } else { + break; + } + + cicle_number++; + if(!(cicle_number % 10)) { + printStep(); + } + if(!(cicle_number % 100)) { + printPercendDone(computePercent(query_future->getCurrentElementIndex(), query_future->getTotalElementFound())); + } + + } while((query_future->getCurrentElementIndex() < query_future->getTotalElementFound()) && retry < 3); + //print last percent + printPercendDone(computePercent(query_future->getCurrentElementIndex(), query_future->getTotalElementFound())); + //release the query + controller->releaseQuery(query_future); + } + } catch (CException& e) { + std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl; + } catch (...) { + std::cerr << "General error " << std::endl; + } + + try { + //! [UIToolkit Deinit] + ChaosUIToolkit::getInstance()->deinit(); + //! [UIToolkit Deinit] + } catch (CException& e) { + std::cerr << e.errorCode << " - "<< e.errorDomain << " - " << e.errorMessage << std::endl; + } + + std::cout << std::endl << "Export done"<< std::endl; + return 0; +} + diff --git a/ChaosDataExport/main.cpp b/ChaosDataExport/main.cpp deleted file mode 100644 index b8001a38a5918867d68e461f88d0345666f06f88..0000000000000000000000000000000000000000 --- a/ChaosDataExport/main.cpp +++ /dev/null @@ -1,18 +0,0 @@ -// -// main.cpp -// ChaosDataExport -// -// Created by Claudio Bisegni on 17/09/14. -// Copyright (c) 2014 infn. All rights reserved. -// - -#include <iostream> - -int main(int argc, const char * argv[]) -{ - - // insert code here... - std::cout << "Hello, World!\n"; - return 0; -} - diff --git a/ChaosDataService/query_engine/QueryEngine.cpp b/ChaosDataService/query_engine/QueryEngine.cpp index 6b9c8fdd9e07b22227b14f79f5ee745a3a76d2cb..3fa7aeab6bd3b2b02c695997247fa97ab2f9722c 100644 --- a/ChaosDataService/query_engine/QueryEngine.cpp +++ b/ChaosDataService/query_engine/QueryEngine.cpp @@ -265,6 +265,8 @@ int QueryEngine::sendDataToClient(DataCloudQuery *query, //send data packet if(!err) { //previoous iteration has not failed + (*it)->delete_on_dispose = false; + err = (int)connection_info_ptr->channel->sendResultToQueryDataCloud(query->query_id, query->vfs_query->getNumberOfElementFound(), ++query->total_data_pack_sent, @@ -278,7 +280,7 @@ int QueryEngine::sendDataToClient(DataCloudQuery *query, err = -1; } else { //at tis point memory is managed by async direct io system - (*it)->delete_on_dispose = false; + } } //delete the datapack diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp b/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp index 437e7385bebf5cfaa4dc389431d376e881ad2ae0..d2dcb2d4073b9d7bc649facdfde9ef2a3669b904 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOClient.cpp @@ -168,7 +168,7 @@ DirectIOClientConnection *ZMQDirectIOClient::getNewConnection(std::string server int err = 0; const int output_buffer_dim = 1; const int linger_period = 500; - const int timeout = 200; + const int timeout = 1000; const int min_reconnection_ivl = 100; const int max_reconnection_ivl = 500; diff --git a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp index 62c5a4b2df18211d5c03d4c5551531a353abfa99..50ae8330da43b382b99350c2e0ed1e5e627b589e 100644 --- a/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp +++ b/chaos/common/direct_io/impl/ZMQDirectIOClientConnection.cpp @@ -107,11 +107,15 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(void *socket, DirectIOForwarder::freeSentData, new DisposeSentMemoryInfo(header_deallocation_handler, DisposeSentMemoryInfo::SentPartHeader, sending_opcode)); err = zmq_sendmsg(socket, &msg_header_data, _send_no_wait_flag); + if(err > 0) { + err = 0; //keep error to default behaviour + } zmq_msg_close(&msg_header_data); break; case DIRECT_IO_CHANNEL_PART_DATA_ONLY: err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag); if(err == -1) { + DirectIOForwarder::freeSentData(data_pack->channel_data, new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode)); delete (data_pack); return err; } @@ -121,15 +125,16 @@ int64_t ZMQDirectIOClientConnection::writeToSocket(void *socket, DirectIOForwarder::freeSentData, new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode)); err = zmq_sendmsg(socket, &msg_data, _send_no_wait_flag); - if(err == -1) { - ZMQDIO_CONNECTION_LERR_ << "Error sending data only"; + if(err > 0) { + err = 0; //keep error to default behaviour } - err = zmq_msg_close(&msg_data); + zmq_msg_close(&msg_data); break; case DIRECT_IO_CHANNEL_PART_HEADER_DATA: err = zmq_send(socket, &data_pack->header, DIRECT_IO_HEADER_SIZE, _send_more_no_wait_flag); if(err == -1) { + DirectIOForwarder::freeSentData(data_pack->channel_data, new DisposeSentMemoryInfo(data_deallocation_handler, DisposeSentMemoryInfo::SentPartData, sending_opcode)); delete (data_pack); return err; } diff --git a/chaos/common/endian.h b/chaos/common/endian.h index 664ef68af55eedc74634761898aa02363af31311..6723ad045a8dce89d932ef3512d885e5675cc526 100644 --- a/chaos/common/endian.h +++ b/chaos/common/endian.h @@ -1,10 +1,22 @@ -// -// endian.h -// CHAOSFramework -// -// Created by Claudio Bisegni on 3/25/13. -// Copyright (c) 2013 INFN. All rights reserved. -// +/* + * endian.h + * !CHOAS + * Created by Bisegni Claudio. + * + * Copyright 2012 INFN, National Institute of Nuclear Physics + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef CHAOSFramework_endian_h #define CHAOSFramework_endian_h diff --git a/chaos/common/io/QueryFuture.cpp b/chaos/common/io/QueryFuture.cpp index 395cd8c600048d2ea2567eb1ad4e8fd3da051e92..3fb8e9d9cb62b6d1056f7d4be4ede56df3e21cb4 100644 --- a/chaos/common/io/QueryFuture.cpp +++ b/chaos/common/io/QueryFuture.cpp @@ -36,11 +36,27 @@ void QueryFuture::pushDataPack(cc_data::CDataWrapper *received_datapack, uint64_ waith_for_get_data_Semaphore.unlock(); } -cc_data::CDataWrapper *QueryFuture::getDataPack(bool wait) { +cc_data::CDataWrapper *QueryFuture::getDataPack(bool wait, uint32_t timeout) { cc_data::CDataWrapper *result = NULL; - while(!data_pack_queue.pop(result) && wait) { - waith_for_get_data_Semaphore.unlock(); + + if(timeout) { + //wait only once an the return + if(!data_pack_queue.pop(result) && wait) { + //we have no data wait the timeout to receck + waith_for_get_data_Semaphore.wait(timeout); + + //try to get next + data_pack_queue.pop(result); + } + }else{ + //cicle and waith untile we have a data + while(!data_pack_queue.pop(result) && wait) { + waith_for_get_data_Semaphore.wait(); + } } + + + waith_for_get_data_Semaphore.unlock(); return result; } diff --git a/chaos/common/io/QueryFuture.h b/chaos/common/io/QueryFuture.h index 288788a0d077968ef25c08eff71b8acce54d9a09..b6f4337965aeca038f490edf4a3acb0fe79cb649 100644 --- a/chaos/common/io/QueryFuture.h +++ b/chaos/common/io/QueryFuture.h @@ -52,7 +52,7 @@ namespace chaos { void pushDataPack(cc_data::CDataWrapper *received_datapack, uint64_t _total_found_element); public: - cc_data::CDataWrapper *getDataPack(bool wait = true); + cc_data::CDataWrapper *getDataPack(bool wait = true, uint32_t timeout = 0); const std::string& getQueryID(); diff --git a/chaos/common/utility/TimingUtil.h b/chaos/common/utility/TimingUtil.h index 6625f7adc65bbc8531acdeea0c4aca1ef5662b16..321c77b1bde899cd7fe7b6c09890c85bf7974e67 100644 --- a/chaos/common/utility/TimingUtil.h +++ b/chaos/common/utility/TimingUtil.h @@ -1,8 +1,8 @@ -/* +/* * TimingUtil.h * !CHOAS * Created by Bisegni Claudio. - * + * * Copyright 2012 INFN, National Institute of Nuclear Physics * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,6 +25,20 @@ namespace chaos { using namespace boost::posix_time; + const std::locale formats[] = { + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M:%S.%f")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S.%f")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M:%S")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H:%M")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d %H")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%dT%H")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m-%d")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y-%m")), + std::locale(std::locale::classic(),new boost::posix_time::time_input_facet("%Y"))}; + const size_t formats_n = sizeof(formats)/sizeof(formats[0]); + /* Class for give some method util for timing purpose */ @@ -41,11 +55,34 @@ namespace chaos { return (boost::posix_time::microsec_clock::universal_time()-EPOCH).total_milliseconds(); } + static bool dateWellFormat(const std::string& timestamp) { + boost::posix_time::ptime time; + size_t i=0; + for(; i<formats_n; ++i) { + std::istringstream is(timestamp); + is.imbue(formats[i]); + is >> time; + if(time != boost::posix_time::ptime()) break; + } + return i != formats_n; + } + //!convert string timestamp to uint64 ["2012-02-20T00:26:39Z"] static inline uint64_t getTimestampFromString(const std::string& timestamp) { - boost::posix_time::ptime t = boost::posix_time::time_from_string(timestamp); - return (t-EPOCH).total_milliseconds(); + boost::posix_time::ptime time; + size_t i=0; + for(; i<formats_n; ++i) { + std::istringstream is(timestamp); + is.imbue(formats[i]); + is >> time; + if(time != boost::posix_time::ptime()) break; + } + if(i != formats_n) { + return (time-EPOCH).total_milliseconds(); + } else { + return 0; + } } - }; + }; } #endif