/* * StandaloneMerger.cxx * * Created on: Dec 6, 2012 * Author: dklein */ #include "StandaloneMerger.h" #include #include namespace Highway { StandaloneMerger::StandaloneMerger() { } StandaloneMerger::~StandaloneMerger() { } void StandaloneMerger::Run() { Logger::GetInstance()->Log(Logger::INFO, ">>>>>>> Run <<<<<<<"); bool received0 = false; bool received1 = false; Message* msg0 = NULL; Message* msg1 = NULL; std::string source0 = fPayloadInputs->at(0)->GetId(); std::string source1 = fPayloadInputs->at(1)->GetId(); int size0 = 0; int size1 = 0; // Initialize poll set zmq_pollitem_t items[] = { { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }, { *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 } }; std::thread logger([&]() { DEVICE::LogSocketRates(); }); while (true) { msg0 = new Message(); msg1 = new Message(); zmq_poll(items, 2, -1); if (items[0].revents & ZMQ_POLLIN) { received0 = fPayloadInputs->at(0)->Receive(msg0); } if (items[1].revents & ZMQ_POLLIN) { received1 = fPayloadInputs->at(1)->Receive(msg1); } if (received0) { size0 = msg0->Size(); fPayloadOutputs->at(0)->Send(msg0); received0 = false; } if (received1) { size1 = msg1->Size(); fPayloadOutputs->at(0)->Send(msg1); received1 = false; } delete msg0; delete msg1; } logger.join(); } } /* namespace Highway */