#include "MbsBuilderModule.h" #include "dabc/logging.h" #include "dabc/PoolHandle.h" #include "dabc/Command.h" #include "dabc/Port.h" #include "dabc/Parameter.h" #include "bnet/common.h" #include "bnet/WorkerPlugin.h" bnet::MbsBuilderModule::MbsBuilderModule(dabc::Manager* m, const char* name, WorkerPlugin* plugin) : BuilderModule(m, name, plugin), fCfgEventsCombine(plugin->CfgEventsCombine()), fEvntRate(0) { fOut.buf = 0; fOut.ready = false; fEvntRate = CreateRateParameter("EventRate", false, 1.); fEvntRate->SetUnits("Ev/s"); fEvntRate->SetLimits(0, 6000.); } bnet::MbsBuilderModule::~MbsBuilderModule() { dabc::Buffer::Release(fOut.buf); fOut.buf = 0; fOut.ready = false; } void bnet::MbsBuilderModule::StartOutputBuffer(dabc::BufferSize_t bufsize) { if ((fOut.buf==0) && !fOut.ready) { // increase buffer size on the header information bufsize += sizeof(mbs::sMbsBufferHeader); if (bufsizeAccountValue(fOut.bufhdr->iNumEvents); fOut.ready = true; } void bnet::MbsBuilderModule::SendOutputBuffer() { dabc::Buffer* buf = fOut.buf; fOut.ready = false; fOut.buf = 0; Send(Output(0), buf); } typedef struct BufRec { dabc::Pointer evptr; mbs::eMbs101EventHeader *evhdr; mbs::eMbs101EventHeader tmp; }; void bnet::MbsBuilderModule::DoBuildEvent(std::vector& bufs) { // DOUT1(("start DoBuildEvent n = %d", fCfgEventsCombine)); std::vector recs; recs.resize(bufs.size()); for (unsigned n=0;nGetTotalSize())); return; } } int nevent = 0; while (neventiCount < recs[pmin].evhdr->iCount) pmin = n; else if (recs[n].evhdr->iCount > recs[pmax].evhdr->iCount) pmax = n; if (n==0) fulleventlen += recs[n].evhdr->DataSize(); else fulleventlen += recs[n].evhdr->SubeventsDataSize(); } if (recs[pmin].evhdr->iCount < recs[pmax].evhdr->iCount) { EOUT(("Skip subevent %u from buffer %u", recs[pmin].evhdr->iCount, pmin)); if (!NextEvent(recs[pmin].evptr, recs[pmin].evhdr, &(recs[pmin].tmp))) return; // try to analyse events now continue; } if (fulleventlen==0) { EOUT(("Something wrong with data")); return; } // if rest of existing buffer less than new event data, close it if (!fOut.ready && fOut.buf && (fOut.evptr.fullsize() < fulleventlen)) FinishOutputBuffer(); // send buffer, if it is filled and ready to send if (fOut.ready) SendOutputBuffer(); // start new buffer if required if ((fOut.buf==0) && !fOut.ready) { StartOutputBuffer(fulleventlen); if (fOut.evptr.fullsize() < fulleventlen) { EOUT(("Single event do not pass in to the buffers")); exit(1); } } dabc::Pointer outdataptr; mbs::eMbs101EventHeader *evhdr(0), tmp_evhdr; if (!mbs::StartEvent(fOut.evptr, outdataptr, evhdr, &tmp_evhdr)) { EOUT(("Problem to start event in buffer")); return; } unsigned numstopacq = 0; for (unsigned n=0;niTrigger == mbs::tt_StopAcq) numstopacq++; if (n==0) evhdr->CopyFrom(*(recs[n].evhdr)); dabc::Pointer subevdata(recs[n].evptr); subevdata += sizeof(mbs::eMbs101EventHeader); // copy all subevents without header outdataptr.copyfrom(subevdata, recs[n].evhdr->SubeventsDataSize()); outdataptr += recs[n].evhdr->SubeventsDataSize(); } mbs::FinishEvent(fOut.evptr, outdataptr, evhdr, fOut.bufhdr); // if (fEvntRate) fEvntRate->AccountValue(1.); // DOUT1(("$$$$$$$$ Did event %d size %d", evhdr->iCount, evhdr->DataSize())); nevent++; unsigned numfinished = 0; // shift all pointers to next subevent if (nevent0) { if (numstopacq0) { if (numfinished