/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** * PndMQTopix4Sink.cxx * * @since 2014-10-10 * @author A. Rybalchenko */ #include #include #include #include #include "PndMQTopix4DigiToHit.h" #include "PndMQStatus.h" #include "baseMQtools.h" #include "FairMQLogger.h" #include "mrfdata_8b.h" #include "PndSdsDigiTopix4.h" #include using namespace std; PndMQTopix4DigiToHit::PndMQTopix4DigiToHit() : fHasBoostSerialization(false), fClusterFinder(20, 32, 1.8), fHitProducer(0.01, 0.01, 20, 32), fEventBuilder(50), fClusterSize(11), fStatusOutput(true) { using namespace baseMQ::tools::resolve; bool checkOutputClass = false; if (is_same::value) { if (has_BoostSerialization::value == 1) { checkOutputClass = true; fHasBoostSerialization = true; } } LOG(INFO) << "HasBoostSerialization: " << fHasBoostSerialization; } PndMQTopix4DigiToHit::~PndMQTopix4DigiToHit() { } void PndMQTopix4DigiToHit::Run() { if(fHasBoostSerialization){ FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); bool statusChannelPresent = false; if (fStatusOutput){ try { (fChannels.at("status-out")); } catch (...){ fStatusOutput = false; LOG(INFO) << "No Status-Out channel!"; } } while (CheckCurrentState(RUNNING)) { std::unique_ptr header(fTransportFactory->CreateMessage()); std::unique_ptr msg(fTransportFactory->CreateMessage()); int status = PndMQStatus::UNDEFINED; if (dataInChannel.Receive(header) > 0) { status = *(static_cast(header->GetData())); if (dataInChannel.ExpectsAnotherPart()) { if (dataInChannel.Receive(msg) > 0) { string msgStr(static_cast(msg->GetData()), msg->GetSize()); istringstream ibuffer(msgStr); boost::archive::binary_iarchive InputArchive(ibuffer); try { InputArchive >> fTopixDigis; } catch (boost::archive::archive_exception& e) { LOG(ERROR) << e.what(); } } } } // LOG(INFO) << "InputData: "; // for (auto itr : fTopixDigis){ // LOG(INFO) << std::setw(12) << std::hex << itr.GetTimeStamp(); // } fEventBuilder.FillData(fTopixDigis); std::vector > separatedData = fEventBuilder.GetSeparatedData(); for (auto & eventIter : separatedData){ std::deque hits; if (eventIter.size() > 1){ std::vector > cluster = fClusterFinder.GetClusters(eventIter); // LOG(INFO) << "Multiple Event Found with " << eventIter.size() << " digs and " << cluster.size() << " clusters"; for (auto clusterIter : cluster){ std::vector clusterDigis; for (auto digiIter : clusterIter){ clusterDigis.push_back((eventIter)[digiIter]); // LOG(INFO) << "Digis " << digiIter << " TimeStamp: " << (eventIter)[digiIter].GetTimeStamp(); // LOG(INFO) << "ClusterDigis: " << clusterDigis.rbegin()->GetTimeStamp(); } //PndSdsHit myHit = fDummy.GetHit(clusterDigis); //fHitProducer.GetHit(clusterDigis); if (clusterDigis.size() < fClusterSize.size() - 1) //counts the sizes of clusters. All above fClusterSize.size() -1 are added to last bin fClusterSize[clusterDigis.size()]++; else fClusterSize[fClusterSize.size() - 1]++; hits.push_back(fHitProducer.GetHit(clusterDigis)); } } else { //PndSdsHit myHit = fDummy.GetHit(eventIter); //fHitProducer.GetHit(eventIter); hits.push_back(fHitProducer.GetHit(eventIter)); } fTopixHitsEvent.push_back(hits); } if (status == PndMQStatus::STOP){ LOG(INFO) << "Received STOP-Signal!" << std::endl; } if (fTopixHitsEvent.size() < 1000 && status == PndMQStatus::RUNNING) continue; std::deque< std::deque > outputData(fTopixHitsEvent.begin(), fTopixHitsEvent.size() > 1000 ? fTopixHitsEvent.begin() + 1000 : fTopixHitsEvent.end()); fTopixHitsEvent.erase( fTopixHitsEvent.begin(), fTopixHitsEvent.size() > 1000 ? fTopixHitsEvent.begin() + 1000 : fTopixHitsEvent.end() ); unique_ptr headerOut(fTransportFactory->CreateMessage(sizeof(int))); memcpy(headerOut->GetData(), &status, sizeof(int)); dataOutChannel.SendPart(headerOut); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); OutputArchive << outputData; std::string* strMsg = new std::string(obuffer.str()); unique_ptr msg2(NewMessage(const_cast(strMsg->c_str()), strMsg->length(), free_string, strMsg)); Send(msg2, "data-out", 0); if (fStatusOutput == true){ unique_ptr headerOut2(fTransportFactory->CreateMessage(sizeof(int))); memcpy(headerOut->GetData(), &status, sizeof(int)); fChannels.at("status-out").at(0).SendPart(headerOut2); std::ostringstream obuffer2; boost::archive::binary_oarchive OutputArchive2(obuffer2); OutputArchive2 << fClusterSize; int outputSize2 = obuffer2.str().length(); unique_ptr msg3(fTransportFactory->CreateMessage(outputSize2)); memcpy(msg3->GetData(), obuffer2.str().c_str(), outputSize2); fChannels.at("status-out").at(0).Send(msg3); } // for (auto outerIter : fTopixHitsEvent){ // for (auto innerIter : outerIter) { // if (innerIter.GetTimeStamp() > 24008560000 && innerIter.GetTimeStamp() < 24008570000) // LOG(INFO)<< TString::Format("%12.0f",innerIter.GetTimeStamp()).Data(); // } // } fTopixDigis.clear(); // fTopixHitsEvent.clear(); } } }