// $Id$ /************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ #include "hadaq/HldOutput.h" #include #include #include #include #if defined(__MACH__) /* Apple OSX section */ #include #else #include #endif #include "dabc/Manager.h" #include "hadaq/Iterator.h" hadaq::HldOutput::HldOutput(const dabc::Url& url) : dabc::FileOutput(url,".hld"), fRunSlave(false), fLastRunNumber(0), fRunNumber(0), fEventNumber(0), fEBNumber(0), fUseDaqDisk(false), fRfio(false), fLtsm(false), fPlainName(false), fUrlOptions(), fLastPrefix(), fFile() { fRunSlave = url.HasOption("slave"); fEBNumber = url.GetOptionInt("ebnumber",0); // default is single eventbuilder fRunNumber = url.GetOptionInt("runid", 0); // if specified, use runid from url fUseDaqDisk = url.GetOptionInt("diskdemon", 0); // if specified, use number of /data partition from daq_disk demon fRfio = url.HasOption("rfio"); fLtsm = url.HasOption("ltsm"); fPlainName = url.HasOption("plain") && (GetSizeLimitMB() <= 0); if (fRfio) { dabc::FileInterface* io = (dabc::FileInterface*) dabc::mgr.CreateAny("rfio::FileInterface"); if (io) { fUrlOptions = url.GetOptions(); fFile.SetIO(io, true); // set default protocol and node name, can only be used in GSI //if (fFileName.find("rfiodaq:gstore:")== std::string::npos) fFileName = std::string("rfiodaq:gstore:") + fFileName; } else { EOUT("Cannot create RFIO object, check if libDabcRfio.so loaded"); } } else if(fLtsm) { auto io = (dabc::FileInterface *) dabc::mgr.CreateAny("ltsm::FileInterface"); if (io) { fUrlOptions = url.GetOptions(); fFile.SetIO(io, true); } else { EOUT("Cannot create LTSM object, check if libDabcLtsm.so loaded"); } } } hadaq::HldOutput::~HldOutput() { DOUT3(" hadaq::HldOutput::DTOR"); CloseFile(); } bool hadaq::HldOutput::Write_Init(const dabc::WorkerRef &wrk, const dabc::Command &cmd) { if (!dabc::FileOutput::Write_Init(wrk, cmd)) return false; if (fRunSlave) { // use parameters only in slave mode fRunNumber = 0; ShowInfo(0, dabc::format("%s slave mode is enabled, waiting for runid", (fRunSlave ? "RUN" : "EPICS"))); return true; } return StartNewFile(); } bool hadaq::HldOutput::StartNewFile() { CloseFile(); if (fRunNumber == 0) { if (fRunSlave) { EOUT("Cannot start new file without valid RUNID"); return false; } fRunNumber = dabc::CreateHadaqRunId(); //std::cout <<"HldOutput Generates New Runid"<GetRunNr(); if (nextrunid == fRunNumber) { numevents++; payload += bufiter.evnt()->GetPaddedSize();// remember current position in buffer: continue; } DOUT1("HldOutput Finds New Runid %d or 0x%x from EPICS in event header (previous: %d or 0x%x)", nextrunid, nextrunid, fRunNumber, fRunNumber); fRunNumber = nextrunid; startnewfile = true; break; } // while bufiter // if current runid is still 0, just ignore buffer if (!startnewfile && (fRunNumber == 0)) return dabc::do_Ok; if(startnewfile) { // first flush rest of previous run to old file: cursor = payload; // only if file opened for writing, write rest buffers if (fFile.isWriting()) for (unsigned n = 0; n < buf.NumSegments(); n++) { if (payload == 0) break; unsigned write_size = buf.SegmentSize(n); if (write_size > payload) write_size = payload; if (fRfio) DOUT1("HldOutput write %u bytes from buffer with old runid", write_size); if (!fFile.WriteBuffer(buf.SegmentPtr(n), write_size)) return dabc::do_Error; DOUT1("HldOutput did flushes %d bytes (%d events) of old runid in buffer segment %d to file", write_size, numevents, n); payload -= write_size; } // for } } else { if (CheckBufferForNextFile(buf.GetTotalSize())) { fRunNumber = 0; startnewfile = true; } } // epicsslave if(startnewfile) { if (fRunSlave && (fRunNumber == 0)) { // in slave mode 0 runnumber means do nothing CloseFile(); DOUT0("CLOSE FILE WRITING in slave mode"); return dabc::do_Ok; } if ((fLastRunNumber != 0) && (fLastRunNumber == fRunNumber)) { DOUT0("Saw same runid %d 0x%u as previous - skip buffer", fLastRunNumber, fLastRunNumber); return dabc::do_Ok; } if (!StartNewFile()) { EOUT("Cannot start new file for writing"); return dabc::do_Error; } ShowInfo(0, dabc::format("%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber)); } if (!fFile.isWriting()) return dabc::do_Error; unsigned total_write_size = 0, num_events = 0; if (is_subev) { // this is list of subevents in the buffer, one need to add artificial events headers for each subevents hadaq::RawEvent evnt; hadaq::ReadIterator iter(buf); while (iter.NextSubeventsBlock()) { if (!iter.NextSubEvent()) return dabc::do_Error; char* write_ptr = (char*) iter.subevnt(); unsigned write_size = iter.subevnt()->GetPaddedSize(); evnt.Init(fEventNumber++, fRunNumber); evnt.SetSize(write_size + sizeof(hadaq::RawEvent)); if (!fFile.WriteBuffer(&evnt, sizeof(hadaq::RawEvent))) return dabc::do_Error; if (!fFile.WriteBuffer(write_ptr, write_size)) return dabc::do_Error; total_write_size += sizeof(hadaq::RawEvent) + write_size; num_events ++; } } else if (is_events) { for (unsigned n=0;n= write_size) { // skip segment completely cursor -= write_size; continue; } char* write_ptr = (char*) buf.SegmentPtr(n); if(startnewfile) DOUT2("Wrote to %s at segment %d, cursor %d, size %d", CurrentFileName().c_str(), n, cursor, write_size-cursor); if (cursor > 0) { write_ptr += cursor; write_size -= cursor; cursor = 0; } if (fRunSlave && fRfio && startnewfile) DOUT1("HldOutput write %u bytes after new file was started", write_size); if (!fFile.WriteBuffer(write_ptr, write_size)) return dabc::do_Error; if (fRunSlave && fRfio && startnewfile) DOUT1("HldOutput did write %u bytes after new file was started", write_size); total_write_size += write_size; } num_events = hadaq::ReadIterator::NumEvents(buf); } // TODO: in case of partial written buffer, account sizes to correct file AccountBuffer(total_write_size, num_events); if (fRunSlave && fRfio && startnewfile) DOUT1("HldOutput write complete first buffer after new file was started"); return dabc::do_Ok; }