// -----------------------------------------------------------------------------
// Overview of this translation unit (implementation of CbmDeviceTriggerHandlerEtof)
//
// NOTE(review): this copy of the file appears damaged. Line breaks have been
// collapsed and text between angle brackets is missing (include targets,
// template arguments, and part of the body of HandleData). Because the line
// breaks are gone, every "//" comment now swallows the remainder of its
// physical line. Restore the file from version control before changing code.
//
// Members defined below:
//
//  - Constructor: zero-initialises fNumMessages, fiMsgCnt and fdEvent and sets
//    fbMonitorMode, fbDebugMonitorMode, fbSandboxMode and fbEventDumpEna to
//    kFALSE. The destructor is empty.
//
//  - InitTask(): logs the number of entries in fChannels, then for each channel
//    checks the name with IsChannelNameAllowed(). A rejected name throws
//    InitTaskError, which the function-level catch logs before requesting the
//    cbm::mq::Transition::ErrorFound transition. Accepted channels are
//    registered with OnData(): the channel named "syscmd" is bound to
//    HandleMessage(), every other channel to HandleData(). InitWorkspace() is
//    called after all channels have been registered.
//
//  - IsChannelNameAllowed(channelName): returns true as soon as one entry of
//    fAllowedChannels occurs as a substring of channelName (std::string::find),
//    logging the entry and its index in fAllowedChannels. Otherwise it logs
//    that the name was not found plus "Stop device." and returns false.
//
//  - InitWorkspace(): reads the "SandboxMode" value from fConfig into
//    fbSandboxMode and returns kTRUE.
//
//  - HandleData(parts, index): increments fNumMessages, reads a trigger word
//    from part 0 through a boost binary_iarchive, and takes the data pointer
//    and byte size of part 1. Parts 0 and 1 are accessed without checking
//    parts.Size(). The file-scope statics dctime and dSize feed a periodic
//    "Processed ... events" rate log, after which both are reset. fdEvent is
//    incremented on every call and kTRUE is returned.
//    NOTE(review): the statements between the "Send Data for event" log and the
//    rate report are missing from this copy, so what is done with the data
//    buffer cannot be determined from here.
//
//  - HandleMessage(msg, index): treats the message payload as a C string and
//    logs it, copies only its first byte into the 4-byte array cmda (remaining
//    bytes are zero), and evaluates strcmp(cmda, "STOP"). When that is non-zero
//    it requests the transitions Ready, DeviceReady, Idle and End (logging the
//    state after each) and then sleeps for 1000 ms. Always returns true.
//    NOTE(review): strcmp returns 0 on equality, and cmda can hold at most one
//    non-zero character, so it can never equal "STOP"; as written the shutdown
//    sequence runs for every message received on this handler. Confirm whether
//    "== 0" on the full payload was intended.
//    NOTE(review): the payload is streamed to the log as a NUL-terminated
//    string; whether senders guarantee a terminator is not visible here.
// -----------------------------------------------------------------------------
/** * CbmDeviceTriggerHandlerEtof.cxx * * @since 2019-11-15 * @author N. Herrmann */ #include "CbmDeviceTriggerHandlerEtof.h" #include "CbmMQDefs.h" #include "FairMQLogger.h" #include "FairEventHeader.h" #include "FairMQProgOptions.h" // device->fConfig #include "FairRuntimeDb.h" #include "FairGeoParSet.h" #include "FairRootManager.h" #include "FairRootFileSink.h" #include "FairRunOnline.h" #include "FairFileHeader.h" #include #include #include #include #include #include // this_thread::sleep_for #include #include struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; }; static std::chrono::steady_clock::time_point dctime=std::chrono::steady_clock::now(); static double dSize=0.; using namespace std; CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof() : fNumMessages(0) , fiMsgCnt(0) , fbMonitorMode( kFALSE ) , fbDebugMonitorMode( kFALSE ) , fbSandboxMode( kFALSE ) , fbEventDumpEna( kFALSE ) , fdEvent(0.) { } CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof() { } void CbmDeviceTriggerHandlerEtof::InitTask() try { // Get the information about created channels from the device // Check if the defined channels from the topology (by name) // are in the list of channels which are possible/allowed // for the device // The idea is to check at initilization if the devices are // properly connected. For the time beeing this is done with a // nameing convention. It is not avoided that someone sends other // data on this channel. 
int noChannel = fChannels.size(); LOG(info) << "Number of defined input channels: " << noChannel; for(auto const &entry : fChannels) { LOG(info) << "Channel name: " << entry.first; if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); if(entry.first!="syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData); else OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage); } InitWorkspace(); } catch (InitTaskError& e) { LOG(error) << e.what(); cbm::mq::ChangeState(this,cbm::mq::Transition::ErrorFound); } bool CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed(std::string channelName) { for(auto const &entry : fAllowedChannels) { std::size_t pos1 = channelName.find(entry); if (pos1!=std::string::npos) { const vector::const_iterator pos = std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry); const vector::size_type idx = pos-fAllowedChannels.begin(); LOG(info) << "Found " << entry << " in " << channelName; LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; return true; } } LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; LOG(error) << "Stop device."; return false; } Bool_t CbmDeviceTriggerHandlerEtof::InitWorkspace() { LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof."; // steering variables fbSandboxMode = fConfig->GetValue("SandboxMode"); return kTRUE; } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) //bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/) bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/) { // Don't do anything with the data // Maybe add an message counter which counts the incomming messages and add // an output fNumMessages++; LOG(debug) << "Received message "<< fNumMessages << " with " << parts.Size() << " parts" << ", size0: " 
<< parts.At(0)->GetSize(); uint TrigWord{0}; std::string msgStrE(static_cast(parts.At(0)->GetData()), (parts.At(0))->GetSize()); std::istringstream issE(msgStrE); boost::archive::binary_iarchive inputArchiveE(issE); inputArchiveE>>TrigWord; char* pDataBuff = static_cast < char* >(parts.At(1)->GetData()); int iBuffSzByte = parts.At(1)->GetSize(); // Send Subevent to STAR LOG(debug) << "Send Data for event "< deltatime = std::chrono::steady_clock::now() - dctime; LOG(info) << "Processed " << fdEvent << " events, delta-time: "<< deltatime.count() << ", rate: " << dSize*1.E-6/deltatime.count() << "MB/s"; dctime=std::chrono::steady_clock::now(); dSize=0.; } fdEvent++; return kTRUE; } /************************************************************************************/ bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/) { const char *cmd = (char *)(msg->GetData()); const char cmda[4]={*cmd}; LOG(info) << "Handle message " << cmd <<", " << cmd[0]; // only one implemented so far "Stop" if( strcmp(cmda,"STOP") ) { cbm::mq::ChangeState(this, cbm::mq::Transition::Ready); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::Idle); cbm::mq::LogState(this); cbm::mq::ChangeState(this, cbm::mq::Transition::End); cbm::mq::LogState(this); // ChangeState(fair::mq::Transition(STOP)); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } return true; }