/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** * PndMQTopix4Sorter.cxx * * @since 2014-10-10 * @author A. Rybalchenko */ #include #include #include #include #include "PndMQTopix4Sorter.h" #include "PndMQStatus.h" #include "baseMQtools.h" #include "FairMQLogger.h" #include "PndSdsDigiTopix4.h" #include #include using namespace std; PndMQTopix4Sorter::PndMQTopix4Sorter() : fHasBoostSerialization(false) { //gSystem->ResetSignal(kSigInterrupt); //gSystem->ResetSignal(kSigTermination); using namespace baseMQ::tools::resolve; // coverity[pointless_expression]: suppress coverity warnings on apparant if(const). if (has_BoostSerialization::value == 1) fHasBoostSerialization = true; } //void PndMQTopix4Sorter::CustomCleanup(void *data, void *object) //{ // delete (string*)object; //} void PndMQTopix4Sorter::Run() { LOG(INFO) << "Boost Serialization "<< fHasBoostSerialization; if (fHasBoostSerialization){ FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); int receivedMsgs = 0; PndMapSorterTpl sorter; while (CheckCurrentState(RUNNING)) { FairMQMessage* header = fTransportFactory->CreateMessage(); FairMQMessage* msg = fTransportFactory->CreateMessage(); if (dataInChannel.Receive(header) > 0) { int status = *(static_cast(header->GetData())); if (dataInChannel.ExpectsAnotherPart()) { if (dataInChannel.Receive(msg)){ string msgStr(static_cast(msg->GetData()), msg->GetSize()); istringstream ibuffer(msgStr); boost::archive::binary_iarchive InputArchive(ibuffer); try { InputArchive >> fTopixData; } catch (boost::archive::archive_exception& e) { LOG(ERROR) << e.what(); } // LOG(INFO) << "TopixData: " << fTopixData.size(); // for (auto iter : fTopixData){ // LOG(INFO) << iter.GetTimeStamp(); // } bool endSorting = false; double timeOfLast = 0; if (fTopixData.size() > 0){ for (auto iter : fTopixData){ if (iter.GetTimeStamp() > 0){ sorter.AddElement(iter, iter.GetTimeStamp()); timeOfLast = iter.GetTimeStamp(); } else { endSorting = true; // LOG(INFO) << "---END SORTING---"; } } if (endSorting == false){ sorter.WriteOutData(timeOfLast); fOutputData = sorter.GetOutputData(); sorter.DeleteOutputData(); } else if (endSorting == true || status == PndMQStatus::STOP){ LOG(INFO) << "EndSorting or STOP-Status " << status; sorter.WriteOutAll(); fOutputData = sorter.GetOutputData(); fOutputData.push_back(PndSdsDigiTopix4()); sorter.DeleteOutputData(); endSorting = false; } unique_ptr headerCpy(fTransportFactory->CreateMessage(sizeof(int))); headerCpy->Copy(header); dataOutChannel.SendPart(headerCpy); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); OutputArchive << fOutputData; int outputSize = obuffer.str().length(); unique_ptr msg2(fTransportFactory->CreateMessage(outputSize)); memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize); dataOutChannel.Send(msg2); //LOG(INFO) << "Data: " << fTopixData.size() << " " << timeOfLast; // LOG(INFO) << "Output: " << fOutputData.size() << " timeOfLast: " << timeOfLast; // for(auto itr : fOutputData) // LOG(INFO) << itr.GetTimeStamp(); fTopixData.clear(); fOutputData.clear(); } delete(msg); delete (header); } } // LOG(INFO) << "Received Message: " << receivedMsgs++ << " Size: " << msg->GetSize(); if (status == PndMQStatus::STOP){ LOG(INFO) << "STOP-Signal Received!"; sorter.WriteOutAll(); fOutputData = sorter.GetOutputData(); fOutputData.push_back(PndSdsDigiTopix4()); sorter.DeleteOutputData(); unique_ptr headerCpy(fTransportFactory->CreateMessage(sizeof(int))); headerCpy->Copy(header); dataOutChannel.SendPart(headerCpy); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); OutputArchive << fOutputData; int outputSize = obuffer.str().length(); unique_ptr msg2(fTransportFactory->CreateMessage(outputSize)); memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize); dataOutChannel.Send(msg2); } } } } } PndMQTopix4Sorter::~PndMQTopix4Sorter() { }