/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** * PndMQTopix4TrackFitter.cxx * * @since 2014-10-10 * @author A. Rybalchenko */ #include #include #include #include #include "PndMQTopix4TrackFitter.h" #include "PndMQStatus.h" #include "baseMQtools.h" #include "FairMQLogger.h" #include #include using namespace std; PndMQTopix4TrackFitter::PndMQTopix4TrackFitter() : fHasBoostSerialization(false), fEventNr(0), fNtracksPerEvent(11,0) { //gSystem->ResetSignal(kSigInterrupt); //gSystem->ResetSignal(kSigTermination); using namespace baseMQ::tools::resolve; // coverity[pointless_expression]: suppress coverity warnings on apparant if(const). if (has_BoostSerialization::value == 1) fHasBoostSerialization = true; } //void PndMQTopix4TrackFitter::CustomCleanup(void *data, void *object) //{ // delete (string*)object; //} void PndMQTopix4TrackFitter::Run() { LOG(INFO) << "Boost Serialization "<< fHasBoostSerialization; if (fHasBoostSerialization){ const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); const FairMQChannel& statusChannel = fChannels.at("status-out").at(0); int receivedMsgs = 0; PndMQStraightLineTrackFinder trackFinder; while (CheckCurrentState(RUNNING)) { FairMQMessagePtr header = fTransportFactory->CreateMessage(); FairMQMessagePtr msg = fTransportFactory->CreateMessage(); if (dataInChannel.Receive(header) > 0) { int status = *(static_cast(header->GetData())); if (dataInChannel.ExpectsAnotherPart()) { if (dataInChannel.Receive(msg)){ string msgStr(static_cast(msg->GetData()), msg->GetSize()); istringstream ibuffer(msgStr); boost::archive::binary_iarchive InputArchive(ibuffer); try { InputArchive >> fTopixData; } catch (boost::archive::archive_exception& e) { LOG(ERROR) << e.what(); } // LOG(INFO) << "TopixData: " << fTopixData.size(); // for (auto iter : fTopixData){ // LOG(INFO) << iter.GetTimeStamp(); // } if (fTopixData.size() > 0){ for (auto eventData : fTopixData){ std::vector trackData = trackFinder.FindTracks(eventData, fEventNr); if (trackData.size() > 0 && trackData.size() < 10){ fNtracksPerEvent[trackData.size()]++; } else if (trackData.size() > 9) { fNtracksPerEvent[10]++; } //LOG(INFO) << "TrackData: " << trackData.size(); //for(auto track : trackData) // LOG(INFO) << "PndMQTopix4TrackFinder Track: " << track.GetTrackCand().GetNHits(); if (trackData.size() > 0) fOutputData.push_back(trackData); fEventNr++; } unique_ptr headerCpy(fTransportFactory->CreateMessage(sizeof(int))); headerCpy->Copy(header); dataOutChannel.SendPart(headerCpy); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); OutputArchive << fOutputData; int outputSize = obuffer.str().length(); unique_ptr msg2(fTransportFactory->CreateMessage(outputSize)); memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize); dataOutChannel.Send(msg2); //LOG(INFO) << "Data: " << fTopixData.size() << " " << timeOfLast; // LOG(INFO) << "Output: " << fOutputData.size() << " timeOfLast: " << timeOfLast; // for(auto itr : fOutputData) // LOG(INFO) << itr.GetTimeStamp(); fTopixData.clear(); fOutputData.clear(); } } } receivedMsgs++; if (receivedMsgs % 100 == 0){ LOG(INFO) << receivedMsgs << " : Number of Tracks per Event: " << fNtracksPerEvent[0] << "/" << fNtracksPerEvent[1] << "/"<< fNtracksPerEvent[2] << "/"<< fNtracksPerEvent[3] << "/"<< fNtracksPerEvent[4] << "/" << fNtracksPerEvent[5]; std::unique_ptr headerCopy(fTransportFactory->CreateMessage(sizeof(int))); int flag = PndMQStatus::RUNNING; memcpy(headerCopy->GetData(), &flag, sizeof(int)); statusChannel.SendPart(headerCopy); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); //fPndSdsDigiTopix4Vector = frames.front(); OutputArchive << fNtracksPerEvent; int outputSize = obuffer.str().length(); unique_ptr msg2(fTransportFactory->CreateMessage(outputSize)); memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize); //unique_ptr msg2(fTransportFactory->CreateMessage(const_cast(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer)); statusChannel.Send(msg2); } // LOG(INFO) << "Received Message: " << receivedMsgs++ << " Size: " << msg->GetSize(); if (status == PndMQStatus::STOP){ LOG(INFO) << "STOP-Signal Received!"; unique_ptr headerCpy(fTransportFactory->CreateMessage(sizeof(int))); headerCpy->Copy(header); dataOutChannel.Send(headerCpy); unique_ptr headerCpy2(fTransportFactory->CreateMessage(sizeof(int))); headerCpy2->Copy(header); statusChannel.Send(headerCpy2); } } } } } PndMQTopix4TrackFitter::~PndMQTopix4TrackFitter() { }