/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** * PndMQTopix4Sink.cxx * * @since 2014-10-10 * @author A. Rybalchenko */ #include #include #include #include #include "PndMQTopix4DigiToHit.h" #include "PndMQStatus.h" #include "baseMQtools.h" #include "FairMQLogger.h" #include "mrfdata_8b.h" #include "PndSdsDigiTopix4.h" #include using namespace std; PndMQTopix4DigiToHit::PndMQTopix4DigiToHit() : fHasBoostSerialization(false), fClusterFinder(20, 32, 1.8), fHitProducer(0.01, 0.01, 20, 32), fEventBuilder(50), fClusterSize(11), fStatusOutput(true) { using namespace baseMQ::tools::resolve; bool checkOutputClass = false; if (is_same::value) { if (has_BoostSerialization::value == 1) { checkOutputClass = true; fHasBoostSerialization = true; } } LOG(INFO) << "HasBoostSerialization: " << fHasBoostSerialization; } PndMQTopix4DigiToHit::~PndMQTopix4DigiToHit() { } void PndMQTopix4DigiToHit::Run() { if(fHasBoostSerialization){ FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); bool statusChannelPresent = false; if (fStatusOutput){ try { (fChannels.at("status-out")); } catch (...){ fStatusOutput = false; LOG(INFO) << "No Status-Out channel!"; } } while (CheckCurrentState(RUNNING)) { std::unique_ptr header(fTransportFactory->CreateMessage()); std::unique_ptr msg(fTransportFactory->CreateMessage()); int status = PndMQStatus::UNDEFINED; if (dataInChannel.Receive(header) > 0) { status = *(static_cast(header->GetData())); if (dataInChannel.ExpectsAnotherPart()) { if (dataInChannel.Receive(msg) > 0) { string msgStr(static_cast(msg->GetData()), msg->GetSize()); istringstream ibuffer(msgStr); boost::archive::binary_iarchive InputArchive(ibuffer); try { InputArchive >> fTopixDigis; } catch (boost::archive::archive_exception& e) { LOG(ERROR) << e.what(); } } } } // LOG(INFO) << "InputData: "; // for (auto itr : fTopixDigis){ // LOG(INFO) << std::setw(12) << std::hex << itr.GetTimeStamp(); // } fEventBuilder.FillData(fTopixDigis); std::vector > separatedData = fEventBuilder.GetSeparatedData(); for (auto & eventIter : separatedData){ std::vector hits; if (eventIter.size() > 1){ std::vector > cluster = fClusterFinder.GetClusters(eventIter); // LOG(INFO) << "Multiple Event Found with " << eventIter.size() << " digs and " << cluster.size() << " clusters"; for (auto clusterIter : cluster){ std::vector clusterDigis; for (auto digiIter : clusterIter){ clusterDigis.push_back((eventIter)[digiIter]); // LOG(INFO) << "Digis " << digiIter << " TimeStamp: " << (eventIter)[digiIter].GetTimeStamp(); // LOG(INFO) << "ClusterDigis: " << clusterDigis.rbegin()->GetTimeStamp(); } //PndSdsHit myHit = fDummy.GetHit(clusterDigis); //fHitProducer.GetHit(clusterDigis); if (clusterDigis.size() < fClusterSize.size() - 1) //counts the sizes of clusters. All above fClusterSize.size() -1 are added to last bin fClusterSize[clusterDigis.size()]++; else fClusterSize[fClusterSize.size() - 1]++; hits.push_back(fHitProducer.GetHit(clusterDigis)); } } else { //PndSdsHit myHit = fDummy.GetHit(eventIter); //fHitProducer.GetHit(eventIter); hits.push_back(fHitProducer.GetHit(eventIter)); } fTopixHitsEvent.push_back(hits); } if (status == PndMQStatus::STOP){ LOG(INFO) << "Received STOP-Signal!" << std::endl; } unique_ptr headerOut(fTransportFactory->CreateMessage(sizeof(int))); memcpy(headerOut->GetData(), &status, sizeof(int)); dataOutChannel.SendPart(headerOut); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); OutputArchive << fTopixHitsEvent; int outputSize = obuffer.str().length(); unique_ptr msg2(fTransportFactory->CreateMessage(outputSize)); memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize); dataOutChannel.Send(msg2); if (fStatusOutput == true){ unique_ptr headerOut2(fTransportFactory->CreateMessage(sizeof(int))); memcpy(headerOut->GetData(), &status, sizeof(int)); fChannels.at("status-out").at(0).SendPart(headerOut2); std::ostringstream obuffer2; boost::archive::binary_oarchive OutputArchive2(obuffer2); OutputArchive2 << fClusterSize; int outputSize2 = obuffer2.str().length(); unique_ptr msg3(fTransportFactory->CreateMessage(outputSize2)); memcpy(msg3->GetData(), obuffer2.str().c_str(), outputSize2); fChannels.at("status-out").at(0).Send(msg3); } // LOG(INFO) << "OutputEventBuilding: "; // for (auto outerIter : fTopixHitsEvent){ // LOG(INFO) << "Event: "; // for (auto innerIter : outerIter) { // LOG(INFO)<< innerIter; // } // } fTopixDigis.clear(); fTopixHitsEvent.clear(); } } }