// $Id$

/************************************************************
 * The Data Acquisition Backbone Core (DABC)                *
 ************************************************************
 * Copyright (C) 2009 -                                     *
 * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH      *
 * Planckstr. 1, 64291 Darmstadt, Germany                   *
 * Contact:  http://dabc.gsi.de                             *
 ************************************************************
 * This software can be used under the GPL license          *
 * agreements as stated in LICENSE.txt file                 *
 * which is part of the distribution.                       *
 ************************************************************/

#include "dabc/HierarchyStore.h"

#include <algorithm>   // std::sort (the original include target was lost)

dabc::HierarchyStore::HierarchyStore() :
   fBasePath(),
   fIO(nullptr),
   fFile(),
   fLastStoreTm(),
   fLastFlushTm(),
   fDoStore(false),
   fDoFlush(false),
   fLastVersion(0),
   fStoreBuf(),
   fFlushBuf()
{
}

dabc::HierarchyStore::~HierarchyStore()
{
   CloseFile();

   if (fIO) {
      delete fIO;
      fIO = nullptr;
   }
}

bool dabc::HierarchyStore::SetBasePath(const std::string &path)
{
   if (fFile.isOpened()) {
      EOUT("Cannot change base path when current file is opened");
      return false;
   }

   fBasePath = path;
   return true;
}

// Close any current file, create the directory <basepath>/<date>, open the new
// file <time>.dabc inside it and write the provided buffer as its first record.
bool dabc::HierarchyStore::StartFile(dabc::Buffer buf)
{
   if (!CloseFile()) return false;

   std::string path = fBasePath;
   if ((path.length() > 0) && (path[path.length()-1] != '/')) path.append("/");

   dabc::DateTime tm;
   tm.GetNow();

   std::string strdate = tm.OnlyDateAsString();
   std::string strtime = tm.OnlyTimeAsString();
   path.append(strdate);

   if (!fIO) fIO = new FileInterface;

   if (!fIO->mkdir(path.c_str())) {
      EOUT("Cannot create path %s for hierarchy storage", path.c_str());
      return false;
   }

   DOUT0("HierarchyStore:: CREATE %s", path.c_str());

   path.append("/");
   path.append(strtime);
   path.append(".dabc");

   fFile.SetIO(fIO);
   if (!fFile.OpenWriting(path.c_str())) {
      EOUT("Cannot open file %s for hierarchy storage", path.c_str());
      return false;
   }

   return WriteDiff(buf);
}

// Append one buffer (header plus all segments) to the currently open file.
bool dabc::HierarchyStore::WriteDiff(dabc::Buffer buf)
{
   if (!fFile.isWriting()) return false;

   BufferSize_t fullsz = buf.GetTotalSize();

   DOUT0("HierarchyStore:: WRITE %u", (unsigned) buf.GetTotalSize());

   if (!fFile.WriteBufHeader(fullsz, buf.GetTypeId())) {
      EOUT("Cannot write buffer header");
      return false;
   }

   for (unsigned n = 0; n < buf.NumSegments(); n++)
      if (!fFile.WriteBufPayload(buf.SegmentPtr(n), buf.SegmentSize(n))) {
         EOUT("Cannot write buffer payload");
         return false;
      }

   return true;
}

bool dabc::HierarchyStore::CloseFile()
{
   if (!fFile.isWriting()) return true;

   fFile.Close();

   return true;
}

// Decide whether enough time has elapsed to store an incremental diff
// (store_period) and/or to flush a complete dump into a new file (flush_period).
bool dabc::HierarchyStore::CheckForNextStore(DateTime &now, double store_period, double flush_period)
{
   if (fDoStore || fDoFlush) return true;

   // ... (missing in the original)

   if (now - fLastStoreTm > store_period) {
      fLastStoreTm = now;
      fDoStore = true;
      if (now - fLastFlushTm >= flush_period) {
         fDoFlush = true;
         fLastFlushTm = now;
      }
   }

   return fDoStore || fDoFlush;
}

bool dabc::HierarchyStore::ExtractData(dabc::Hierarchy &h)
{
   if (fDoStore) {
      // record the diff relative to the previous version, including all history entries
      fStoreBuf = h.SaveToBuffer(dabc::stream_Full, fLastVersion, 1000000000);

      DOUT0("HierarchyStore:: EXTRACT prev %u now %u chlds %u size %u",
            (unsigned) fLastVersion, (unsigned) h.GetVersion(),
            (unsigned) h()->GetChildsVersion(), (unsigned) fStoreBuf.GetTotalSize());

      fLastVersion = h.GetVersion();
   }

   if (fDoFlush) {
      // record the complete hierarchy without any history entries
      fFlushBuf = h.SaveToBuffer(dabc::stream_Full, 0, 0);
      fLastVersion = h.GetVersion();
   }

   if (fDoFlush || fDoStore) {
      // this time will be used as a raster when reading the files back -
      // it can be read very efficiently, most decoding can be skipped
      h.SetField("storetm", fLastStoreTm);
      h.MarkChangedItems(fLastStoreTm.AsJSDate());
   }

   return true;
}

// Write the buffers prepared by ExtractData: the diff goes into the current
// file, a complete dump starts a new file.
bool dabc::HierarchyStore::WriteExtractedData()
{
   if (fDoStore) {
      if (!fStoreBuf.null()) WriteDiff(fStoreBuf);
      fStoreBuf.Release();
      fDoStore = false;
   }

   if (fDoFlush) {
      if (!fFlushBuf.null()) StartFile(fFlushBuf);
      fFlushBuf.Release();
      fDoFlush = false;
   }

   return true;
}
// =================================================================================

dabc::HierarchyReading::HierarchyReading() :
   fBasePath(),
   fIO(nullptr)
{
}

dabc::HierarchyReading::~HierarchyReading()
{
   if (fIO) {
      delete fIO;
      fIO = nullptr;
   }
}

void dabc::HierarchyReading::SetBasePath(const std::string &path)
{
   fBasePath = path;
   if ((fBasePath.length() > 0) && (fBasePath[fBasePath.length()-1] != '/'))
      fBasePath.append("/");
}

// Build the full file name <fpath>/<date>/<time>.dabc for the given time stamp.
std::string dabc::HierarchyReading::MakeFileName(const std::string &fpath, const DateTime &dt)
{
   std::string res = fpath;
   if ((res.length() > 0) && (res[res.length()-1] != '/')) res.append("/");

   std::string strdate = dt.OnlyDateAsString();
   std::string strtime = dt.OnlyTimeAsString();

   res += dabc::format("%s/%s.dabc", strdate.c_str(), strtime.c_str());

   return res;
}

// Collect the time stamps of all *.dabc files found in the given date directory.
bool dabc::HierarchyReading::ScanFiles(const std::string &dirname, const DateTime &onlydate, std::vector<uint64_t> &vect)
{
   std::string mask = dirname + "*.dabc";

   Reference files = fIO->fmatch(mask.c_str(), true);

   for (unsigned n = 0; n < files.NumChilds(); n++) {
      // ... (loop body missing in the original: each matched file name is decoded
      // into a time stamp on the day 'onlydate' and appended to 'vect')
   }

   return true;
}

// Recursively scan one directory: date-named subdirectories provide data files,
// any other subdirectory becomes a child node of the hierarchy.
bool dabc::HierarchyReading::ScanTreeDir(dabc::Hierarchy &h, const std::string &dirname)
{
   // ... (beginning of the method missing in the original)

   std::string mask = dirname + "*";

   Reference dirs = fIO->fmatch(mask.c_str(), false);

   bool isanysubdir = false, isanydatedir = false;

   std::vector<uint64_t> files;

   for (unsigned n = 0; n < dirs.NumChilds(); n++) {
      // ... (loop body missing in the original: date directories are scanned with
      // ScanFiles() into 'files', other subdirectories are descended into recursively)
   }

   if (isanydatedir && (files.size() > 0)) {
      std::sort(files.begin(), files.end());

      dabc::DateTime mindt(files.front()), maxdt(files.back());

      DOUT0("DIR: %s mintm: %s maxtm: %s files %lu", dirname.c_str(),
            mindt.AsJSString().c_str(), maxdt.AsJSString().c_str(), (long unsigned) files.size());

      h.SetField("dabc:path", dirname);
      h.SetField("dabc:mindt", mindt);
      h.SetField("dabc:maxdt", maxdt);
      h.SetField("dabc:files", files);
   }

   return true;
}

// Rebuild the tree describing all stored data below the base path.
bool dabc::HierarchyReading::ScanTree()
{
   fTree.Release();
   fTree.Create("TOP");

   if (!fIO) fIO = new FileInterface;

   return ScanTreeDir(fTree, fBasePath);
}

// Read one buffer (header plus payload) from the binary file.
dabc::Buffer dabc::HierarchyReading::ReadBuffer(dabc::BinaryFile &f)
{
   dabc::Buffer buf;

   if (f.eof()) return buf;

   uint64_t size = 0, typ = 0;
   if (!f.ReadBufHeader(&size, &typ)) return buf;

   buf = dabc::Buffer::CreateBuffer(size);
   if (buf.null()) return buf;

   if (!f.ReadBufPayload(buf.SegmentPtr(), size)) {
      buf.Release();
      return buf;
   }

   buf.SetTotalSize(size);
   buf.SetTypeId(typ);

   return buf;
}

bool dabc::HierarchyReading::ProduceStructure(Hierarchy &tree, const DateTime &from_date, const DateTime &till_date,
                                              const std::string &entry, Hierarchy &tgt)
{
   if (tree.null()) return false;

   DOUT0("Produce structure for item %s", tree.ItemName().c_str());

   if (!tree.HasField("dabc:path")) {
      // not a leaf with stored data - descend into the child items
      for (unsigned n = 0; n < tree.NumChilds(); n++) {
         // ... (the remainder of the method is missing in the original)