/* * Socket.cxx * * Created on: Dec 5, 2012 * Author: dklein */ #include "Socket.h" #include #include "Logger.h" namespace Highway { const std::string Socket::TCP = "tcp://"; const std::string Socket::IPC = "ipc://"; const std::string Socket::INPROC = "inproc://"; Socket::Socket(Context* context, int type, int num) : fBytesTx(0), fBytesRx(0), fMessagesTx(0), fMessagesRx(0) { std::stringstream id; id << context->GetId() << "." << GetTypeString(type) << "." << num; fId = id.str(); fSocket = new zmq::socket_t(*context->GetContext(), type); fSocket->setsockopt(ZMQ_IDENTITY, &fId, fId.length()); if (type == ZMQ_SUB) { fSocket->setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } std::stringstream logmsg; logmsg << "created socket #" << fId; Logger::GetInstance()->Log(Logger::INFO, logmsg.str()); } Socket::~Socket() { } std::string Socket::GetId() { return fId; } std::string Socket::GetTypeString(int type) { switch (type) { case ZMQ_SUB: return "sub"; case ZMQ_PUB: return "pub"; case ZMQ_PUSH: return "push"; case ZMQ_PULL: return "pull"; default: return ""; } } bool Socket::Bind(std::string address) { bool result = true; try { if (!address.empty()) { std::stringstream logmsg; logmsg << "bind socket #" << fId << " on " << address; Logger::GetInstance()->Log(Logger::INFO, logmsg.str()); fSocket->bind(address.c_str()); } } catch (zmq::error_t& e) { std::stringstream logmsg2; logmsg2 << "failed binding socket #" << fId << ", reason: " << e.what(); Logger::GetInstance()->Log(Logger::ERROR, logmsg2.str()); result = false; } return result; } bool Socket::Connect(std::string address) { bool result = true; try { if (!address.empty()) { std::stringstream logmsg; logmsg << "connect socket #" << fId << " to " << address; Logger::GetInstance()->Log(Logger::INFO, logmsg.str()); fSocket->connect(address.c_str()); } } catch (zmq::error_t& e) { std::stringstream logmsg2; logmsg2 << "failed connecting socket #" << fId << ", reason: " << e.what(); Logger::GetInstance()->Log(Logger::ERROR, logmsg2.str()); result = false; } return result; } bool Socket::Send(Message* msg) { bool result = false; try { fBytesTx += msg->Size(); ++fMessagesTx; result = fSocket->send(*msg->GetMessage()); } catch (zmq::error_t& e) { std::stringstream logmsg; logmsg << "failed sending on socket #" << fId << ", reason: " << e.what(); Logger::GetInstance()->Log(Logger::ERROR, logmsg.str()); result = false; } return result; } bool Socket::Receive(Message* msg) { bool result = false; try { result = fSocket->recv(msg->GetMessage()); fBytesRx += msg->Size(); ++fMessagesRx; } catch (zmq::error_t& e) { std::stringstream logmsg; logmsg << "failed receiving on socket #" << fId << ", reason: " << e.what(); Logger::GetInstance()->Log(Logger::ERROR, logmsg.str()); result = false; } return result; } void Socket::Close() { fSocket->close(); } zmq::socket_t* Socket::GetSocket() { return fSocket; } long unsigned int Socket::GetBytesTx() { return fBytesTx; } long unsigned int Socket::GetBytesRx() { return fBytesRx; } long unsigned int Socket::GetMessagesTx() { return fMessagesTx; } long unsigned int Socket::GetMessagesRx() { return fMessagesRx; } } /* namespace Highway */