/** * CbmDeviceEventBuilderEtofStar2019.cxx */ #include "CbmDeviceEventBuilderEtofStar2019.h" #include "CbmMQDefs.h" #include "CbmStar2019EventBuilderEtofAlgo.h" #include "CbmStar2019TofPar.h" #include "StorableTimeslice.hpp" #include "FairMQLogger.h" #include "FairMQProgOptions.h" // device->fConfig #include "FairRuntimeDb.h" #include "FairParGenericSet.h" #include "THttpServer.h" #include "TROOT.h" #include "TFile.h" #include "TString.h" #include "TH1.h" #include "TH2.h" #include #include // include this header to serialize vectors #include #include #include #include #include struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; }; using namespace std; //static Int_t iMess=0; const Int_t DetMask = 0x003FFFFF; static uint fiSelectComponents{0}; CbmDeviceEventBuilderEtofStar2019::CbmDeviceEventBuilderEtofStar2019() : //CbmDeviceUnpackTofMcbm2018(), fNumMessages(0), fbMonitorMode( kFALSE ), fbDebugMonitorMode( kFALSE ), fbSandboxMode( kFALSE ), fbEventDumpEna( kFALSE ), fParCList( nullptr ), fulTsCounter( 0 ), fNumEvt(0), fEventBuilderAlgo( nullptr ), fTimer(), fUnpackPar( nullptr ), fpBinDumpFile( nullptr ) { fEventBuilderAlgo = new CbmStar2019EventBuilderEtofAlgo(); } CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019() { delete fEventBuilderAlgo; } void CbmDeviceEventBuilderEtofStar2019::InitTask() try { // Get the information about created channels from the device // Check if the defined channels from the topology (by name) // are in the list of channels which are possible/allowed // for the device // The idea is to check at initilization if the devices are // properly connected. For the time beeing this is done with a // nameing convention. It is not avoided that someone sends other // data on this channel. //logger::SetLogLevel("INFO"); int noChannel = fChannels.size(); LOG(info) << "Number of defined channels: " << noChannel; for(auto const &entry : fChannels) { LOG(info) << "Channel name: " << entry.first; if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); if(entry.first == "syscmd") { OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage); continue; } //if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData); if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts); else { fChannelsToSend[0].push_back(entry.first); LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0]; } } InitContainers(); } catch (InitTaskError& e) { LOG(error) << e.what(); // Wrapper defined in CbmMQDefs.h to support different FairMQ versions cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); } bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channelName) { for(auto const &entry : fAllowedChannels) { LOG(info) << "Inspect " << entry; std::size_t pos1 = channelName.find(entry); if (pos1!=std::string::npos) { const vector::const_iterator pos = std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry); const vector::size_type idx = pos-fAllowedChannels.begin(); LOG(info) << "Found " << entry << " in " << channelName; LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; return true; } } LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; LOG(error) << "Stop device."; return false; } Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers() { LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019."; // FairRuntimeDb* fRtdb = FairRuntimeDb::instance(); // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy std::string message{"CbmStar2019TofPar,111"}; LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message; FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111")); FairMQMessagePtr rep(NewMessage()); if (Send(req, "parameters") > 0) { if (Receive(rep, "parameters") >= 0) { if (rep->GetSize() != 0) { CbmMQTMessage tmsg(rep->GetData(), rep->GetSize()); fUnpackPar = dynamic_cast(tmsg.ReadObject(tmsg.GetClass())); LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar; fUnpackPar->Print(); } else { LOG(error) << "Received empty reply. Parameter not available"; } } } SetParContainers(); Bool_t initOK = kTRUE; initOK &= fEventBuilderAlgo->InitContainers(); initOK &= ReInitContainers(); // needed for TInt parameters fEventBuilderAlgo->SetAddStatusToEvent( true ); if( kTRUE == fbMonitorMode ) { // CreateHistograms(); initOK &= fEventBuilderAlgo->CreateHistograms(); /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) std::vector< std::pair< TNamed *, std::string > > vHistos = fEventBuilderAlgo->GetHistoVector(); /* FIXME /// Register the histos in the HTTP server THttpServer* server = FairRunOnline::Instance()->GetHttpServer(); for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) { server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first ); } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE"); server->Restrict("/Reset_EvtBuild_Hist", "allow=admin"); */ } // if( kTRUE == fbMonitorMode ) return initOK; } void CbmDeviceEventBuilderEtofStar2019::SetParContainers() { FairRuntimeDb* fRtdb = FairRuntimeDb::instance(); fParCList = fEventBuilderAlgo->GetParList(); LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries "; for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC ) { FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC)); fParCList->Remove(tempObj); std::string sParamName{ tempObj->GetName() }; FairParGenericSet* newObj = dynamic_cast( fRtdb->getContainer( sParamName.data() ) ); LOG(info) << " - Get " << sParamName.data() << " at " << newObj; if( nullptr == newObj ) { LOG(error) << "Failed to obtain parameter container " << sParamName << ", for parameter index " << iparC; return; } // if( nullptr == newObj ) if( iparC == 0 ) { newObj=(FairParGenericSet *) fUnpackPar; LOG(info) << " - Mod " << sParamName.data() << " to " << newObj; } fParCList->AddAt(newObj, iparC); // delete tempObj; } // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC ) } void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList( size_t component, UShort_t usDetectorId ) { fEventBuilderAlgo->AddMsComponentToList( component, usDetectorId ); } Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/) { if( 0 == fulTsCounter ) { LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + FLESNET combination"; fulTsCounter++; return kTRUE; } // if( 0 == fulTsCounter ) if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) ) { LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class"; return kTRUE; } // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) ) std::vector< CbmTofStarSubevent2019 > & eventBuffer = fEventBuilderAlgo->GetEventBuffer(); for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent ) { /// Send the sub-event to the STAR systems Int_t iBuffSzByte = 0; void * pDataBuff = eventBuffer[ uEvent ].BuildOutput( iBuffSzByte ); if( NULL != pDataBuff ) { /// Valid output, do stuff with it! // Bool_t fbSendEventToStar = kFALSE; if( kFALSE == fbSandboxMode ) { /* ** Function to send sub-event block to the STAR DAQ system * trg_word received is packed as: * * trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo */ /* star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(), pDataBuff, iBuffSzByte ); */ SendSubevent(eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),(char *)pDataBuff, iBuffSzByte, 0); } // if( kFALSE == fbSandboxMode ) LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes" << " and token " << eventBuffer[ uEvent ].GetTrigger().GetStarToken(); } // if( NULL != pDataBuff ) else LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger " << " object was not set => Do Nothing more with it!!! "; } // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent ) return kTRUE; } Bool_t CbmDeviceEventBuilderEtofStar2019::ReInitContainers() { LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019"; Bool_t initOK = fEventBuilderAlgo->ReInitContainers(); return initOK; } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/) { // Don't do anything with the data // Maybe add an message counter which counts the incomming messages and add // an output fNumMessages++; LOG(debug) << "Received message number "<< fNumMessages << " with size " << msg->GetSize(); std::string msgStr(static_cast(msg->GetData()), msg->GetSize()); std::istringstream iss(msgStr); boost::archive::binary_iarchive inputArchive(iss); fles::StorableTimeslice component{0}; inputArchive >> component; CheckTimeslice(component); DoUnpack(component, 0); // if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<(parts.At(0)->GetData()),(parts.At(0))->GetSize()); std::istringstream iss(msgStr); boost::archive::binary_iarchive inputArchive(iss); inputArchive >> ts; CheckTimeslice(ts); if( 1 == fNumMessages ) { LOG(info) << "Initialize TS components list to " << ts.num_components(); for (size_t c {0}; c < ts.num_components(); c++) { auto systemID = static_cast(ts.descriptor(c, 0).sys_id); LOG(info) << "Found systemID: " << std::hex << systemID << std::dec; fEventBuilderAlgo->AddMsComponentToList( c, systemID ); // TOF data } } } break; case 1: { fles::StorableTimeslice component{0}; uint ncomp=parts.Size(); for (uint i=0; i(parts.At(i)->GetData()),(parts.At(i))->GetSize()); std::istringstream iss(msgStr); boost::archive::binary_iarchive inputArchive(iss); //fles::StorableTimeslice component{i}; inputArchive >> component; CheckTimeslice(component); fEventBuilderAlgo->AddMsComponentToList( i, 0x60 ); // TOF data LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index(); } } break; default: ; } if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) ) { LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class"; return kTRUE; } // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) ) std::vector< CbmTofStarSubevent2019 > & eventBuffer = fEventBuilderAlgo->GetEventBuffer(); LOG(debug) <<"Process time slice "<GetData()); const char cmda[4]={*cmd}; LOG(info) << "Handle message " << cmd <<", " << cmd[0]; cbm::mq::LogState(this); // only one implemented so far "Stop" if( strcmp(cmda,"STOP") ) { LOG(info) << "STOP"; cbm::mq::ChangeState(this, cbm::mq::Transition::Ready); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::Idle); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::End); cbm::mq::LogState(this); } return true; } bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts) { if ( 0 == ts.num_components() ) { LOG(error) << "No Component in TS " << ts.index(); return 1; } auto tsIndex = ts.index(); LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << tsIndex; /* for (size_t c = 0; c < ts.num_components(); ++c) { LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c; LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes"; LOG(debug) << "Sys ID: Ox" << std::hex << static_cast(ts.descriptor(0,0).sys_id) << std::dec; for (size_t m = 0; m < ts.num_microslices(c); ++m) { PrintMicroSliceDescriptor(ts.descriptor(c,m)); } } */ return true; } bool CbmDeviceEventBuilderEtofStar2019::SendEvent( std::vector vdigi, int idx ) { LOG(debug) << "Send Data for event "<(strMsg->c_str()), // data strMsg->length(), // size [](void* , void* object){ delete static_cast(object); }, strMsg)); // object that manages the data LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0]; // if (Send(msg, fChannelsToSend[idx][0]) < 0) { if (Send(parts, fChannelsToSend[idx][0]) < 0) { LOG(error) << "Problem sending data " << fChannelsToSend[idx][0]; return false; } fNumEvt++; //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ... return true; } bool CbmDeviceEventBuilderEtofStar2019::SendSubevent( uint trig, char *pData, int nData, int idx ) { LOG(debug) << "SendSubevent "<(strMsgE->c_str()), // data strMsgE->length(), // size [](void* , void* object){ delete static_cast(object); }, strMsgE)); // object that manages the data parts.AddPart(NewMessage(const_cast(strMsg->c_str()), // data strMsg->length(), // size [](void* , void* object){ delete static_cast(object); }, strMsg)); // object that manages the data LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0]; // if (Send(msg, fChannelsToSend[idx][0]) < 0) { if (Send(parts, fChannelsToSend[idx][0]) < 0) { LOG(error) << "Problem sending data " << fChannelsToSend[idx][0]; return false; } fNumEvt++; //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ... return true; } void CbmDeviceEventBuilderEtofStar2019::Reset() { } void CbmDeviceEventBuilderEtofStar2019::Finish() { if( NULL != fpBinDumpFile ) { LOG(info) << "Closing binary file used for event dump."; fpBinDumpFile->close(); } // if( NULL != fpBinDumpFile ) /// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server if( kTRUE == fbMonitorMode ) { /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) std::vector< std::pair< TNamed *, std::string > > vHistos = fEventBuilderAlgo->GetHistoVector(); /// (Re-)Create ROOT file to store the histos TDirectory * oldDir = NULL; TFile * histoFile = NULL; // Store current directory position to allow restore later oldDir = gDirectory; // open separate histo file in recreate mode histoFile = new TFile( "data/eventBuilderMonHist.root" , "RECREATE"); histoFile->cd(); /// Register the histos in the HTTP server for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) { /// Make sure we end up in chosen folder gDirectory->mkdir( vHistos[ uHisto ].second.data() ); gDirectory->cd( vHistos[ uHisto ].second.data() ); /// Write plot vHistos[ uHisto ].first->Write(); histoFile->cd(); } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) // Restore original directory position oldDir->cd(); histoFile->Close(); } // if( kTRUE == fbMonitorMode ) }