#include #include "FairMQSocket.h" #include "FairMQLogger.h" FairMQSocket::FairMQSocket(int type, int num) : fBytesTx(0), fBytesRx(0), fMessagesTx(0), fMessagesRx(0) { std::stringstream id; id << GetTypeString(type) << "." << num; fId = id.str(); fSocket = nn_socket (AF_SP, type); if (type == NN_SUB) { nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); } std::stringstream logmsg; logmsg << "created socket #" << fId; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); } FairMQSocket::~FairMQSocket() { Close(); } TString FairMQSocket::GetId() { return fId; } TString FairMQSocket::GetTypeString(int type) { switch (type) { case NN_SUB: return "sub"; case NN_PUB: return "pub"; case NN_PUSH: return "push"; case NN_PULL: return "pull"; default: return ""; } } int FairMQSocket::Bind(TString address) { std::stringstream logmsg; logmsg << "bind socket #" << fId << " on " << address; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int eid = nn_bind(fSocket, address.Data()); if (eid < 0) { std::stringstream logmsg2; logmsg2 << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } return eid; } int FairMQSocket::Connect(TString address) { std::stringstream logmsg; logmsg << "connect socket #" << fId << " to " << address; FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int eid = nn_connect(fSocket, address.Data()); if (eid < 0) { std::stringstream logmsg2; logmsg2 << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); } return eid; } int FairMQSocket::Receive(FairMQMessage* msg) { void* ptr = msg->GetMessage(); int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); if (rc < 0) { std::stringstream logmsg; logmsg << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } else { fBytesRx += rc; ++fMessagesRx; msg->SetMessage(ptr, rc); } return rc; } int FairMQSocket::Send(FairMQMessage* msg) { void* ptr = msg->GetMessage(); int rc = nn_send(fSocket, &ptr, NN_MSG, 0); if (rc < 0) { std::stringstream logmsg; logmsg << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } else { fBytesTx += rc; ++fMessagesTx; } return rc; } void FairMQSocket::SetOption(int option, const void* value, size_t valueSize) { int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, option, value, valueSize); if (rc < 0) { std::stringstream logmsg; logmsg << "failed setting socket option, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } } void FairMQSocket::Close() { nn_close(fSocket); } int FairMQSocket::GetSocket() { return fSocket; } ULong_t FairMQSocket::GetBytesTx() { return fBytesTx; } ULong_t FairMQSocket::GetBytesRx() { return fBytesRx; } ULong_t FairMQSocket::GetMessagesTx() { return fMessagesTx; } ULong_t FairMQSocket::GetMessagesRx() { return fMessagesRx; }