// $Id$ /************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ #include "dabc/MemoryPool.h" #include #include "dabc/defines.h" namespace dabc { enum { evntProcessRequests = evntModuleLast }; class MemoryBlock { public: struct Entry { void *buf; ///< pointer on raw memory BufferSize_t size; ///< size of the block bool owner; ///< is memory should be released int refcnt; ///< usage counter - number of references on the memory }; typedef Queue FreeQueue; Entry* fArr{nullptr}; ///< array of buffers unsigned fNumber{0}; ///< number of buffers FreeQueue fFree; ///< list of free buffers MemoryBlock() : fArr(nullptr), fNumber(0), fFree() { } virtual ~MemoryBlock() { Release(); } inline bool IsAnyFree() const { return !fFree.Empty(); } void Release() { if (!fArr) return; for (unsigned n=0;n& bufs, const std::vector& sizes) throw() { Release(); fArr = new Entry[bufs.size()]; fNumber = bufs.size(); fFree.Allocate(bufs.size()); for (unsigned n=0;nAllocate(number, bufsize, fAlignment); fChangeCounter++; return true; } bool dabc::MemoryPool::Allocate(BufferSize_t bufsize, unsigned number) throw() { LockGuard lock(ObjectMutex()); return _Allocate(bufsize, number); } bool dabc::MemoryPool::Assign(bool isowner, const std::vector& bufs, const std::vector& sizes) throw() { LockGuard lock(ObjectMutex()); if (fMem) return false; if ((bufs.size() != sizes.size()) || (bufs.size() == 0)) return false; fMem = new MemoryBlock; fMem->Assign(isowner, bufs, sizes); return true; } bool dabc::MemoryPool::Release() throw() { LockGuard lock(ObjectMutex()); if (fMem) { delete fMem; fMem = nullptr; fChangeCounter++; } return true; } bool dabc::MemoryPool::IsEmpty() const { LockGuard lock(ObjectMutex()); return !fMem; } unsigned dabc::MemoryPool::GetNumBuffers() const { LockGuard lock(ObjectMutex()); return !fMem ? 0 : fMem->fNumber; } unsigned dabc::MemoryPool::GetBufferSize(unsigned id) const { LockGuard lock(ObjectMutex()); return !fMem || (id >= fMem->fNumber) ? 0 : fMem->fArr[id].size; } unsigned dabc::MemoryPool::GetMaxBufSize() const { LockGuard lock(ObjectMutex()); if (!fMem) return 0; unsigned max = 0; for (unsigned id=0;idfNumber;id++) if (fMem->fArr[id].size > max) max = fMem->fArr[id].size; return max; } unsigned dabc::MemoryPool::GetMinBufSize() const { LockGuard lock(ObjectMutex()); if (!fMem) return 0; unsigned min = 0; for (unsigned id=0;idfNumber;id++) if ((min == 0) || (fMem->fArr[id].size < min)) min = fMem->fArr[id].size; return min; } void* dabc::MemoryPool::GetBufferLocation(unsigned id) const { LockGuard lock(ObjectMutex()); return !fMem || (id>=fMem->fNumber) ? nullptr : fMem->fArr[id].buf; } bool dabc::MemoryPool::_GetPoolInfo(std::vector& bufs, std::vector& sizes, unsigned* changecnt) { if (changecnt) { if (*changecnt == fChangeCounter) return false; } if (fMem) for(unsigned n=0;nfNumber;n++) { bufs.emplace_back(fMem->fArr[n].buf); sizes.emplace_back(fMem->fArr[n].size); } if (changecnt) *changecnt = fChangeCounter; return true; } bool dabc::MemoryPool::GetPoolInfo(std::vector& bufs, std::vector& sizes) { LockGuard lock(ObjectMutex()); return _GetPoolInfo(bufs, sizes); } bool dabc::MemoryPool::TakeRawBuffer(unsigned& indx) { LockGuard lock(ObjectMutex()); if (!fMem || !fMem->IsAnyFree()) return false; indx = fMem->fFree.Pop(); return true; } void dabc::MemoryPool::ReleaseRawBuffer(unsigned indx) { LockGuard lock(ObjectMutex()); if (fMem) fMem->fFree.Push(indx); } dabc::Buffer dabc::MemoryPool::_TakeBuffer(BufferSize_t size, bool except, bool reserve_memory) { Buffer res; // DOUT0("_TakeBuffer obj %p", &res); // if no memory is available, try to allocate it if (!fMem) _Allocate(); if (!fMem) { if (except) throw dabc::Exception(ex_Pool, "No memory allocated in the pool", ItemName()); return res; } if (!fMem->IsAnyFree() && reserve_memory) { if (except) throw dabc::Exception(ex_Pool, "No any memory is available in the pool", ItemName()); return res; } if ((size == 0) && reserve_memory) size = fMem->fArr[fMem->fFree.Front()].size; // first check if required size is available BufferSize_t sum = 0; unsigned cnt = 0; while (sum < size) { if (cnt>=fMem->fFree.Size()) { if (except) throw dabc::Exception(ex_Pool, "Cannot reserve buffer of requested size", ItemName()); return res; } unsigned id = fMem->fFree.Item(cnt); sum += fMem->fArr[id].size; cnt++; } res.AllocateContainer(cnt < 8 ? 8 : cnt); sum = 0; cnt = 0; MemSegment* segs = res.Segments(); while (sumfFree.Pop(); if (fMem->fArr[id].refcnt != 0) throw dabc::Exception(ex_Pool, "Buffer is not free even is declared so", ItemName()); if (cnt>=res.GetObject()->fCapacity) throw dabc::Exception(ex_Pool, "All mem segments does not fit into preallocated list", ItemName()); segs[cnt].buffer = fMem->fArr[id].buf; segs[cnt].datasize = fMem->fArr[id].size; segs[cnt].id = id; // Provide buffer of exactly requested size BufferSize_t restsize = size - sum; if (restsize < segs[cnt].datasize) segs[cnt].datasize = restsize; sum += fMem->fArr[id].size; // increment reference counter on the memory space fMem->fArr[id].refcnt++; cnt++; } res.GetObject()->fPool.SetObject(this, false); res.GetObject()->fNumSegments = cnt; res.SetTypeId(mbt_Generic); return res; } dabc::Buffer dabc::MemoryPool::TakeBuffer(BufferSize_t size) throw() { dabc::Buffer res; { LockGuard lock(ObjectMutex()); res = _TakeBuffer(size, true); } return res; } void dabc::MemoryPool::IncreaseSegmRefs(MemSegment* segm, unsigned num) { LockGuard lock(ObjectMutex()); if (!fMem) throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName()); for (unsigned cnt=0;cntfMem->fNumber) throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName()); if (fMem->fArr[id].refcnt + 1 == 0) throw dabc::Exception(ex_Pool, "To many references on single segments - how it can be", ItemName()); fMem->fArr[id].refcnt++; } } bool dabc::MemoryPool::IsSingleSegmRefs(MemSegment* segm, unsigned num) { LockGuard lock(ObjectMutex()); if (!fMem) throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName()); for (unsigned cnt=0;cntfMem->fNumber) throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName()); if (fMem->fArr[id].refcnt != 1) return false; } return true; } void dabc::MemoryPool::DecreaseSegmRefs(MemSegment* segm, unsigned num) { LockGuard lock(ObjectMutex()); if (!fMem) throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName()); for (unsigned cnt=0;cnt fMem->fNumber) throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName()); if (fMem->fArr[id].refcnt == 0) throw dabc::Exception(ex_Pool, "Reference counter of specified segment is already 0", ItemName()); if (--(fMem->fArr[id].refcnt) == 0) fMem->fFree.Push(id); } } bool dabc::MemoryPool::ProcessSend(unsigned port) { if (!fReqests[port].pending) { fPending.Push(port); fReqests[port].pending = true; } RecheckRequests(true); return true; // always } void dabc::MemoryPool::ProcessEvent(const EventId& ev) { if (ev.GetCode() == evntProcessRequests) { { LockGuard guard(ObjectMutex()); fEvntFired = false; } RecheckRequests(); return; } dabc::ModuleAsync::ProcessEvent(ev); } bool dabc::MemoryPool::RecheckRequests(bool from_recv) { // method called when one need to check if any requester need buffer and // was blocked by lack of free memory if (fProcessingReq) { EOUT("Event processing mismatch in the POOL - called for the second time"); return false; } int cnt = 100; fProcessingReq = true; while (!fPending.Empty() && (cnt-->0)) { // no any pending requests, nothing to do unsigned portid = fPending.Front(); if (portid>=fReqests.size()) { EOUT("Old requests is not yet removed!!!"); fPending.Pop(); continue; } if (!IsOutputConnected(portid)) { // if port was disconnected, just skip all pending requests fReqests[portid].pending = false; fPending.Pop(); continue; } if (!fReqests[portid].pending) { EOUT("Request %u was not pending", portid); fReqests[portid].pending = true; } if (!CanSend(portid)) { EOUT("Cannot send buffer to output %u", portid); fPending.Pop(); continue; } BufferSize_t sz = fReqests[portid].size; Buffer buf; { LockGuard lock(ObjectMutex()); buf = _TakeBuffer(sz, false, true); if (buf.null()) { fProcessingReq = false; return false; } } // we cannot get buffer, break loop and do not DOUT5("Memory pool %s send buffer size %u to output %u", GetName(), buf.GetTotalSize(), portid); if (CanSend(portid)) { Send(portid, buf); } else { EOUT("Buffer %u is ready, but cannot be add to the queue", buf.GetTotalSize()); buf.Release(); } fPending.Pop(); fReqests[portid].pending = false; if (from_recv && fPending.Empty()) { fProcessingReq = false; return true; } // if there are still requests pending, put it in the queue back if (CanSend(portid)) { fPending.Push(portid); fReqests[portid].pending = true; } } // we need to fire event again while not all requests was processed if (!fPending.Empty() && (cnt<=0)) FireEvent(evntProcessRequests); fProcessingReq = false; return fPending.Empty(); } bool dabc::MemoryPool::CheckChangeCounter(unsigned &cnt) { bool res = cnt!=fChangeCounter; cnt = fChangeCounter; return res; } bool dabc::MemoryPool::Reconstruct(Command cmd) { unsigned buffersize = Cfg(xmlBufferSize, cmd).AsUInt(GetDfltBufSize()); // dabc::SetDebugLevel(2); unsigned numbuffers = Cfg(xmlNumBuffers, cmd).AsUInt(); // dabc::SetDebugLevel(1); unsigned align = Cfg(xmlAlignment, cmd).AsUInt(GetDfltAlignment()); DOUT1("POOL:%s bufsize:%u X num:%u", GetName(), buffersize, numbuffers); if (align) SetAlignment(align); return Allocate(buffersize, numbuffers); } double dabc::MemoryPool::GetUsedRatio() const { LockGuard lock(ObjectMutex()); if (!fMem) return 0.; double sum1 = 0., sum2 = 0.; for(unsigned n=0;nfNumber;n++) { sum1 += fMem->fArr[n].size; if (fMem->fArr[n].refcnt>0) sum2 += fMem->fArr[n].size; } return sum1>0. ? sum2/sum1 : 0.; } uint64_t dabc::MemoryPool::GetTotalSize() const { LockGuard lock(ObjectMutex()); if (!fMem) return 0; uint64_t sum1 = 0; for (unsigned n = 0; n < fMem->fNumber; n++) sum1 += fMem->fArr[n].size; return sum1; } int dabc::MemoryPool::ExecuteCommand(Command cmd) { if (cmd.IsName("CreateNewRequester")) { unsigned portindx = NumOutputs(); std::string name = dabc::format("Output%u", portindx); for (unsigned n=0;n= fReqests.size()) return; if (!on) fReqests[portid].disconn = true; } // ========================================================================= dabc::Reference dabc::MemoryPoolRef::CreateNewRequester() { if (!GetObject()) return dabc::Reference(); dabc::Command cmd("CreateNewRequester"); if (Execute(cmd)==dabc::cmd_true) return FindPort(cmd.GetStr("PortName")); return dabc::Reference(); }