/*
 * Device.txx
 *
 *  Created on: Oct 25, 2012
 *      Author: dklein
 */

#include "Socket.h"

namespace Highway {

// NOTE(review): the reviewed copy of this file had every angle-bracket
// argument list missing (template headers, std::vector element types, the
// duration_cast target). They were reconstructed from how the names are used
// below: numPayloadInputs / numPayloadOutputs are compared against `int`
// loop counters, address vectors are read and written as std::string, socket
// vectors hold Socket*, and type / buffer-size vectors hold int.
// Confirm the template parameter list against the class declaration.

/**
 * Creates a device with default settings.
 *
 * Defaults: empty id, one I/O thread, no context, a log interval of 1000 ms,
 * one (empty) bind address per payload output and one (empty) connect address
 * per payload input. Every output slot defaults to socket type ZMQ_PUB and
 * every input slot to ZMQ_SUB, all with send/receive buffer sizes of 10000.
 */
template <int numPayloadInputs, int numPayloadOutputs>
Device<numPayloadInputs, numPayloadOutputs>::Device() :
    fId(""), fNumIoThreads(1), fPayloadContext(NULL),
    fBindAddress(new std::vector<std::string>(numPayloadOutputs)),
    fConnectAddress(new std::vector<std::string>(numPayloadInputs)),
    fPayloadInputs(new std::vector<Socket *>()),
    fPayloadOutputs(new std::vector<Socket *>()),
    fLogIntervalInMs(1000) {
  fBindSocketType = new std::vector<int>();
  fBindSndBufferSize = new std::vector<int>();
  fBindRcvBufferSize = new std::vector<int>();

  for (int i = 0; i < numPayloadOutputs; ++i) {
    fBindSocketType->push_back(ZMQ_PUB);
    fBindSndBufferSize->push_back(10000);
    fBindRcvBufferSize->push_back(10000);
  }

  fConnectSocketType = new std::vector<int>();
  fConnectSndBufferSize = new std::vector<int>();
  fConnectRcvBufferSize = new std::vector<int>();

  for (int i = 0; i < numPayloadInputs; ++i) {
    fConnectSocketType->push_back(ZMQ_SUB);
    fConnectSndBufferSize->push_back(10000);
    fConnectRcvBufferSize->push_back(10000);
  }
}

/**
 * Creates the payload context and one socket per configured input and output.
 *
 * Each socket gets its slot's buffer sizes applied through the ZMQ_SNDHWM and
 * ZMQ_RCVHWM socket options. Sockets are appended to the input/output lists
 * and the context pointer is overwritten, so this is meant to be called once
 * per device.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Init() {
  Logger::GetInstance()->Log(Logger::INFO, ">>>>>>> Init <<<<<<<");

  std::stringstream logmsg;
  logmsg << "numIoThreads: " << fNumIoThreads;
  Logger::GetInstance()->Log(Logger::INFO, logmsg.str());

  fPayloadContext = new Context(fId, Context::PAYLOAD, fNumIoThreads);

  for (int i = 0; i < numPayloadInputs; ++i) {
    Socket *socket = new Socket(fPayloadContext, fConnectSocketType->at(i), i);
    socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fConnectSndBufferSize->at(i),
        sizeof(fConnectSndBufferSize->at(i)));
    socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fConnectRcvBufferSize->at(i),
        sizeof(fConnectRcvBufferSize->at(i)));
    fPayloadInputs->push_back(socket);
  }

  for (int i = 0; i < numPayloadOutputs; ++i) {
    Socket *socket = new Socket(fPayloadContext, fBindSocketType->at(i), i);
    socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fBindSndBufferSize->at(i),
        sizeof(fBindSndBufferSize->at(i)));
    socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fBindRcvBufferSize->at(i),
        sizeof(fBindRcvBufferSize->at(i)));
    fPayloadOutputs->push_back(socket);
  }
}

/**
 * Binds every output socket to the bind address stored for its slot.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Bind() {
  Logger::GetInstance()->Log(Logger::INFO, ">>>>>>> Bind <<<<<<<");

  int i = 0;
  for (Socket *output : *fPayloadOutputs) {
    try {
      output->Bind(fBindAddress->at(i));
    } catch (const std::out_of_range &) {
      // Best effort: a socket without an address entry for its slot is
      // left unbound and the remaining sockets are still processed.
    }
    ++i;
  }
}

/**
 * Connects every input socket to the connect address stored for its slot.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Connect() {
  Logger::GetInstance()->Log(Logger::INFO, ">>>>>>> Connect <<<<<<<");

  int i = 0;
  for (Socket *input : *fPayloadInputs) {
    try {
      input->Connect(fConnectAddress->at(i));
    } catch (const std::out_of_range &) {
      // Best effort: a socket without an address entry for its slot is
      // left unconnected and the remaining sockets are still processed.
    }
    ++i;
  }
}

/** Does nothing in this implementation. */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Run() {
}

/** Does nothing in this implementation. */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Pause() {
}

/**
 * Closes all input sockets, then all output sockets, then the context.
 *
 * Safe to call before Init(): the socket lists are empty then and the
 * context pointer is still NULL, so nothing is touched.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::Shutdown() {
  for (Socket *input : *fPayloadInputs) {
    input->Close();
  }

  for (Socket *output : *fPayloadOutputs) {
    output->Close();
  }

  // fPayloadContext is only created in Init().
  if (fPayloadContext != NULL) {
    fPayloadContext->Close();
  }
}

/**
 * Logs per-socket message and data rates, forever.
 *
 * Every fLogIntervalInMs milliseconds the byte and message counters of each
 * input socket (Rx) and output socket (Tx) are sampled, the difference to
 * the previous sample is scaled to a per-second rate using the measured
 * elapsed time, and one DEBUG line per socket is written.
 *
 * This function never returns; its loop has no exit condition.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::LogSocketRates() {
  typedef std::vector<long unsigned int> Counters;

  // Previous samples, one entry per socket. Containers instead of raw
  // arrays so the storage is released if anything below throws.
  Counters bytesInput(fPayloadInputs->size());
  Counters messagesInput(fPayloadInputs->size());
  Counters bytesOutput(fPayloadOutputs->size());
  Counters messagesOutput(fPayloadOutputs->size());

  Counters::size_type i = 0;
  for (Socket *socket : *fPayloadInputs) {
    bytesInput[i] = socket->GetBytesRx();
    messagesInput[i] = socket->GetMessagesRx();
    ++i;
  }

  i = 0;
  for (Socket *socket : *fPayloadOutputs) {
    bytesOutput[i] = socket->GetBytesTx();
    messagesOutput[i] = socket->GetMessagesTx();
    ++i;
  }

  // Turns counter deltas into rates and writes the log line for one socket.
  const auto logRates = [](Socket *socket, long unsigned int bytesDelta,
      long unsigned int messagesDelta, double elapsedMs) {
    const double megabytesPerSecond =
        ((double) bytesDelta / (1024. * 1024.)) / elapsedMs * 1000.;
    const double messagesPerSecond = (double) messagesDelta / elapsedMs * 1000.;

    std::stringstream logmsg;
    logmsg << "#" << socket->GetId() << ": " << messagesPerSecond
        << " msg/s, " << megabytesPerSecond << " MB/s";
    Logger::GetInstance()->Log(Logger::DEBUG, logmsg.str());
  };

  std::chrono::high_resolution_clock::time_point t0 =
      std::chrono::high_resolution_clock::now();

  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(fLogIntervalInMs));

    const std::chrono::high_resolution_clock::time_point t1 =
        std::chrono::high_resolution_clock::now();
    const std::chrono::milliseconds timeSinceLastLog =
        std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0);
    const double elapsedMs = (double) timeSinceLastLog.count();

    i = 0;
    for (Socket *socket : *fPayloadInputs) {
      const long unsigned int bytesNow = socket->GetBytesRx();
      const long unsigned int messagesNow = socket->GetMessagesRx();

      logRates(socket, bytesNow - bytesInput[i],
          messagesNow - messagesInput[i], elapsedMs);

      bytesInput[i] = bytesNow;
      messagesInput[i] = messagesNow;
      ++i;
    }

    i = 0;
    for (Socket *socket : *fPayloadOutputs) {
      const long unsigned int bytesNow = socket->GetBytesTx();
      const long unsigned int messagesNow = socket->GetMessagesTx();

      logRates(socket, bytesNow - bytesOutput[i],
          messagesNow - messagesOutput[i], elapsedMs);

      bytesOutput[i] = bytesNow;
      messagesOutput[i] = messagesNow;
      ++i;
    }

    t0 = t1;
  }
}

/**
 * Sets a string-valued property.
 *
 * @param key   Id, BindAddress or ConnectAddress; any other key is forwarded
 *              to Configurable::SetProperty.
 * @param value new value.
 * @param slot  index of the output (BindAddress) or input (ConnectAddress)
 *              the value applies to; ignored for Id.
 * @throws std::out_of_range if slot does not name an existing address entry.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::SetProperty(int key,
    std::string value, int slot/*= 0*/) {
  switch (key) {
  case Id:
    fId = value;
    break;
  case BindAddress:
    // at() is bounds-checked, unlike iterator arithmetic on begin().
    fBindAddress->at(slot) = value;
    break;
  case ConnectAddress:
    fConnectAddress->at(slot) = value;
    break;
  default:
    Configurable::SetProperty(key, value, slot);
    break;
  }
}

/**
 * Reads a string-valued property.
 *
 * @param key      Id, BindAddress or ConnectAddress; any other key is
 *                 forwarded to Configurable::GetProperty.
 * @param default_ only used by the forwarded call.
 * @param slot     index of the address entry to read; ignored for Id.
 * @return the stored value.
 * @throws std::out_of_range if slot does not name an existing address entry.
 */
template <int numPayloadInputs, int numPayloadOutputs>
std::string Device<numPayloadInputs, numPayloadOutputs>::GetProperty(int key,
    std::string default_/*= ""*/, int slot/*= 0*/) {
  switch (key) {
  case Id:
    return fId;
  case BindAddress:
    return fBindAddress->at(slot);
  case ConnectAddress:
    return fConnectAddress->at(slot);
  default:
    return Configurable::GetProperty(key, default_, slot);
  }
}

/**
 * Sets an integer-valued property.
 *
 * @param key   NumIoThreads, LogIntervalInMs, or one of the per-slot keys
 *              (Bind/Connect SocketType, SndBufferSize, RcvBufferSize); any
 *              other key is forwarded to Configurable::SetProperty.
 * @param value new value.
 * @param slot  index of the input/output the value applies to; ignored for
 *              NumIoThreads and LogIntervalInMs.
 * @throws std::out_of_range if slot does not name an existing entry.
 */
template <int numPayloadInputs, int numPayloadOutputs>
void Device<numPayloadInputs, numPayloadOutputs>::SetProperty(int key,
    int value, int slot/*= 0*/) {
  switch (key) {
  case NumIoThreads:
    fNumIoThreads = value;
    break;
  case LogIntervalInMs:
    fLogIntervalInMs = value;
    break;
  case BindSocketType:
    fBindSocketType->at(slot) = value;
    break;
  case ConnectSocketType:
    fConnectSocketType->at(slot) = value;
    break;
  case BindSndBufferSize:
    fBindSndBufferSize->at(slot) = value;
    break;
  case BindRcvBufferSize:
    fBindRcvBufferSize->at(slot) = value;
    break;
  case ConnectSndBufferSize:
    fConnectSndBufferSize->at(slot) = value;
    break;
  case ConnectRcvBufferSize:
    fConnectRcvBufferSize->at(slot) = value;
    break;
  default:
    Configurable::SetProperty(key, value, slot);
    break;
  }
}

/**
 * Reads an integer-valued property.
 *
 * @param key      NumIoThreads, LogIntervalInMs, or one of the per-slot keys;
 *                 any other key is forwarded to Configurable::GetProperty.
 * @param default_ only used by the forwarded call.
 * @param slot     index of the entry to read; ignored for NumIoThreads and
 *                 LogIntervalInMs.
 * @return the stored value.
 * @throws std::out_of_range if slot does not name an existing entry.
 */
template <int numPayloadInputs, int numPayloadOutputs>
int Device<numPayloadInputs, numPayloadOutputs>::GetProperty(int key,
    int default_/*= 0*/, int slot/*= 0*/) {
  switch (key) {
  case NumIoThreads:
    return fNumIoThreads;
  case LogIntervalInMs:
    return fLogIntervalInMs;
  case BindSocketType:
    return fBindSocketType->at(slot);
  case ConnectSocketType:
    return fConnectSocketType->at(slot);
  case ConnectSndBufferSize:
    return fConnectSndBufferSize->at(slot);
  case ConnectRcvBufferSize:
    return fConnectRcvBufferSize->at(slot);
  case BindSndBufferSize:
    return fBindSndBufferSize->at(slot);
  case BindRcvBufferSize:
    return fBindRcvBufferSize->at(slot);
  default:
    return Configurable::GetProperty(key, default_, slot);
  }
}

/**
 * Destroys all sockets and releases every container allocated by the
 * constructor.
 */
template <int numPayloadInputs, int numPayloadOutputs>
Device<numPayloadInputs, numPayloadOutputs>::~Device() {
  for (Socket *input : *fPayloadInputs) {
    delete input;
  }

  for (Socket *output : *fPayloadOutputs) {
    delete output;
  }

  delete fBindAddress;
  delete fConnectAddress;
  delete fPayloadInputs;
  delete fPayloadOutputs;

  // The per-slot configuration vectors are allocated in the constructor as
  // well and were previously never released.
  delete fBindSocketType;
  delete fBindSndBufferSize;
  delete fBindRcvBufferSize;
  delete fConnectSocketType;
  delete fConnectSndBufferSize;
  delete fConnectRcvBufferSize;

  // NOTE(review): fPayloadContext is allocated in Init() and not freed
  // anywhere in this file. Confirm what Context's destructor does (it is
  // not visible here) before adding a delete.
}

} /* namespace Highway */