/********************************************************************************
 *    Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH    *
 *                                                                              *
 *              This software is distributed under the terms of the             *
 *         GNU Lesser General Public Licence version 3 (LGPL) version 3,        *
 *                  copied verbatim in the file "LICENSE"                       *
 ********************************************************************************/
/**
 * PndMQTopix4Sink.cxx
 *
 * @since 2014-10-10
 * @author A. Rybalchenko
 */

#include "PndMQHitEventDevice.h"

// NOTE(review): this copy of the file contained six angle-bracket #include
// directives whose targets were lost. The list below covers the standard and
// Boost names this file visibly uses; reconcile it with version control.
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <iterator>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>

#include "baseMQtools.h"
#include "FairMQLogger.h"
#include "mrfdata_8b.h"
#include "PndSdsDigiTopix4.h"
#include "PndMQStatus.h"

using namespace std;

/**
 * Initialises the flags and the (not yet created) event builder and logs
 * whether Boost serialization was detected.
 */
PndMQHitEventDevice::PndMQHitEventDevice()
    : fHasBoostSerialization(false)
    , fGlobalRunningStatus(true)
    , fBuilder(nullptr)
{
    using namespace baseMQ::tools::resolve;

    // NOTE(review): the template arguments of is_same<> and
    // has_BoostSerialization<> are missing from this copy of the file and
    // cannot be derived from the surrounding code. The two conditions are kept
    // verbatim; restore their arguments from version control.
    if (is_same::value) {
        if (has_BoostSerialization::value == 1) {
            fHasBoostSerialization = true;
        }
    }

    LOG(INFO) << "HasBoostSerialization: " << fHasBoostSerialization;
}

/**
 * Releases the event builder created in Run().
 */
PndMQHitEventDevice::~PndMQHitEventDevice()
{
    delete fBuilder;
}

/**
 * Main loop of the device.
 *
 * While the device is in the RUNNING state and no input has signalled STOP:
 *  - polls all "data-in" sub-channels,
 *  - reads a status header (int) and, if another part follows, a
 *    Boost-binary-serialized payload from every channel with pending input,
 *  - hands the collected data to the event builder,
 *  - sends the built events as <RUNNING header, serialized fEventData> on
 *    "data-out"[0]; for every 100th non-empty batch it additionally sends
 *    <RUNNING header, serialized fSensorsInEvent> on "status-out"[0].
 *
 * Once an input has signalled STOP, a single STOP header is sent on
 * "data-out"[0] and no further input is read.
 */
void PndMQHitEventDevice::Run()
{
    const int numInputs = static_cast<int>(fChannels.at("data-in").size());

    // store the channel references to avoid traversing the map on every loop iteration
    const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
    // const FairMQChannel& dataOutFileSink = fChannels.at("data-out").at(1);
    const FairMQChannel& statusChannel = fChannels.at("status-out").at(0);

    std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, { "data-in" }));

    LOG(INFO) << "Number of Input Channels: " << numInputs;

    // Run() allocates the builder; free one left over from an earlier call so
    // that entering Run() again does not leak it.
    delete fBuilder;
    fBuilder = new PndMQHitsEventBuilder(numInputs);

    fDataFromChannels.resize(numInputs);
    fRunningStatus.resize(numInputs);
    for (std::size_t channel = 0; channel < fRunningStatus.size(); ++channel) {
        fRunningStatus[channel] = true;
    }

    // NOTE(review): the refresh of fillLevel further down appears to be
    // disabled in this copy of the file; while it is, every entry stays 0.
    std::vector<int> fillLevel(numInputs, 0);
    int eventCounter = 0;
    bool stopMessageOnce = true;

    while (CheckCurrentState(RUNNING)) {
        if (fGlobalRunningStatus) {
            poller->Poll(-1);

            for (int channelNr = 0; channelNr < numInputs; ++channelNr) {
                // Earlier variant additionally required fillLevel[channelNr] == 0 here.
                if (fRunningStatus[channelNr] && poller->CheckInput("data-in", channelNr)) {
                    std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
                    std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());

                    if (Receive(header, "data-in", channelNr) > 0) {
                        // The header carries the sender status as a raw int; make sure
                        // the message is large enough before reading it.
                        if (header->GetSize() >= sizeof(int)) {
                            int status = 0;
                            memcpy(&status, header->GetData(), sizeof(int));

                            if (status == PndMQStatus::RUNNING) {
                                fRunningStatus[channelNr] = true;
                            } else if (status == PndMQStatus::STOP) {
                                fRunningStatus[channelNr] = false;
                                LOG(INFO) << "STOP-Status received for channel: " << channelNr;
                            }
                        } else {
                            LOG(ERROR) << "Status header too short on channel " << channelNr;
                        }

                        if (fChannels.at("data-in").at(channelNr).ExpectsAnotherPart()) {
                            if (Receive(msg, "data-in", channelNr) > 0) {
                                const std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
                                std::istringstream ibuffer(msgStr);

                                try {
                                    // The archive constructor reads the archive preamble and
                                    // can throw as well, so it belongs inside the try block.
                                    boost::archive::binary_iarchive inputArchive(ibuffer);
                                    inputArchive >> fHitData;
                                } catch (const boost::archive::archive_exception& e) {
                                    LOG(ERROR) << e.what();
                                    // Do not forward partially deserialized data.
                                    fHitData.clear();
                                }

                                std::copy(fHitData.begin(), fHitData.end(),
                                          std::back_inserter(fDataFromChannels[channelNr]));
                                fHitData.clear();
                            }
                        }
                    }

                    if (fillLevel[channelNr] == 0 && !fRunningStatus[channelNr]) {
                        fGlobalRunningStatus = false;
                        LOG(INFO) << "GlobalRunningStatus set to false for channel " << channelNr;
                    }
                }
            }

            fBuilder->AddData(fDataFromChannels);
            fEventData = fBuilder->GetEvents();

            // Every 100th non-empty batch: log a summary and publish the
            // per-event sensor counts on the status channel.
            if ((fEventData.size() > 0 && eventCounter++ % 100 == 0)) {
                if (fEventData.front().size() > 0) {
                    LOG(INFO) << eventCounter << " nEvents: " << fEventData.size()
                              << " hits in Event " << fEventData.front().size()
                              << " timeStamp: "
                              << TString::Format("%12.0f", fEventData.front().front().GetTimeStamp()).Data()
                              << " sensorID " << fEventData.front().front().GetSensorID();
                } else {
                    // First event holds no hits: there is no time stamp / sensor to report.
                    LOG(INFO) << eventCounter << " nEvents: " << fEventData.size()
                              << " hits in Event " << fEventData.front().size();
                }

                // Report the buffered amount for every configured input instead of
                // indexing a fixed set of four channels.
                std::ostringstream channelSizes;
                for (std::size_t ch = 0; ch < fDataFromChannels.size(); ++ch) {
                    if (ch > 0) {
                        channelSizes << "/";
                    }
                    channelSizes << fDataFromChannels[ch].size();
                }
                LOG(INFO) << "DataFromChannels: " << channelSizes.str();

                fSensorsInEvent = fBuilder->GetSensorsInEvent();
                LOG(INFO) << "Number of sensor hits in one event: ";
                for (const auto& data : fSensorsInEvent) {
                    LOG(INFO) << data;
                }

                std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
                const int flag = PndMQStatus::RUNNING;
                memcpy(headerCopy->GetData(), &flag, sizeof(int));
                statusChannel.SendPart(headerCopy);

                std::ostringstream obuffer;
                boost::archive::binary_oarchive OutputArchive(obuffer);
                OutputArchive << fSensorsInEvent;
                // str() returns a copy; take it once and reuse it for size and data.
                const std::string payload = obuffer.str();
                unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(payload.size()));
                memcpy(msg->GetData(), payload.data(), payload.size());
                statusChannel.Send(msg);
            }

            std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
            const int flag = PndMQStatus::RUNNING;
            memcpy(headerCopy->GetData(), &flag, sizeof(int));
            dataOutChannel.SendPart(headerCopy);

            std::ostringstream obuffer;
            boost::archive::binary_oarchive OutputArchive(obuffer);
            OutputArchive << fEventData;
            // str() returns a copy; take it once and reuse it for size and data.
            const std::string payload = obuffer.str();
            unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(payload.size()));
            memcpy(msg2->GetData(), payload.data(), payload.size());
            dataOutChannel.Send(msg2);

            // Disabled: forward a copy of the event message to a second output (file sink).
            // std::unique_ptr<FairMQMessage> headerCopy2(fTransportFactory->CreateMessage(sizeof(int)));
            // int flag2 = PndMQStatus::RUNNING;
            // memcpy(headerCopy->GetData(), &flag2, sizeof(int));
            // dataOutFileSink.SendPart(headerCopy2);
            //
            // std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
            // msgCopy->Copy(msg2);
            // dataOutFileSink.Send(msgCopy);

            // NOTE(review): in the collapsed source it is ambiguous whether the
            // following statement is live or commented out; it is kept disabled
            // here -- confirm against version control.
            // fillLevel = fBuilder->GetInputDataLevel();

            // The builder has taken over the data; start the next round with empty buffers.
            for (auto& channelData : fDataFromChannels) {
                channelData.clear();
            }
        }

        // NOTE(review): once fGlobalRunningStatus is false nothing in this loop
        // blocks any more, so unless CheckCurrentState() itself waits the loop
        // spins until the device leaves the RUNNING state.
        if (!fGlobalRunningStatus && stopMessageOnce) {
            LOG(INFO) << "STOP-Signal received for one input";
            std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
            const int flag = PndMQStatus::STOP;
            memcpy(headerCopy->GetData(), &flag, sizeof(int));
            dataOutChannel.Send(headerCopy);
            stopMessageOnce = false;
        }
    }
}

/**
 * Sets a string property. This device defines no keys of its own, so every
 * key is forwarded to FairMQDevice.
 */
void PndMQHitEventDevice::SetProperty(const int key, const string& value)
{
    FairMQDevice::SetProperty(key, value);
}

/**
 * Returns a string property. This device defines no keys of its own, so every
 * key is forwarded to FairMQDevice.
 */
string PndMQHitEventDevice::GetProperty(const int key, const string& default_ /*= ""*/)
{
    return FairMQDevice::GetProperty(key, default_);
}

/**
 * Sets an integer property. This device defines no keys of its own, so every
 * key is forwarded to FairMQDevice.
 */
void PndMQHitEventDevice::SetProperty(const int key, const int value)
{
    FairMQDevice::SetProperty(key, value);
}

/**
 * Returns an integer property. This device defines no keys of its own, so
 * every key is forwarded to FairMQDevice.
 */
int PndMQHitEventDevice::GetProperty(const int key, const int default_ /*= 0*/)
{
    return FairMQDevice::GetProperty(key, default_);
}