/* * Sampler.cpp * * Created on: Sep 27, 2012 * Author: dklein */ #include "Sampler.h" #include "TList.h" #include "TObjString.h" #include "TClonesArray.h" #include #include "PndPayload.h" #include "parbase/FairParRootFileIo.h" #include "parbase/FairRuntimeDb.h" #include "TROOT.h" #include "pnd_sampler_tasks/SttHitLoader.h" #include "pnd_sampler_tasks/SdsPixelDigiLoader.h" #include "pnd_sampler_tasks/SdsStripDigiLoader.h" #include #include "Logger.h" #include #include namespace Highway { Sampler::Sampler() : fFairRunAna(new FairRunAna()), fSamplerTask(NULL), fInputFile(""), fBranch(""), fParFile(""), fEventRate(1) { } Sampler::~Sampler() { delete fSamplerTask; delete fFairRunAna; } void Sampler::Init() { DEVICE::Init(); if (fBranch == PndBranch::SttHit) { fSamplerTask = new PndSamplerTasks::SttHitLoader(); } else if (fBranch == PndBranch::MvdPixelDigis) { fSamplerTask = new PndSamplerTasks::SdsPixelDigiLoader(); } else if (fBranch == PndBranch::MvdStripDigis) { fSamplerTask = new PndSamplerTasks::SdsStripDigiLoader(); } fSamplerTask->SetBranch(fBranch); TString rootlogon_macro = TString(getenv("VMCWORKDIR")) + "/gconfig/rootlogon.C"; gROOT->LoadMacro(rootlogon_macro.Data()); gROOT->ProcessLine("rootlogon()"); fFairRunAna->SetInputFile(TString(fInputFile)); fFairRunAna->SetOutputFile("dummy.out"); fFairRunAna->AddTask(fSamplerTask); FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb(); FairParRootFileIo* parInput1 = new FairParRootFileIo(); parInput1->open(TString(fParFile).Data()); rtdb->setFirstInput(parInput1); // read complete file and extract digis. fFairRunAna->Init(); fFairRunAna->Run(0, 0); } void Sampler::Run() { Logger::GetInstance()->Log(Logger::INFO, ">>>>>>> Run <<<<<<<"); std::this_thread::sleep_for(std::chrono::seconds(1)); Message* event = NULL; std::thread logger([&]() { DEVICE::LogSocketRates();}); std::thread resetEventCounter([&]() { ResetEventCounter();}); while (true) { for (Message *readEvent : *fSamplerTask->GetOutput()) { event = new Message(); event->Copy(readEvent); fPayloadOutputs->at(0)->Send(event); delete event; --fEventCounter; while (fEventCounter == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } logger.join(); resetEventCounter.join(); } void Sampler::ResetEventCounter() { while (true) { fEventCounter = fEventRate / 100; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } void Sampler::Log(int intervalInMs) { std::chrono::high_resolution_clock::time_point t0; std::chrono::high_resolution_clock::time_point t1; long unsigned int bytes = fPayloadOutputs->at(0)->GetBytesTx(); long unsigned int messages = fPayloadOutputs->at(0)->GetMessagesTx(); long unsigned int bytesNew; long unsigned int messagesNew; double megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024); double messagesPerSecond = (messagesNew - messages); t0 = std::chrono::high_resolution_clock::now(); while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(intervalInMs)); t1 = std::chrono::high_resolution_clock::now(); bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); std::chrono::milliseconds timeSinceLastLog = std::chrono::duration_cast(t1 - t0); megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog.count() * 1000.; messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog.count() * 1000.; std::stringstream logmsg; logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; Logger::GetInstance()->Log(Logger::DEBUG, logmsg.str()); bytes = bytesNew; messages = messagesNew; t0 = t1; } } void Sampler::SetProperty(int key, std::string value, int slot/*= 0*/) { switch (key) { case InputFile: fInputFile = value; break; case ParFile: fParFile = value; break; case Branch: fBranch = value; break; default: DEVICE::SetProperty(key, value, slot); break; } } std::string Sampler::GetProperty(int key, std::string default_/*= ""*/, int slot/*= 0*/) { switch (key) { case InputFile: return fInputFile; case ParFile: return fParFile; case Branch: return fBranch; default: return DEVICE::GetProperty(key, default_, slot); } } void Sampler::SetProperty(int key, int value, int slot/*= 0*/) { switch (key) { case EventRate: fEventRate = value; break; default: DEVICE::SetProperty(key, value, slot); break; } } int Sampler::GetProperty(int key, int default_/*= 0*/, int slot/*= 0*/) { switch (key) { case EventRate: return fEventRate; default: return DEVICE::GetProperty(key, default_, slot); } } } /* namespace Highway */