/************************************************************
* The Data Acquisition Backbone Core (DABC) *
************************************************************
* Copyright (C) 2009 - *
* GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Planckstr. 1, 64291 Darmstadt, Germany *
* Contact: http://dabc.gsi.de *
************************************************************
* This software can be used under the GPL license *
* agreements as stated in LICENSE.txt file *
* which is part of the distribution. *
************************************************************/
// TODO: make an example with IB MCast (multicast).
// One could use a global ID like ... (NOTE: the example value appears to be missing here)
//
//
#include "dabc/logging.h"
#include "dabc/timing.h"
#include "dabc/ModuleAsync.h"
#include "dabc/Manager.h"
#include "dabc/Factory.h"
#include "dabc/Url.h"
#include "dabc/Configuration.h"
// Asynchronous module (derived from dabc::ModuleAsync) declared as the sender
// side of the net-test example.
//
// NOTE(review): this class body looks damaged in this copy of the file and is
// not expected to compile as shown:
//  - the constructor ends with `return true;`, which is not valid in a constructor;
//  - IsCmdTest() is called, but no declaration of it is visible in this class;
//  - the commented-out `for` loop below is truncated in the middle of its condition;
//  - the start/stop hooks log "ReceiverModule", and the factory further down
//    instantiates a NetTestReceiverModule that is not defined in the visible text.
// Presumably the text between the commented-out loop and `return true;` (the rest
// of this class and the beginning of NetTestReceiverModule) was lost.
// TODO: restore it from the upstream source before changing any logic here.
class NetTestSenderModule : public dabc::ModuleAsync {
protected:
// Value of the "Kind" configuration entry, read in the constructor.
std::string fKind;
// Initialised from dabc::mgr.NodeId(); not used elsewhere in the visible code.
unsigned fSendCnt;
// Initialised from dabc::mgr.NodeId(); only reported in the constructor's debug message here.
unsigned fIgnoreNode;
public:
// Constructs the module with the given name and creation command.
// Reads the "Kind" setting via Cfg("Kind", cmd), seeds fSendCnt and fIgnoreNode
// from dabc::mgr.NodeId() and, when IsCmdTest() returns true, creates the
// "CmdExeTime" parameter and calls SetAverage(false, 2) on it.
NetTestSenderModule(const std::string &name, dabc::Command cmd) :
dabc::ModuleAsync(name, cmd)
{
fKind = Cfg("Kind", cmd).AsStr();
fSendCnt = dabc::mgr.NodeId();
fIgnoreNode = dabc::mgr.NodeId();
// NOTE(review): IsCmdTest() is not declared in the visible part of this class.
if (IsCmdTest())
CreatePar("CmdExeTime").SetAverage(false, 2);
DOUT2("new NetTestSenderModule %s numout = %d ignore %u done", GetName(), NumOutputs(), fIgnoreNode);
// NOTE(review): the commented-out code below is truncated; its original form is unknown.
// for(unsigned n=0;n0) {
// WorkerSleep(fSleepTime);
// }
// NOTE(review): a constructor cannot return a value; this statement presumably
// belonged to a bool-returning method whose beginning is missing. TODO confirm.
return true;
}
// Logs "ReceiverModule starting" via DOUT2.
// NOTE(review): the message suggests this hook belonged to the receiver module.
void BeforeModuleStart() override
{
DOUT2("ReceiverModule starting");
}
// Logs "ReceiverModule finish" via DOUT2.
// NOTE(review): the message suggests this hook belonged to the receiver module.
void AfterModuleStop() override
{
DOUT2("ReceiverModule finish");
}
};
class NetTestSpecialModule : public dabc::ModuleAsync {
protected:
bool fReceiver;
int fCnt;
public:
NetTestSpecialModule(const std::string &name, dabc::Command cmd) :
dabc::ModuleAsync(name, cmd)
{
fReceiver = dabc::mgr.NodeId() < dabc::mgr.NumNodes() - 1;
EnsurePorts(fReceiver ? 1 : 0, fReceiver ? 0 : 1);
fCnt = 0;
}
bool ProcessRecv(unsigned port) override
{
dabc::Buffer buf = Recv(port);
if (buf.null()) { EOUT("no buffer is received"); return false; }
buf.Release();
fCnt++;
return true;
}
bool ProcessSend(unsigned port) override
{
dabc::Buffer buf = TakeBuffer();
if (buf.null()) { EOUT("no free buffer"); return false; }
// buf.SetTotalSize(1000);
Send(port, buf);
fCnt++;
return true;
}
void AfterModuleStop() override
{
DOUT1("Total num buffers %d", fCnt);
}
};
/** Factory that creates the net-test modules by class name. */
class NetTestFactory : public dabc::Factory {
public:
   /** Forward the factory name to the dabc::Factory base class. */
   NetTestFactory(const std::string &name) : dabc::Factory(name) {}

   /** Create the module that matches \a classname.
    *
    * Recognised names are "NetTestSenderModule", "NetTestReceiverModule" and
    * "NetTestSpecialModule"; each is constructed with \a modulename and \a cmd.
    * \returns the newly allocated module, or nullptr for any other class name */
   dabc::Module *CreateModule(const std::string &classname, const std::string &modulename, dabc::Command cmd) override
   {
      dabc::Module *created = nullptr;

      if (classname == "NetTestSenderModule")
         created = new NetTestSenderModule(modulename, cmd);
      else if (classname == "NetTestReceiverModule")
         created = new NetTestReceiverModule(modulename, cmd);
      else if (classname == "NetTestSpecialModule")
         created = new NetTestSpecialModule(modulename, cmd);

      return created;
   }
};
// File-scope plugin object constructed from a heap-allocated NetTestFactory named "net-test".
// NOTE(review): presumably this is what makes the factory known to DABC when this
// file is loaded, and dabc::FactoryPlugin presumably takes over the new-ed pointer -
// confirm against dabc/Factory.h.
dabc::FactoryPlugin nettest(new NetTestFactory("net-test"));