/**
 * CbmDeviceMonitorT0.cxx
 *
 * FairMQ device which receives serialized timeslices, hands them to the
 * T0 monitor algorithm and periodically publishes the one dimensional
 * monitoring histograms on the "histogram-in" channel.
 *
 * @since 2019-03-26
 * @author F. Uhlig
 */

#include "CbmDeviceMonitorT0.h"
#include "CbmMQDefs.h"

#include "CbmMcbm2018MonitorAlgoT0.h"

#include "StorableTimeslice.hpp"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h"  // device->fConfig
#include "FairParGenericSet.h"

#include "RootSerializer.h"

#include "TNamed.h"
#include "TList.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"

// NOTE(review): the names of the five system headers were not legible in the
// reviewed copy of this file. The list below is derived from what this file
// uses - compare with version control before merging.
#include <boost/archive/binary_iarchive.hpp>

#include <algorithm>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

/// Exception used to abort InitTask(); it is caught by the function-try-block
/// of InitTask() and converted into an ErrorFound state transition.
struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

using namespace std;

//static Int_t iMess=0;
// const Int_t DetMask = 0x0001FFFF; (VF) unused

/// File level flag; when set, DoUnpack() resets the algorithm histograms once
/// and clears the flag again.
Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE;

/// Creates the monitor algorithm; the parameter list is fetched later in
/// InitContainers().
CbmDeviceMonitorT0::CbmDeviceMonitorT0()
  : fNumMessages{0}
  , fAlgo{new CbmMcbm2018MonitorAlgoT0()}
  , fParCList{nullptr}
{
}

/**
 * Validates the configured channels, registers HandleData() for each of them
 * and initializes the parameter containers and histograms.
 *
 * Any InitTaskError raised on the way (unknown channel name, failed container
 * initialization) is logged and moves the device into the error state.
 */
void CbmDeviceMonitorT0::InitTask()
try {
  // Get the information about created channels from the device.
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device.
  // The idea is to check at initialization if the devices are
  // properly connected. For the time being this is done with a
  // naming convention. It is not prevented that someone sends other
  // data on this channel.
  //logger::SetLogLevel("INFO");

  LOG(info) << "Number of defined channels: " << fChannels.size();
  for (auto const& entry : fChannels) {
    LOG(info) << "Channel name: " << entry.first;
    if (!IsChannelNameAllowed(entry.first)) {
      throw InitTaskError("Channel name does not match.");
    }
    OnData(entry.first, &CbmDeviceMonitorT0::HandleData);
  }

  // The result used to be ignored; a device without parameters or histograms
  // cannot do anything useful, so treat the failure like any other init error.
  if (!InitContainers()) {
    throw InitTaskError("Initialization of the parameter containers or histograms failed.");
  }
}
catch (const InitTaskError& e) {
  LOG(error) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}

/**
 * Checks whether one of the allowed channel names is a substring of the
 * given channel name.
 *
 * @param channelName name of the channel as defined in the topology
 * @return true if an entry of fAllowedChannels is contained in channelName
 */
bool CbmDeviceMonitorT0::IsChannelNameAllowed(std::string channelName)
{
  // Index loop: the position is part of the log output, no second search needed.
  for (std::size_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
    const auto& allowedName = fAllowedChannels[idx];
    if (channelName.find(allowedName) == std::string::npos) {
      continue;
    }
    LOG(info) << "Found " << allowedName << " in " << channelName;
    LOG(info) << "Channel name " << channelName
              << " found in list of allowed channel names at position " << idx;
    return true;
  }

  LOG(info) << "Channel name " << channelName
            << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
  return false;
}

/**
 * Requests every parameter container of the algorithm from the parameter
 * server, configures the algorithm and collects the histograms to publish.
 *
 * @return kFALSE if a parameter container could not be obtained or if the
 *         algorithm reports a failure in ReInitContainers()/CreateHistograms(),
 *         kTRUE otherwise
 */
Bool_t CbmDeviceMonitorT0::InitContainers()
{
  LOG(info) << "Init parameter containers for CbmDeviceMonitorT0.";

  fParCList = fAlgo->GetParList();

  for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
    FairParGenericSet* tempObj = static_cast<FairParGenericSet*>(fParCList->At(iparC));
    const std::string paramName{tempObj->GetName()};

    // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
    // Should only be used for small data because of the cost of an additional copy

    // Here must come the proper Runid
    std::string message = paramName + ",111";
    LOG(info) << "Requesting parameter container " << paramName
              << ", sending message: " << message;

    FairMQMessagePtr req(NewSimpleMessage(message));
    FairMQMessagePtr rep(NewMessage());

    FairParGenericSet* newObj = nullptr;

    if (Send(req, "parameters") > 0) {
      if (Receive(rep, "parameters") >= 0) {
        if (rep->GetSize() != 0) {
          CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
          newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
        }
        else {
          LOG(error) << "Received empty reply. Parameter not available";
        }
      }
    }

    // Without a container from the server there is nothing to initialize the
    // algorithm with. Leave the placeholder in the list and report the failure
    // instead of dereferencing a null pointer or inserting it into the list.
    if (nullptr == newObj) {
      LOG(error) << "Could not obtain parameter container " << paramName;
      return kFALSE;
    }

    LOG(info) << "Received unpack parameter from the server:";
    newObj->print();

    // Swap the placeholder for the received container at the same position.
    fParCList->Remove(tempObj);
    fParCList->AddAt(newObj, iparC);
    delete tempObj;
  }

  fAlgo->InitContainers();

  fAlgo->SetIgnoreOverlapMs();
  fAlgo->SetMonitorMode(true);          // fbMonitorMode
  fAlgo->SetHistoryHistoSize(3600);     // fuHistoryHistoSize
  fAlgo->SetPulserTotLimits(180, 210);  // fuMinTotPulser, fuMaxTotPulser
  fAlgo->AddMsComponentToList(0, 0x90);
  // fAlgo->SetNbMsInTs(100, 1);

  Bool_t initOK = fAlgo->ReInitContainers();

  /// Histos creation, obtain pointer on them and add them to the HTTP server
  /// Trigger histo creation on all associated algos
  initOK &= fAlgo->CreateHistograms();

  /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
  std::vector<std::pair<TNamed*, std::string>> vHistos = fAlgo->GetHistoVector();
  /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
  std::vector<std::pair<TCanvas*, std::string>> vCanvases = fAlgo->GetCanvasVector();

  for (const auto& histoAndFolder : vHistos) {
    //      LOG(info) << "Registering  " << histoAndFolder.first->GetName()
    //                << " in " << histoAndFolder.second.data();

    // Only send one dimensional histograms for the time being
    if (TString(histoAndFolder.first->ClassName()).Contains("TH1")) {
      fArrayHisto.Add(histoAndFolder.first);
    }
  }

  // The canvases are obtained but not registered anywhere yet.
  for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
    //      LOG(info) << "Registering  " << vCanvases[ uCanv ].first->GetName()
    //                << " in " << vCanvases[ uCanv ].second.data();
    //      gROOT->FindObject( (vCanvases[ uCanv ].first)->GetName() ) );
  }

  return initOK;
}

/**
 * Handler called whenever a message arrives on a registered channel.
 *
 * The message payload is deserialized into a fles::StorableTimeslice and
 * passed to DoUnpack(). Every 100 messages the histograms are published.
 *
 * @param msg  received message containing the boost serialized timeslice
 * @param index sub-channel index (unused)
 * @return always true
 */
bool CbmDeviceMonitorT0::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  // Count the incoming messages
  fNumMessages++;
  LOG(debug) << "Received message number " << fNumMessages
             << " with size " << msg->GetSize();

  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream iss(msgStr);
  boost::archive::binary_iarchive inputArchive(iss);

  fles::StorableTimeslice component{0};
  inputArchive >> component;

  //  CheckTimeslice(component);

  DoUnpack(component, 0);

  // NOTE(review): the reviewed copy of this file was damaged between
  // "Processed " and a trailing "->ProcessRequests();". The tail of the log
  // message below is reconstructed, and it is unknown whether the
  // ProcessRequests() call was active code - restore both from version control.
  if (0 == fNumMessages % 10000) {
    LOG(info) << "Processed " << fNumMessages << " time slices";
  }

  // Send histograms each 100 time slices. Should be each 1s
  if (0 == fNumMessages % 100) {
    SendHistograms();
  }

  // Here we have to send the data

  //if((fNumMessages)%10000 == 0) Finish();

  return true;
}

/**
 * Serializes fArrayHisto, sends it on channel "histogram-in" and resets the
 * histograms afterwards.
 *
 * @return false if sending failed (histograms are then not reset), true otherwise
 */
bool CbmDeviceMonitorT0::SendHistograms()
{
  FairMQMessagePtr message(NewMessage());
  Serialize<RootSerializer>(*message, &fArrayHisto);

  // test code to check if deserialization works
  /*
  TObject* tempObject = nullptr;

  Deserialize<RootDeserializer>(*message, tempObject);

  if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
   TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
   LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
             << " entries";
    for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
      TObject* obj = arrayHisto->At(i);
      LOG(info) << obj->GetName();
      TH1* histogram = static_cast<TH1*>(obj);
      LOG(info) << histogram->GetNbinsX();
    }
  }
  */

  if (Send(message, "histogram-in") < 0) {
    LOG(error) << "Problem sending data";
    return false;
  }

  // Reset all histograms so that the next message only contains new entries
  const Int_t nbHistos = fArrayHisto.GetEntriesFast();
  for (Int_t iHisto = 0; iHisto < nbHistos; iHisto++) {
    static_cast<TH1*>(fArrayHisto.At(iHisto))->Reset();
  }

  return true;
}

CbmDeviceMonitorT0::~CbmDeviceMonitorT0()
{
}

/**
 * Prints all fields of a microslice descriptor at info level.
 *
 * @param mdsc descriptor to print
 */
void CbmDeviceMonitorT0::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
{
  // The one byte fields are cast to int so that they are printed as numbers
  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
  LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
  LOG(info) << "Equipement ID: " << mdsc.eq_id;
  LOG(info) << "Flags: " << mdsc.flags;
  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
  LOG(info) << "Microslice Idx: " << mdsc.idx;
  LOG(info) << "Checksum: " << mdsc.crc;
  LOG(info) << "Size: " << mdsc.size;
  LOG(info) << "Offset: " << mdsc.offset;
}

/**
 * Sanity check of a timeslice with debug output for each component.
 *
 * @param ts timeslice to inspect
 * @return false if the timeslice has no component, true otherwise
 */
bool CbmDeviceMonitorT0::CheckTimeslice(const fles::Timeslice& ts)
{
  if (0 == ts.num_components()) {
    LOG(error) << "No Component in TS " << ts.index();
    // An empty timeslice is the failure case of this check
    return false;
  }

  auto tsIndex = ts.index();

  LOG(debug) << "Found " << ts.num_components()
             << " different components in timeslice " << tsIndex;

  for (size_t c = 0; c < ts.num_components(); ++c) {
    LOG(debug) << "Found " << ts.num_microslices(c)
               << " microslices in component " << c;
    LOG(debug) << "Component " << c << " has a size of "
               << ts.size_component(c) << " bytes";
    // Report the system ID of the component at hand (first microslice), and
    // only if the component has a microslice to read it from
    if (0 < ts.num_microslices(c)) {
      LOG(debug) << "Sys ID: Ox" << std::hex
                 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
    }

    /*
    for (size_t m = 0; m < ts.num_microslices(c); ++m) {
      PrintMicroSliceDescriptor(ts.descriptor(c,m));
    }
*/
  }

  return true;
}

/**
 * Passes a timeslice to the monitor algorithm.
 *
 * @param ts timeslice to process
 * @param component unused
 * @return always kTRUE
 */
Bool_t CbmDeviceMonitorT0::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
  if (/* fbMonitorMode && */ bMcbm2018MonitorTaskT0ResetHistos) {
    LOG(info) << "Reset T0 Monitor histos ";
    fAlgo->ResetHistograms();
    bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
  }  // if( fbMonitorMode && bMcbm2018MonitorTaskT0ResetHistos )

  if (kFALSE == fAlgo->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index()
               << " in unpacker algorithm class";
    // NOTE(review): a failure is still reported as kTRUE - presumably so that
    // processing continues with the next timeslice; confirm this is intended.
    return kTRUE;
  }  // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )

  /// Clear the digis vector in case it was filled
  // std::vector< CbmTofDigiExp > vDigi = fAlgo->GetVector();
  fAlgo->ClearVector();

  return kTRUE;
}

void CbmDeviceMonitorT0::Finish()
{
}