/************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ // TODO: make example with IB MCast // one could use global ID like // // #include "dabc/logging.h" #include "dabc/timing.h" #include "dabc/ModuleAsync.h" #include "dabc/Manager.h" #include "dabc/Factory.h" #include "dabc/Url.h" #include "dabc/Configuration.h" class NetTestSenderModule : public dabc::ModuleAsync { protected: std::string fKind; unsigned fSendCnt; unsigned fIgnoreNode; public: NetTestSenderModule(const std::string &name, dabc::Command cmd) : dabc::ModuleAsync(name, cmd) { fKind = Cfg("Kind", cmd).AsStr(); fSendCnt = dabc::mgr.NodeId(); fIgnoreNode = dabc::mgr.NodeId(); if (IsCmdTest()) CreatePar("CmdExeTime").SetAverage(false, 2); DOUT2("new NetTestSenderModule %s numout = %d ignore %u done", GetName(), NumOutputs(), fIgnoreNode); // for(unsigned n=0;n0) { // WorkerSleep(fSleepTime); // } return true; } void BeforeModuleStart() override { DOUT2("ReceiverModule starting"); } void AfterModuleStop() override { DOUT2("ReceiverModule finish"); } }; class NetTestSpecialModule : public dabc::ModuleAsync { protected: bool fReceiver; int fCnt; public: NetTestSpecialModule(const std::string &name, dabc::Command cmd) : dabc::ModuleAsync(name, cmd) { fReceiver = dabc::mgr.NodeId() < dabc::mgr.NumNodes() - 1; EnsurePorts(fReceiver ? 1 : 0, fReceiver ? 0 : 1); fCnt = 0; } bool ProcessRecv(unsigned port) override { dabc::Buffer buf = Recv(port); if (buf.null()) { EOUT("no buffer is received"); return false; } buf.Release(); fCnt++; return true; } bool ProcessSend(unsigned port) override { dabc::Buffer buf = TakeBuffer(); if (buf.null()) { EOUT("no free buffer"); return false; } // buf.SetTotalSize(1000); Send(port, buf); fCnt++; return true; } void AfterModuleStop() override { DOUT1("Total num buffers %d", fCnt); } }; class NetTestFactory : public dabc::Factory { public: NetTestFactory(const std::string &name) : dabc::Factory(name) {} dabc::Module *CreateModule(const std::string &classname, const std::string &modulename, dabc::Command cmd) override { if (classname == "NetTestSenderModule") return new NetTestSenderModule(modulename, cmd); if (classname == "NetTestReceiverModule") return new NetTestReceiverModule(modulename, cmd); if (classname == "NetTestSpecialModule") return new NetTestSpecialModule(modulename, cmd); return nullptr; } }; dabc::FactoryPlugin nettest(new NetTestFactory("net-test"));