/**
 * CbmDeviceStsLocalReco.cxx
 *
 * @since 2019-03-26
 * @author F. Uhlig
 */

#include "CbmDeviceStsLocalReco.h"
#include "CbmMQDefs.h"

#include "CbmStsDigitizeParameters.h"
#include "CbmFieldPar.h"
#include "CbmFieldConst.h"
#include "CbmFieldMap.h"
#include "CbmFieldMapSym2.h"
#include "CbmFieldMapSym3.h"
#include "CbmFieldMapDistorted.h"
#include "CbmFieldMapSym1.h"
#include "CbmBsField.h"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairGeoParSet.h"
#include "FairRunAna.h"
#include "FairField.h"
//#include "FairParGenericSet.h"

//#include "RootSerializer.h"

#include "TGeoManager.h"
#include "TSystem.h"
/*
#include "TNamed.h"
#include "TList.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
*/

// NOTE(review): the names of the system headers were not readable in the
// version of the file this edit was made from; the list below is what the
// visible code requires. Confirm against the repository.
#include <boost/archive/binary_iarchive.hpp>

#include <cstdint>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

/** Exception used to abort the device initialisation; caught in InitTask(). */
struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

//using namespace std;
using std::string;

/** Default constructor: zeroes the counters and clears all parameter container pointers. */
CbmDeviceStsLocalReco::CbmDeviceStsLocalReco()
  : fMaxTimeslices{0}
  , fNumMessages{0}
  , fRunId{"0"}
  , fvmcworkdir{""}
  , fDigiPar{nullptr}
  , fGeoPar{nullptr}
  , fFieldPar{nullptr}
//  , fParCList{nullptr}
{
}

/** Destructor: if a global geometry manager exists, deletes the contents of its volume and shape lists. */
CbmDeviceStsLocalReco::~CbmDeviceStsLocalReco()
{
  if (gGeoManager) {
    gGeoManager->GetListOfVolumes()->Delete();
    gGeoManager->GetListOfShapes()->Delete();
  }
}

/**
 * Initialises the device: reads the "max-timeslices" option, validates the
 * names of all configured channels, registers HandleData() as the handler for
 * each of them and requests the parameter containers.
 *
 * Any InitTaskError raised on the way is logged and turned into an
 * ErrorFound state transition instead of propagating to the caller.
 */
void CbmDeviceStsLocalReco::InitTask()
try {
  // NOTE(review): the value type is assumed to be uint64_t - confirm against
  // the type used when the option is registered.
  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");

  // Get the information about created channels from the device.
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device.
  // The idea is to check at initialization if the devices are
  // properly connected. For the time being this is done with a
  // naming convention. It is not avoided that someone sends other
  // data on this channel.
  //logger::SetLogLevel("INFO");

  int noChannel = fChannels.size();
  LOG(INFO) << "Number of defined channels: " << noChannel;

  for (auto const& entry : fChannels) {
    LOG(INFO) << "Channel name: " << entry.first;
    if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
    OnData(entry.first, &CbmDeviceStsLocalReco::HandleData);
  }

  InitContainers();
}
catch (const InitTaskError& e) {
  LOG(ERROR) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}

/**
 * Checks a channel name against the list of allowed channel names.
 *
 * A name is accepted as soon as one entry of fAllowedChannels occurs in it as
 * a substring; the match and its position in the list are logged.
 *
 * @param channelName  name of the channel as defined in the topology
 * @return true if an allowed name is contained in channelName, false otherwise
 */
bool CbmDeviceStsLocalReco::IsChannelNameAllowed(std::string channelName)
{
  for (std::size_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
    const auto& entry = fAllowedChannels[idx];
    if (channelName.find(entry) == std::string::npos) continue;

    LOG(INFO) << "Found " << entry << " in " << channelName;
    LOG(INFO) << "Channel name " << channelName
              << " found in list of allowed channel names at position " << idx;
    return true;
  }

  LOG(INFO) << "Channel name " << channelName << " not found in list of allowed channel names.";
  LOG(ERROR) << "Stop device.";
  return false;
}

/**
 * Requests the parameter containers CbmStsDigitizeParameters, FairGeoParSet
 * and CbmFieldPar for the configured run id over the "parameters" channel,
 * and creates the magnetic field from the field parameters.
 *
 * NOTE(review): if Send() or Receive() reports failure the corresponding
 * container is silently left unset and the function still returns kTRUE.
 *
 * @return kTRUE (failures are reported by exception, not by the return value)
 * @throws InitTaskError if a reply is empty (digi and geo parameters), if a
 *         reply cannot be read as the expected container, if no geometry
 *         manager exists after reading the geometry, or if the field
 *         parameters are of type -1
 */
Bool_t CbmDeviceStsLocalReco::InitContainers()
{
  Bool_t initOK{kTRUE};

  // NOTE(review): value types assumed from how the members are initialised
  // (string literals) and from InitTask() - confirm against the option setup.
  fRunId         = fConfig->GetValue<string>("run-id");
  fvmcworkdir    = fConfig->GetValue<string>("vmcworkdir");
  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");

  LOG(INFO) << "Init parameter containers for CbmDeviceStsLocalReco.";

  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
  // Should only be used for small data because of the cost of an additional copy

  // --- Digitisation parameters ---------------------------------------------
  std::string message{"CbmStsDigitizeParameters,"};
  message += fRunId;
  LOG(INFO) << "Requesting parameter container CbmStsDigitizeParameters, sending message: " << message;

  FairMQMessagePtr req(NewSimpleMessage(message));
  FairMQMessagePtr rep(NewMessage());

  if (Send(req, "parameters") > 0) {
    if (Receive(rep, "parameters") >= 0) {
      if (rep->GetSize() != 0) {
        CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
        fDigiPar = dynamic_cast<CbmStsDigitizeParameters*>(tmsg.ReadObject(tmsg.GetClass()));
        // The cast yields nullptr if the reply holds a different class;
        // stop here instead of dereferencing it below.
        if (!fDigiPar) { throw InitTaskError("Reply could not be read as CbmStsDigitizeParameters"); }
        LOG(INFO) << "Received unpack parameter from parmq server: " << fDigiPar;
        // TODO: check if fDigiPar is properly initialized from the file
        fDigiPar->Print();
        LOG(info) << fDigiPar->ToString();
      }
      else {
        throw InitTaskError("Received empty reply. Parameter not available");
      }
    }
  }

  // --- Geometry ------------------------------------------------------------
  std::string message1{"FairGeoParSet,"};
  message1 += fRunId;
  LOG(INFO) << "Requesting parameter container FairGeoParSet, sending message: " << message1;

  FairMQMessagePtr req1(NewSimpleMessage(message1));
  FairMQMessagePtr rep1(NewMessage());

  if (Send(req1, "parameters") > 0) {
    if (Receive(rep1, "parameters") >= 0) {
      if (rep1->GetSize() != 0) {
        CbmMQTMessage tmsg(rep1->GetData(), rep1->GetSize());
        fGeoPar = static_cast<FairGeoParSet*>(tmsg.ReadObject(tmsg.GetClass()));
        if (!fGeoPar) { throw InitTaskError("Reply could not be read as FairGeoParSet"); }
        LOG(INFO) << "Received unpack parameter from parmq server: " << fGeoPar;
        fGeoPar->Print();
        if (!gGeoManager) { throw InitTaskError("No gGeoManager found in FairGeoParSet"); }
        gGeoManager->Print();
      }
      else {
        throw InitTaskError("Received empty reply. Parameter not available");
      }
    }
  }

  // --- Magnetic field ------------------------------------------------------
  std::string message2{"CbmFieldPar,"};
  message2 += fRunId;
  LOG(INFO) << "Requesting parameter container CbmFieldPar, sending message: " << message2;

  FairMQMessagePtr req2(NewSimpleMessage(message2));
  FairMQMessagePtr rep2(NewMessage());

  if (Send(req2, "parameters") > 0) {
    if (Receive(rep2, "parameters") >= 0) {
      if (rep2->GetSize() != 0) {
        CbmMQTMessage tmsg(rep2->GetData(), rep2->GetSize());
        fFieldPar = static_cast<CbmFieldPar*>(tmsg.ReadObject(tmsg.GetClass()));
        if (!fFieldPar) { throw InitTaskError("Reply could not be read as CbmFieldPar"); }
        LOG(info) << "Received unpack parameter from parmq server: " << fFieldPar;
        if (-1 == fFieldPar->GetType()) { throw InitTaskError("No field parameters available!"); }

        fFieldPar->Print();
        LOG(info) << "Before creating the field";
        FairField* field = createField();
        LOG(info) << "After creating the field";
        // NOTE(review): the run object is never deleted here; presumably it
        // is meant to live for the whole process - confirm.
        FairRunAna* run = new FairRunAna();
        run->SetField(field);
      }
      else {
        // Unlike the two containers above, an empty reply is only logged here.
        LOG(ERROR) << "Received empty reply. Parameter not available";
      }
    }
  }

  return initOK;
}

/**
 * Creates and initialises the magnetic field object matching the type stored
 * in fFieldPar. VMCWORKDIR is set from fvmcworkdir before the field is built.
 *
 * @return pointer to the newly allocated, initialised field (never nullptr)
 * @throws InitTaskError if the field type is not one of 0..6
 */
FairField* CbmDeviceStsLocalReco::createField()
{
  FairField* field{nullptr};

  // Instantiate correct field type
  const Int_t fieldType = fFieldPar->GetType();
  gSystem->Setenv("VMCWORKDIR", fvmcworkdir.c_str());

  switch (fieldType) {
    case 0: field = new CbmFieldConst(fFieldPar); break;
    case 1: field = new CbmFieldMap(fFieldPar); break;
    case 2: field = new CbmFieldMapSym2(fFieldPar); break;
    case 3: field = new CbmFieldMapSym3(fFieldPar); break;
    case 4: field = new CbmFieldMapDistorted(fFieldPar); break;
    case 5: field = new CbmFieldMapSym1(fFieldPar); break;
    case 6: field = new CbmBsField(fFieldPar); break;
    default: throw InitTaskError("Unknown field type " + std::to_string(fieldType));
  }
  LOG(info) << "New field at " << field << ", type " << fieldType;

  // Initialise field (the pointer is always valid here: every path that
  // does not create a field has thrown above)
  field->Init();
  field->Print("");

  LOG(info) << "Before return";
  return field;
}

// handler is
called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool CbmDeviceStsLocalReco::HandleData(FairMQMessagePtr& msg, int /*index*/) { // Don't do anything with the data // Maybe add an message counter which counts the incomming messages and add // an output fNumMessages++; LOG(DEBUG) << "Received message number "<< fNumMessages << " with size " << msg->GetSize(); std::string msgStr(static_cast(msg->GetData()), msg->GetSize()); std::istringstream iss(msgStr); boost::archive::binary_iarchive inputArchive(iss); DoWork(); if(fNumMessages%10000 == 0) LOG(INFO)<<"Processed "<