// ======================================================================================
// NOTE(review): documentation-only header. Every original character below is unchanged.
//
// State of this copy
//   * The whole translation unit is collapsed onto a few physical lines, so each "//"
//     line comment now swallows the code that follows it on the same physical line.
//   * Text that stood between angle brackets appears to be missing in several places
//     (a bare #include, a dynamic_cast without target type, truncated loop headers in
//     _FindRec and CleanupRecs, most of NewConnectRec, the head of the
//     SocketProtocolAddon class, the beginning of HandleManagerConnectionRequest).
//     Restore the file from the upstream repository before changing any code here.
//
// What the visible code does
//   NewConnectRec (partly lost): per-connection bookkeeping - request item name, client
//     addon, protocol addon, remaining timeout, connection id; fLocalCmd, ConnId() and
//     IsConnId() are used further down but their declarations are not visible.
//
//   Addon callbacks (presumably members of SocketProtocolAddon - class head is lost):
//     OnSocketError     - calls FinishWork(false).
//     OnThreadAssigned  - stServerProto: start receiving ProtocolMsgSize bytes into fInBuf.
//                         stClientProto: put headerConnect and the record's connection id
//                         into fOutBuf, preset fInBuf to "denied", start send and receive.
//                         stRedirect: pass fInBuf to SocketDevice::ServerProtocolRequest
//                         and send the produced reply.  Any other state: FinishWork(false).
//                         NOTE(review): both strncpy calls stop one byte before the end of
//                         the buffer, so the last byte of fOutBuf/fInBuf is never written
//                         here - confirm the buffers are initialised elsewhere.
//     OnSendCompleted   - server/redirect: SocketDevice::ProtocolCompleted(this, nullptr),
//                         DeleteWorker() when it returns true; client: nothing, the reply
//                         is awaited.
//     OnRecvCompleted   - server: build the reply via ServerProtocolRequest and send it;
//                         client: ProtocolCompleted(this, fInBuf), then DeleteWorker().
//
//   SocketDevice::SocketDevice - reads "host" and "port" (default -1) via Cfg(). When
//     neither is given it executes "RedirectSocketConnect" on the manager's command
//     channel and stores the returned "ServerId" in fCmdChannelId. If no channel id was
//     obtained and no host is set, dabc::Configuration::GetLocalHost() is used.
//   SocketDevice::~SocketDevice - CleanupRecs(-1), then destroys every addon that is still
//     listed in fProtocols.
//   StartServerAddon - returns fCmdChannelId when set; otherwise reuses the assigned addon
//     or creates one with SocketThread::CreateServerAddon (port range 7000..9000 unless
//     fBindPort > 0, in which case both range limits are 0) and returns its ServerId().
//     NOTE(review): serv->ServerId() is logged BEFORE the "if (!serv)" check; if
//     CreateServerAddon can return nullptr (the later check suggests it can) this is a
//     null dereference - move the DOUT0 after the check.
//   AddRec - appends the record under DeviceMutex(); the first record triggers
//     ActivateTimeout(0.).
//   DestroyRec - complains if fClient/fProtocol are still set, replies fLocalCmd with the
//     result and deletes the record; a null record is ignored.
//   _FindRec (damaged) - returns the record whose IsConnId(connid) is true, else nullptr.
//     All visible callers hold DeviceMutex() while calling it.
//   CleanupRecs (damaged) - under DeviceMutex() subtracts tmout from each record's fTmOut
//     and moves records to a deletion list when fTmOut < 0 or tmout < 0; more_timeout is
//     set when records remain. The rest of the function is not visible.
//   HandleManagerConnectionRequest (mostly lost) - visible tail: configures the client
//     addon (5 retries, connection timeout, connection id), registers a NewConnectRec,
//     replies to the remote command, and returns cmd_postponed after storing the command
//     in the record, or cmd_false when no record was created.
//   ExecuteCommand - dispatch by command name:
//       CmdConnectionManagerHandle::CmdName() : HandleManagerConnectionRequest
//       "SocketConnect"   Type "Server": new protocol addon for fd, added to fProtocols
//                         Type "Client": attach a protocol addon to the record found by
//                                        ConnId; unknown id closes fd and returns cmd_false
//                         Type "Error" : drop and destroy the record found by ConnId
//                         other Type   : cmd_false
//       "RedirectConnect" : protocol addon in redirect mode, fed with the raw data segment
//       "EnableDebug"     : sets fDebugMode
//       anything else     : dabc::Device::ExecuteCommand
//     NOTE(review): for "Server" and "RedirectConnect" the addon is handed to the thread
//     before it is stored in fProtocols; confirm the worker cannot finish (and be removed
//     from fProtocols) before the emplace_back runs.
//   ServerProtocolRequest - writes "denied" into outmsg, checks the leading 32-bit word
//     against headerConnect, looks up the record by the string that follows; on a match
//     writes "accepted", removes proc from fProtocols and links proc and record.
//     NOTE(review): the header is read through a pointer cast, the lookup string is not
//     checked for termination inside the message buffer, and rec is used after the mutex
//     was released and taken again - verify against lifetime rules of the records.
//   ProtocolCompleted - always returns true. Without a registered record it only removes
//     proc from fProtocols. Otherwise success means inmsg is null or equals "accepted";
//     on success the socket is taken from proc and a NetworkTransport is made for the
//     request found via rec->fReqItem. Finally RemoveProtocolAddon(proc, res).
//     NOTE(review): strcmp runs on the raw receive buffer - termination is assumed.
//   RemoveProtocolAddon - under DeviceMutex() unregisters the record (clearing its
//     fProtocol) or, when there is no record, removes proc from fProtocols; then
//     DestroyRec(rec, res).
// ======================================================================================
// $Id$ /************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ #include "dabc/SocketDevice.h" #include #include "dabc/SocketTransport.h" #include "dabc/Manager.h" #include "dabc/Configuration.h" #include "dabc/ConnectionManager.h" #define SocketServerTmout 0.2 namespace dabc { class SocketProtocolAddon; // TODO: Can we use here connection manager name - all information already there class NewConnectRec { public: std::string fReqItem; ///< reference in connection request SocketClientAddon* fClient{nullptr}; ///< client-side processor, to establish connection SocketProtocolAddon* fProtocol{nullptr}; ///< protocol processor, to verify connection id double fTmOut{0}; ///< used by device to process connection timeouts std::string fConnId; ///RemoveProtocolAddon(this, res); DeleteWorker(); } void OnSocketError(int, const std::string&) override { FinishWork(false); } void OnThreadAssigned() override { dabc::SocketIOAddon::OnThreadAssigned(); uint32_t header = SocketDevice::headerConnect; switch (fState) { case stServerProto: StartRecv(fInBuf, SocketDevice::ProtocolMsgSize); break; case stClientProto: // we can start both send and recv operations simultaneously, // while buffer will be received only when server answer on request memcpy(fOutBuf, &header, sizeof(header)); strncpy(fOutBuf + sizeof(header), fRec->ConnId(), sizeof(fOutBuf) - sizeof(header) - 1); strncpy(fInBuf, "denied", sizeof(fInBuf)-1); StartSend(fOutBuf, SocketDevice::ProtocolMsgSize); 
StartRecv(fInBuf, SocketDevice::ProtocolMsgSize); break; case stRedirect: // do like we receive input buffer ourself fDevice->ServerProtocolRequest(this, fInBuf, fOutBuf); StartSend(fOutBuf, SocketDevice::ProtocolMsgSize); break; default: EOUT("Wrong state %d", fState); FinishWork(false); } } void OnSendCompleted() override { switch (fState) { case stServerProto: case stRedirect: // DOUT5("Server job finished"); if (fDevice->ProtocolCompleted(this, nullptr)) DeleteWorker(); break; case stClientProto: // DOUT5("Client send request, wait reply"); break; default: EOUT("Wrong state %d", fState); FinishWork(false); } } void OnRecvCompleted() override { switch (fState) { case stServerProto: fDevice->ServerProtocolRequest(this, fInBuf, fOutBuf); StartSend(fOutBuf, SocketDevice::ProtocolMsgSize); break; case stClientProto: DOUT5("Client job finished"); if (fDevice->ProtocolCompleted(this, fInBuf)) DeleteWorker(); break; default: EOUT("Wrong state %d", fState); FinishWork(false); } } }; } // ______________________________________________________________________ dabc::SocketDevice::SocketDevice(const std::string &name, Command cmd1) : dabc::Device(name), fConnRecs(), fProtocols(), fConnCounter(0), fCmdChannelId(), fDebugMode(false) { fBindHost = Cfg("host", cmd1).AsStr(); fBindPort = Cfg("port", cmd1).AsInt(-1); if (fBindHost.empty() && (fBindPort < 0)) { dabc::WorkerRef chl = dabc::mgr.GetCommandChannel(); if (!chl.null()) { dabc::Command cmd2("RedirectSocketConnect"); cmd2.SetStr("Device", ItemName()); if (chl.Execute(cmd2)) fCmdChannelId = cmd2.GetStr("ServerId"); } if (!fCmdChannelId.empty()) DOUT0("Socket device %s reuses %s for connections", GetName(), fCmdChannelId.c_str()); } if (fCmdChannelId.empty() && fBindHost.empty()) fBindHost = dabc::Configuration::GetLocalHost(); } dabc::SocketDevice::~SocketDevice() { // FIXME: cleanup should be done much earlier CleanupRecs(-1); while (fProtocols.size() > 0) { SocketProtocolAddon* pr = (SocketProtocolAddon*) fProtocols[0]; 
fProtocols.remove_at(0); dabc::Object::Destroy(pr); } } std::string dabc::SocketDevice::StartServerAddon() { // in standard case use server socket of command channel if (!fCmdChannelId.empty()) return fCmdChannelId; SocketServerAddon* serv = dynamic_cast (fAddon()); if (!serv) { int port0 = fBindPort, portmin(7000), portmax(9000); if (port0 > 0) portmin = portmax = 0; serv = dabc::SocketThread::CreateServerAddon(fBindHost, port0, portmin, portmax); DOUT0("SocketDevice creates server with ID %s", serv->ServerId().c_str()); AssignAddon(serv); } if (!serv) return std::string(); return serv->ServerId(); } void dabc::SocketDevice::AddRec(NewConnectRec* rec) { if (!rec) return; bool firetmout = false; { LockGuard guard(DeviceMutex()); fConnRecs.emplace_back(rec); firetmout = (fConnRecs.size() == 1); } if (firetmout) ActivateTimeout(0.); } void dabc::SocketDevice::DestroyRec(NewConnectRec* rec, bool res) { if (!rec) return; if (rec->fClient) EOUT("Is client %p is destroyed?", rec->fClient); if (rec->fProtocol) EOUT("Is protocol %p is destroyed?", rec->fProtocol); rec->fLocalCmd.ReplyBool(res); delete rec; } dabc::NewConnectRec* dabc::SocketDevice::_FindRec(const char *connid) { for (unsigned n=0; nIsConnId(connid)) return rec; } return nullptr; } bool dabc::SocketDevice::CleanupRecs(double tmout) { PointersVector del_recs; bool more_timeout = false; { LockGuard guard(DeviceMutex()); unsigned n = 0; while (nfTmOut -= tmout; if ((rec->fTmOut<0) || (tmout<0)) { if (tmout>0) EOUT("Record %u timedout", n); fConnRecs.remove_at(n); del_recs.emplace_back(rec); } else n++; } more_timeout = fConnRecs.size() > 0; } for (unsigned n=0;nSocket(), req.GetServerId().c_str()); // try to make little bit faster than timeout expire why we need // some time for the connection protocol client->SetRetryOpt(5, req.GetConnTimeout()); client->SetConnHandler(this, req.GetConnId()); rec = new NewConnectRec(reqitem, req, client); AddRec(rec); thread().MakeWorkerFor(client); } else { if (fDebugMode) 
DOUT0("scktdev: FAIL to create client for server %s", req.GetServerId().c_str()); } } // reply remote command that one other side can start connection req.ReplyRemoteCommand(rec!=nullptr); if (!rec) return cmd_false; rec->fLocalCmd = cmd; return cmd_postponed; } default: EOUT("Request from connection manager in undefined situation progress = %d ???", req.progress()); break; } return cmd_true; } int dabc::SocketDevice::ExecuteCommand(Command cmd) { int cmd_res = cmd_true; if (cmd.IsName(CmdConnectionManagerHandle::CmdName())) { cmd_res = HandleManagerConnectionRequest(cmd); } else if (cmd.IsName("SocketConnect")) { std::string typ = cmd.GetStr("Type"); std::string connid = cmd.GetStr("ConnId"); int fd = cmd.GetInt("fd", -1); if (typ == "Server") { DOUT2("SocketDevice:: create server protocol for socket %d connid %s", fd, connid.c_str()); auto proto = new SocketProtocolAddon(fd, this, nullptr); thread().MakeWorkerFor(proto, connid); LockGuard guard(DeviceMutex()); fProtocols.emplace_back(proto); } else if (typ == "Client") { SocketProtocolAddon* proto = nullptr; { LockGuard guard(DeviceMutex()); NewConnectRec* rec = _FindRec(connid.c_str()); if (!rec) { EOUT("Client connected for not exiting rec %s", connid.c_str()); close(fd); cmd_res = cmd_false; } else { DOUT2("SocketDevice:: create client protocol for socket %d connid:%s", fd, connid.c_str()); proto = new SocketProtocolAddon(fd, this, rec); rec->fClient = nullptr; // if we get command, client is destroyed rec->fProtocol = proto; } } if (proto) thread().MakeWorkerFor(proto, connid); } else if (typ == "Error") { NewConnectRec* rec = nullptr; { LockGuard guard(DeviceMutex()); rec = _FindRec(connid.c_str()); if (!rec) { EOUT("Client error for not existing rec %s", connid.c_str()); cmd_res = cmd_false; } else { EOUT("Client error for connid %s", connid.c_str()); rec->fClient = nullptr; // if we get command, client is destroyed fConnRecs.remove(rec); } } if (rec) DestroyRec(rec, false); } else cmd_res = cmd_false; } 
else if (cmd.IsName("RedirectConnect")) { int fd = cmd.GetInt("Socket"); Buffer buf = cmd.GetRawData(); if (fDebugMode) DOUT0("scktdev: handle redirected socket %d", fd); auto proto = new SocketProtocolAddon(fd, this, nullptr, buf.SegmentPtr()); thread().MakeWorkerFor(proto, fCmdChannelId); LockGuard guard(DeviceMutex()); fProtocols.emplace_back(proto); cmd_res = cmd_true; } else if (cmd.IsName("EnableDebug")) { fDebugMode = true; } else { cmd_res = dabc::Device::ExecuteCommand(cmd); } return cmd_res; } void dabc::SocketDevice::ServerProtocolRequest(SocketProtocolAddon *proc, const char *inmsg, char *outmsg) { strcpy(outmsg, "denied"); uint32_t *header = (uint32_t*) inmsg; if (*header != SocketDevice::headerConnect) { EOUT("Wrong header identifier in the SOCKET connect"); return; } NewConnectRec* rec = nullptr; { LockGuard guard(DeviceMutex()); rec = _FindRec(inmsg+sizeof(uint32_t)); if (!rec) return; } strcpy(outmsg, "accepted"); if (fDebugMode) DOUT0("scktdev: sending accept message via socket %d", proc->Socket()); LockGuard guard(DeviceMutex()); fProtocols.remove(proc); rec->fProtocol = proc; proc->fRec = rec; } bool dabc::SocketDevice::ProtocolCompleted(SocketProtocolAddon* proc, const char *inmsg) { NewConnectRec* rec = proc->fRec; bool destr = false; { LockGuard guard(DeviceMutex()); if (!rec || !fConnRecs.has_ptr(rec)) { EOUT("Protocol completed without rec %p", rec); fProtocols.remove(proc); destr = true; } } if (destr) return true; bool res = true; if (inmsg) res = (strcmp(inmsg, "accepted") == 0); if (inmsg) DOUT3("Reply from server: %s", inmsg); if (fDebugMode && inmsg) DOUT0("scktdev: receive message %s via socket %d", inmsg, proc->Socket()); if (res) { // create transport for the established connection int fd = proc->TakeSocket(); ConnectionRequestFull req = dabc::mgr.FindPar(rec->fReqItem); auto addon = new SocketNetworkInetrface(fd); res = dabc::NetworkTransport::Make(req, addon, ThreadName()); DOUT0("Create socket transport for fd %d res %s", fd, 
DBOOL(res)); } RemoveProtocolAddon(proc, res); return true; } void dabc::SocketDevice::RemoveProtocolAddon(SocketProtocolAddon* proc, bool res) { if (!proc) return; NewConnectRec* rec = proc->fRec; { LockGuard guard(DeviceMutex()); if (rec) { fConnRecs.remove(rec); rec->fProtocol = nullptr; } else fProtocols.remove(proc); } DestroyRec(rec, res); }