/* * PndMQDataDistributor.cxx * * Created on: 23.11.2016 * Author: Stockmanns */ #include #include "FairMQLogger.h" #include #include #include #include void free_string(void* /*data*/, void* hint) { delete static_cast(hint); } PndMQDataDistributor::PndMQDataDistributor() : fThreshold(1E9), fCurrentThreshold(20E9), fNOutputChannels(-1), fCurrentOutput(0) { OnData("data-in", &PndMQDataDistributor::HandleData); } PndMQDataDistributor::~PndMQDataDistributor() { } void PndMQDataDistributor::InitTask() { fNOutputChannels = fChannels.at("data-out").size(); } bool PndMQDataDistributor::HandleData(FairMQParts& parts, int /*index*/) { int status = PndMQStatus::UNDEFINED; bool nextOutput = false; status = *(static_cast(parts.At(0)->GetData())); if (parts.Size() > 1){ std::string msgStr(static_cast(parts[1].GetData()), parts[1].GetSize()); std::istringstream ibuffer(msgStr); boost::archive::binary_iarchive InputArchive(ibuffer); try { InputArchive >> fInputData; } catch (boost::archive::archive_exception& e) { LOG(ERROR) << e.what(); } for (auto eventItr : fInputData){ //todo if input data is larger than 2 * threshold you get data corruption if (eventItr.size() > 0){ // LOG(INFO) << "Current Threshold: " << fCurrentThreshold << " data : " << eventItr.front().GetTimeStamp(); if (eventItr.front().GetTimeStamp() < fCurrentThreshold){ fOutputData.push_back(eventItr); } else { fNextData.push_back(eventItr); nextOutput = true; } } } if (fOutputData.size() > 0){ SendData(fOutputData); fOutputData.clear(); } if (nextOutput == true){ nextOutput = false; fCurrentThreshold += fThreshold; fCurrentOutput = ((fCurrentOutput + 1 < fNOutputChannels) ? fCurrentOutput + 1 : 0); if (fNextData.size() > 0){ SendData(fNextData); fNextData.clear(); } } } if (status == PndMQStatus::STOP){ LOG(INFO) << "Received STOP-Signal!" << std::endl; return false; } return true; } void PndMQDataDistributor::SendData(std::deque >& data) { FairMQParts outputparts; std::unique_ptr headerCopy(fTransportFactory->CreateMessage(sizeof(int))); int flag = PndMQStatus::RUNNING; memcpy(headerCopy->GetData(), &flag, sizeof(int)); outputparts.AddPart(headerCopy); std::ostringstream obuffer; boost::archive::binary_oarchive OutputArchive(obuffer); //fPndSdsDigiTopix4Vector = frames.front(); OutputArchive << data; std::string* strMsg = new std::string(obuffer.str()); std::unique_ptr msg2(NewMessage(const_cast(strMsg->c_str()), strMsg->length(), free_string, strMsg)); //unique_ptr msg2(fTransportFactory->CreateMessage(const_cast(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer)); outputparts.AddPart(msg2); // LOG(INFO) << "SendData size: " << data.size() << " to: " << fCurrentOutput << " ts: " << data.front().front().GetTimeStamp(); Send(outputparts,"data-out", fCurrentOutput); }