// $Id$ /************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ #include "dabc/SocketThread.h" #include /* See NOTES */ #include #include #include #include #include #include #include #include #include #include "dabc/Configuration.h" #if defined(__MACH__) /* Apple OSX section */ #if !defined(MSG_NOSIGNAL) #define MSG_NOSIGNAL SO_NOSIGPIPE #endif #endif const char *SocketErr(int err) { switch (err) { case -1: return "Internal"; case 0: return "Close"; case EAGAIN: return "EAGAIN"; case EBADF: return "EBADF"; case ECONNREFUSED: return "ECONNREFUSED"; case EFAULT: return "EFAULT"; case EINTR: return "EINTR"; case EINVAL: return "EINVAL"; case ENOMEM: return "ENOMEM"; case ENOTCONN: return "ENOTCONN"; case ENOTSOCK: return "ENOTSOCK"; case EACCES: return "EACCES"; case ECONNRESET: return "ECONNRESET"; case EDESTADDRREQ: return "EDESTADDRREQ"; case EISCONN: return "EISCONN"; case EMSGSIZE: return "EMSGSIZE"; case ENOBUFS: return "ENOBUFS"; case EOPNOTSUPP: return "EOPNOTSUPP"; case EPIPE: return "EPIPE"; case EPERM: return "EPERM"; case EADDRINUSE: return "EADDRINUSE"; case EAFNOSUPPORT: return "EAFNOSUPPORT"; case EALREADY: return "EALREADY"; case EINPROGRESS: return "EINPROGRESS"; case ENETUNREACH: return "ENETUNREACH"; case ETIMEDOUT: return "ETIMEDOUT"; } return "UNCKNOWN"; } // _____________________________________________________________________ dabc::SocketAddon::SocketAddon(int fd) : WorkerAddon("socket"), fSocket(fd), fDoingInput(false), fDoingOutput(false), fIOPriority(1), fDeliverEventsToWorker(false), fDeleteWorkerOnClose(false) { } dabc::SocketAddon::~SocketAddon() { CloseSocket(); } void dabc::SocketAddon::ProcessEvent(const EventId& evnt) { switch (evnt.GetCode()) { case evntSocketRead: break; case evntSocketWrite: break; case evntSocketError: OnSocketError(-1, "get error event"); break; default: WorkerAddon::ProcessEvent(evnt); } } void dabc::SocketAddon::SetSocket(int fd) { CloseSocket(); fSocket = fd; } int dabc::SocketAddon::TakeSocket() { int fd = fSocket; fSocket = -1; return fd; } void dabc::SocketAddon::CloseSocket() { if (fSocket<0) return; DOUT3("~~~~~~~~~~~~~~~~ Close socket %d", fSocket); close(fSocket); fSocket = -1; } int dabc::SocketAddon::TakeSocketError() { if (Socket()<0) return -1; int myerrno = 753642; socklen_t optlen = sizeof(myerrno); int res = getsockopt(Socket(), SOL_SOCKET, SO_ERROR, &myerrno, &optlen); if ((res<0) || (myerrno == 753642)) return -1; return myerrno; } void dabc::SocketAddon::OnSocketError(int msg, const std::string &) { if (IsDeliverEventsToWorker()) { DOUT2("Addon:%p Connection closed - worker should process", this); FireWorkerEvent(msg == 0 ? evntSocketCloseInfo : evntSocketErrorInfo); } else if (fDeleteWorkerOnClose) { DOUT2("Connection closed - destroy socket"); CloseSocket(); DeleteWorker(); } else { DOUT2("Connection closed - destroy addon"); CloseSocket(); DeleteAddonItself(); } } ssize_t dabc::SocketAddon::DoRecvBuffer(void* buf, ssize_t len) { ssize_t res = recv(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL); if (res == 0) OnSocketError(0, "closed during recv()"); else if (res < 0) { if (errno != EAGAIN) OnSocketError(errno, "when recv()"); } return res; } ssize_t dabc::SocketAddon::DoRecvBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* srcaddr, unsigned srcaddrlen) { struct iovec iov[2]; iov[0].iov_base = hdr; iov[0].iov_len = hdrlen; iov[1].iov_base = buf; iov[1].iov_len = len; struct msghdr msg; msg.msg_name = srcaddr; msg.msg_namelen = srcaddrlen; msg.msg_iov = iov; msg.msg_iovlen = buf ? 2 : 1; msg.msg_control = nullptr; msg.msg_controllen = 0; msg.msg_flags = 0; ssize_t res = recvmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (res == 0) OnSocketError(0, "when recvmsg()"); else if (res < 0) { if (errno != EAGAIN) OnSocketError(errno, "when recvmsg()"); } return res; } ssize_t dabc::SocketAddon::DoSendBuffer(void* buf, ssize_t len) { ssize_t res = send(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL); if (res == 0) OnSocketError(0, "when send()"); else if (res < 0) { if (errno != EAGAIN) OnSocketError(errno, "When send()"); } return res; } ssize_t dabc::SocketAddon::DoSendBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* tgtaddr, unsigned tgtaddrlen) { struct iovec iov[2]; iov[0].iov_base = hdr; iov[0].iov_len = hdrlen; iov[1].iov_base = buf; iov[1].iov_len = len; struct msghdr msg; msg.msg_name = tgtaddr; msg.msg_namelen = tgtaddrlen; msg.msg_iov = iov; msg.msg_iovlen = buf ? 2 : 1; msg.msg_control = nullptr; msg.msg_controllen = 0; msg.msg_flags = 0; ssize_t res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (res == 0) OnSocketError(0, "when sendmsg()"); else if (res < 0) { if (errno != EAGAIN) OnSocketError(errno, "When sendmsg()"); } return res; } // _____________________________________________________________________ dabc::SocketIOAddon::SocketIOAddon(int fd, bool isdatagram, bool usemsg) : SocketAddon(fd), fDatagramSocket(isdatagram), fUseMsgOper(usemsg), fSendUseMsg(true), fSendIOV(nullptr), fSendIOVSize(0), fSendIOVFirst(0), fSendIOVNumber(0), fSendUseAddr(false), fRecvUseMsg(true), fRecvIOV(nullptr), fRecvIOVSize(0), fRecvIOVFirst(0), fRecvIOVNumber(0), fLastRecvSize(0) { if (IsDatagramSocket() && !fUseMsgOper) { EOUT("Dangerous - datagram socket MUST use sendmsg()/recvmsg() operation to be able send/recv segmented buffers, force"); fUseMsgOper = true; } memset(&fSendAddr, 0, sizeof(fSendAddr)); #ifdef SOCKET_PROFILING fSendOper = 0; fSendTime = 0.; fSendSize = 0; fRecvOper = 0; fRecvTime = 0.; fRecvSize = 0.; #endif } dabc::SocketIOAddon::~SocketIOAddon() { #ifdef SOCKET_PROFILING DOUT1("SocketIOAddon::~SocketIOAddon Send:%ld Recv:%ld", fSendOper, fRecvOper); if (fSendOper>0) DOUT1(" Send time:%5.1f microsec sz:%7.1f", fSendTime*1e6/fSendOper, 1.*fSendSize/fSendOper); if (fRecvOper>0) DOUT1(" Recv time:%5.1f microsec sz:%7.1f", fRecvTime*1e6/fRecvOper, 1.*fRecvSize/fRecvOper); #endif DOUT4("Destroying SocketIOAddon %p fd:%d", this, Socket()); AllocateSendIOV(0); AllocateRecvIOV(0); } void dabc::SocketIOAddon::SetSendAddr(const std::string &host, int port) { if (!IsDatagramSocket()) { EOUT("Cannot specify send addr for non-datagram sockets"); return; } memset(&fSendAddr, 0, sizeof(fSendAddr)); fSendUseAddr = false; /* if (!host.empty() && (port>0)) { fSendUseAddr = true; fSendAddr.sin_family = AF_INET; fSendAddr.sin_addr.s_addr = inet_addr(host.c_str()); fSendAddr.sin_port = htons(port); } */ struct hostent *h = gethostbyname(host.c_str()); if (!h || (h->h_addrtype != AF_INET) || host.empty()) { EOUT("Cannot get host information for %s", host.c_str()); return; } fSendAddr.sin_family = AF_INET; memcpy(&fSendAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length); fSendAddr.sin_port = htons (port); fSendUseAddr = true; } void dabc::SocketIOAddon::AllocateSendIOV(unsigned size) { if (fSendIOV) delete [] fSendIOV; fSendIOV = nullptr; fSendIOVSize = 0; fSendIOVFirst = 0; fSendIOVNumber = 0; if (size <= 0) return; fSendIOVSize = size; fSendIOV = new struct iovec [size]; } void dabc::SocketIOAddon::AllocateRecvIOV(unsigned size) { if (fRecvIOV) delete [] fRecvIOV; fRecvIOV = nullptr; fRecvIOVSize = 0; fRecvIOVFirst = 0; fRecvIOVNumber = 0; if (size <= 0) return; fRecvIOVSize = size; fRecvIOV = new struct iovec [size]; } bool dabc::SocketIOAddon::StartSend(const void* buf1, unsigned size1, const void* buf2, unsigned size2, const void* buf3, unsigned size3) { if (fSendIOVNumber>0) { EOUT("Current send operation not yet completed"); return false; } if (fSendIOVSize<3) AllocateSendIOV(8); int indx = 0; if (buf1 && (size1>0)) { fSendIOV[indx].iov_base = (void*) buf1; fSendIOV[indx].iov_len = size1; indx++; } if (buf2 && (size2>0)) { fSendIOV[indx].iov_base = (void*) buf2; fSendIOV[indx].iov_len = size2; indx++; } if (buf3 && (size3>0)) { fSendIOV[indx].iov_base = (void*) buf3; fSendIOV[indx].iov_len = size3; indx++; } if (indx == 0) { EOUT("No buffer specified"); return false; } fSendUseMsg = fUseMsgOper; fSendIOVFirst = 0; fSendIOVNumber = indx; // TODO: Should we inform thread directly that we want to send data?? SetDoingOutput(true); return true; } bool dabc::SocketIOAddon::StartRecv(void* buf, size_t size) { return StartRecvHdr(nullptr, 0, buf, size); } bool dabc::SocketIOAddon::StartSend(const Buffer& buf) { // this is simple version, // where only buffer itself without header is transported return StartNetSend(nullptr, 0, buf); } bool dabc::SocketIOAddon::StartRecvHdr(void* hdr, unsigned hdrsize, void* buf, size_t size) { if (fRecvIOVNumber>0) { EOUT("Current recv operation not yet completed"); return false; } if (fRecvIOVSize<2) AllocateRecvIOV(8); int indx = 0; if (hdr && (hdrsize > 0)) { fRecvIOV[indx].iov_base = hdr; fRecvIOV[indx].iov_len = hdrsize; indx++; } fRecvIOV[indx].iov_base = buf; fRecvIOV[indx].iov_len = size; indx++; fRecvUseMsg = fUseMsgOper; fRecvIOVFirst = 0; fRecvIOVNumber = indx; // TODO: Should we inform thread directly that we want to recv data?? SetDoingInput(true); return true; } bool dabc::SocketIOAddon::StartRecv(Buffer& buf, BufferSize_t datasize) { return StartNetRecv(nullptr, 0, buf, datasize); } bool dabc::SocketIOAddon::StartNetRecv(void* hdr, unsigned hdrsize, Buffer& buf, BufferSize_t datasize) { // datasize == 0 here really means that there is no data to get !!!! if (fRecvIOVNumber>0) { EOUT("Current recv operation not yet completed"); return false; } if (buf.null()) return false; if (fRecvIOVSize<=buf.NumSegments()) AllocateRecvIOV(buf.NumSegments()+1); fRecvUseMsg = fUseMsgOper; fRecvIOVFirst = 0; int indx = 0; if (hdr && (hdrsize > 0)) { fRecvIOV[indx].iov_base = hdr; fRecvIOV[indx].iov_len = hdrsize; indx++; } for (unsigned nseg=0; nseg datasize) segsize = datasize; if (segsize == 0) break; fRecvIOV[indx].iov_base = buf.SegmentPtr(nseg); fRecvIOV[indx].iov_len = segsize; indx++; datasize -= segsize; } fRecvIOVNumber = indx; // TODO: Should we inform thread directly that we want to recv data?? SetDoingInput(true); return true; } bool dabc::SocketIOAddon::StartNetSend(void* hdr, unsigned hdrsize, const Buffer& buf) { if (fSendIOVNumber > 0) { EOUT("Current send operation not yet completed"); return false; } if (buf.null()) return false; if (fSendIOVSize<=buf.NumSegments()) AllocateSendIOV(buf.NumSegments()+1); fSendUseMsg = fUseMsgOper; fSendIOVFirst = 0; int indx = 0; if (hdr && (hdrsize > 0)) { fSendIOV[indx].iov_base = hdr; fSendIOV[indx].iov_len = hdrsize; indx++; } for (unsigned nseg=0; nseg0) fRecvSize += res; #endif if (res == 0) { // DOUT0("Addon:%p socket:%d res == 0 when doing read usemsg %s numseg %u seg0.len %u", this, fSocket, DBOOL(fRecvUseMsg), fRecvIOVNumber - fRecvIOVFirst, fRecvIOV[fRecvIOVFirst].iov_len); OnSocketError(0, "when recvmsg()"); return; } if (res<0) { if (errno!=EAGAIN) { OnSocketError(errno, "when recvmsg()"); } else { // we indicating that we want to receive data but there is nothing to read // why we get message at all? SetDoingInput(true); EOUT("Why socket read message produce but we do not get any data??"); } return; } fLastRecvSize = res; if (IsDatagramSocket()) { // for datagram the only recv message is possible fRecvIOVFirst = 0; fRecvIOVNumber = 0; // if (IsLogging()) // DOUT0("Socket %d signals COMPL", Socket()); OnRecvCompleted(); return; } while (res>0) { struct iovec* rec = &(fRecvIOV[fRecvIOVFirst]); if (rec->iov_len <= (unsigned) res) { // just skip current rec, jump to next res -= rec->iov_len; fRecvIOVFirst++; if (fRecvIOVFirst == fRecvIOVNumber) { if (res != 0) EOUT("Internal error - length after recvmsg() not zero"); fRecvIOVFirst = 0; fRecvIOVNumber = 0; // if (IsLogging()) // DOUT0("Socket %d signals COMPL", Socket()); OnRecvCompleted(); return; } } else { rec->iov_len -= res; rec->iov_base = (char*)rec->iov_base + res; res = 0; } } // there is still some portion of data should be read from the socket, indicate this for the thread SetDoingInput(true); break; } case evntSocketWrite: { if (fSendIOVNumber == 0) return; // nothing to send #ifdef SOCKET_PROFILING fSendOper++; TimeStamp tm1 = dabc::Now(); #endif if ((fSocket < 0) || !fSendIOV) { EOUT("HARD PROBLEM when trying write socket"); } ssize_t res = 0; if (fSendUseMsg) { struct msghdr msg; msg.msg_name = fSendUseAddr ? &fSendAddr : nullptr; msg.msg_namelen = fSendUseAddr ? sizeof(fSendAddr) : 0; msg.msg_iov = &(fSendIOV[fSendIOVFirst]); msg.msg_iovlen = fSendIOVNumber - fSendIOVFirst; msg.msg_control = nullptr; msg.msg_controllen = 0; msg.msg_flags = 0; res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); } else res = send(fSocket, fSendIOV[fSendIOVFirst].iov_base, fSendIOV[fSendIOVFirst].iov_len, MSG_DONTWAIT | MSG_NOSIGNAL); #ifdef SOCKET_PROFILING TimeStamp tm2 = dabc::Now(); fSendTime += (tm2-tm1); if (res>0) fSendSize += res; #endif if (res == 0) { OnSocketError(0, "when sendmsg()"); return; } if (res < 0) { DOUT2("Error when sending via socket %d usemsg %s first %d number %d", fSocket, DBOOL(fSendUseMsg), fSendIOVFirst, fSendIOVNumber); if (errno!=EAGAIN) { OnSocketError(errno, "when sendmsg()"); } else { // we indicating that we want to receive data but there is nothing to read // why we get message at all? SetDoingOutput(true); EOUT("Why socket write message produce but we did not send any bytes?"); } return; } DOUT5("Socket %d send %d bytes", Socket(), res); while (res > 0) { struct iovec* rec = &(fSendIOV[fSendIOVFirst]); if (rec->iov_len <= (unsigned) res) { // just skip current rec, jump to next res -= rec->iov_len; fSendIOVFirst++; if (fSendIOVFirst == fSendIOVNumber) { if (res != 0) EOUT("Internal error - length after sendmsg() not zero"); fSendIOVFirst = 0; fSendIOVNumber = 0; OnSendCompleted(); return; } } else { rec->iov_len -= res; rec->iov_base = (char*)rec->iov_base + res; res = 0; } } // we are informing that there is some data still to send SetDoingOutput(true); break; } default: SocketAddon::ProcessEvent(evnt); } } void dabc::SocketIOAddon::CancelIOOperations() { fSendIOVNumber = 0; fRecvIOVNumber = 0; } // ___________________________________________________________________ dabc::SocketServerAddon::SocketServerAddon(int serversocket, const char *hostname, int portnum, struct addrinfo *info) : SocketConnectAddon(serversocket), fServerHostName(hostname ? hostname : ""), fServerPortNumber(portnum) { if (fServerHostName.empty()) fServerHostName = dabc::SocketThread::DefineHostName(true); if (info) { ai_family = info->ai_family; ai_socktype = info->ai_socktype; ai_protocol = info->ai_protocol; ai_addrlen = info->ai_addrlen; memcpy(&ai_addr, info->ai_addr, info->ai_addrlen); } SetDoingInput(true); listen(Socket(), 10); DOUT2("Create dabc::SocketServerAddon"); } void dabc::SocketServerAddon::ProcessEvent(const EventId& evnt) { switch (evnt.GetCode()) { case evntSocketRead: { // we accept more connections SetDoingInput(true); int connfd = accept(Socket(), nullptr, nullptr); if (connfd < 0) { EOUT("Error with accept in server socket %d", Socket()); fAcceptErrors++; if (fAcceptErrors >= 1000) { // try to recreate socket every 100 failure if (fAcceptErrors % 100 == 0) RecreateSocket(); } if (fAcceptErrors > 2000) { EOUT("Fatal - too many accept errors, abort application"); exit(1); } return; } fAcceptErrors = 0; int err = listen(Socket(), 10); if (err) EOUT("Error %d in listen of server socket %d", err, Socket()); if (!dabc::SocketThread::SetNonBlockSocket(connfd)) { EOUT("Cannot set nonblocking flag for connected socket %d", connfd); close(connfd); return; } DOUT0("Server socket %d get connection %d", Socket(), connfd); OnClientConnected(connfd); break; } default: DOUT0("Server socket %d get event %u", Socket(), (unsigned) evnt.GetCode()); dabc::SocketConnectAddon::ProcessEvent(evnt); } } void dabc::SocketServerAddon::OnClientConnected(int fd) { Command cmd("SocketConnect"); cmd.SetStr("Type", "Server"); cmd.SetInt("fd", fd); if (!fConnRcv.null() && !fConnId.empty()) { cmd.SetStr("ConnId", fConnId); fConnRcv.Submit(cmd); } else if (!fWorker.null()) { ((Worker*) fWorker())->Submit(cmd); } else { EOUT("Method not implemented - socked will be closed"); close(fd); } } bool dabc::SocketServerAddon::RecreateSocket() { if (ai_addrlen == 0) return false; CloseSocket(); int sockfd = socket(ai_family, ai_socktype, ai_protocol); if (sockfd < 0) return false; int opt = 1; setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); if (bind(sockfd, (struct sockaddr *) &ai_addr, ai_addrlen) == 0) { if (dabc::SocketThread::SetNonBlockSocket(sockfd)) { SetSocket(sockfd); return true; } } close(sockfd); return false; } // _____________________________________________________________________ dabc::SocketClientAddon::SocketClientAddon(const struct addrinfo* serv_addr, int fd) : SocketConnectAddon(fd), fRetry(1), fRetryTmout(-1) { if (!serv_addr) { EOUT("Server address not specified"); return; } fServAddr.ai_flags = serv_addr->ai_flags; fServAddr.ai_family = serv_addr->ai_family; fServAddr.ai_socktype = serv_addr->ai_socktype; fServAddr.ai_protocol = serv_addr->ai_protocol; fServAddr.ai_addrlen = serv_addr->ai_addrlen; fServAddr.ai_addr = nullptr; if ((serv_addr->ai_addrlen > 0) && serv_addr->ai_addr) { fServAddr.ai_addr = (sockaddr*) std::malloc(fServAddr.ai_addrlen); if (fServAddr.ai_addr) memcpy(fServAddr.ai_addr, serv_addr->ai_addr, fServAddr.ai_addrlen); else EOUT("Memory allocation error"); } fServAddr.ai_canonname = nullptr; if (serv_addr->ai_canonname) { size_t len = strlen(serv_addr->ai_canonname); fServAddr.ai_canonname = (char *) std::malloc(len + 1); if (fServAddr.ai_canonname) strncpy(fServAddr.ai_canonname, serv_addr->ai_canonname, len+1); else EOUT("Memory allocation error"); } fServAddr.ai_next = nullptr; } dabc::SocketClientAddon::~SocketClientAddon() { // DOUT0("Actual destroy of SocketClientAddon %p worker %p", this, fWorker()); std::free(fServAddr.ai_addr); fServAddr.ai_addr = nullptr; std::free(fServAddr.ai_canonname); fServAddr.ai_canonname = nullptr; } void dabc::SocketClientAddon::SetRetryOpt(int nretry, double tmout) { fRetry = nretry; fRetryTmout = tmout; } void dabc::SocketClientAddon::OnThreadAssigned() { dabc::SocketConnectAddon::OnThreadAssigned(); FireWorkerEvent(evntSocketStartConnect); } void dabc::SocketClientAddon::ProcessEvent(const EventId& evnt) { switch (evnt.GetCode()) { case evntSocketWrite: { // we could get write event after socket was closed by error - ignore such event if (Socket() <= 0) return; // we can check if connection established int myerrno = TakeSocketError(); if (myerrno == 0) { DOUT5("Connection done %7.5f", dabc::Now().AsDouble()); int fd = TakeSocket(); OnConnectionEstablished(fd); return; } DOUT3("Postponed connect socket err:%d %s", myerrno, SocketErr(myerrno)); break; } case evntSocketError: { // int myerrno = TakeSocketError(); // DOUT3("Doing connect socket err:%d %s", myerrno, SocketErr(myerrno)); break; } case evntSocketStartConnect: { // start next attempt for connection DOUT3("Start next connect attempt sock:%d", Socket()); if (Socket() <= 0) { int fd = socket(fServAddr.ai_family, fServAddr.ai_socktype, fServAddr.ai_protocol); if (fd<=0) EOUT("Cannot create socket with given address"); else SetSocket(fd); } if (Socket() > 0) { dabc::SocketThread::SetNonBlockSocket(Socket()); int res = connect(Socket(), fServAddr.ai_addr, fServAddr.ai_addrlen); if (res == 0) { int fd = TakeSocket(); OnConnectionEstablished(fd); return; } if (errno==EINPROGRESS) { DOUT3("Connection in progress %7.5f", dabc::Now().AsDouble()); SetDoingOutput(true); return; } DOUT3("When calling connection socket err:%d %s", errno, SocketErr(errno)); } break; } default: dabc::SocketConnectAddon::ProcessEvent(evnt); return; } SetDoingOutput(false); CloseSocket(); if (--fRetry > 0) { DOUT3("Try connect after %5.1f s n:%d", fRetryTmout, fRetry); ActivateTimeout(fRetryTmout > 0. ? fRetryTmout : 0.); } else { OnConnectionFailed(); } } double dabc::SocketClientAddon::ProcessTimeout(double) { // activate connection start again FireWorkerEvent(evntSocketStartConnect); return -1.; } void dabc::SocketClientAddon::OnConnectionEstablished(int fd) { Command cmd("SocketConnect"); cmd.SetStr("Type", "Client"); cmd.SetInt("fd", fd); cmd.SetStr("ConnId", fConnId); if (!fWorker.null() && IsDeliverEventsToWorker()) { ((Worker*) fWorker())->Submit(cmd); return; } if (!fConnRcv.null()) { fConnRcv.Submit(cmd); } else { EOUT("Connection established, but not processed - close socket"); close(fd); } DeleteWorker(); } void dabc::SocketClientAddon::OnConnectionFailed() { Command cmd("SocketConnect"); cmd.SetStr("Type", "Error"); cmd.SetStr("ConnId", fConnId); if (!fWorker.null() && IsDeliverEventsToWorker()) { ((Worker*) fWorker())->Submit(cmd); return; } if (!fConnRcv.null()) { fConnRcv.Submit(cmd); } else { EOUT("Connection failed to establish, error not processed"); } DeleteWorker(); } // _______________________________________________________________________ dabc::SocketThread::SocketThread(Reference parent, const std::string &name, Command cmd) : dabc::Thread(parent, name, cmd), fPipeFired(false), fWaitFire(false), fScalerCounter(10), f_sizeufds(0), f_ufds(nullptr), f_recs(nullptr), fIsAnySocket(false), fCheckNewEvents(true), fBalanceCnt(0) { #ifdef SOCKET_PROFILING fWaitCalls = 0; fWaitDone = 0; fWaitTime = 0; fFillTime = 0; fPipeCalled = 0; #endif fPipe[0] = 0; fPipe[1] = 0; auto res = pipe(fPipe); (void) res; // ignore compiler warnings // by this call we rebuild ufds array, for now only for the pipe WorkersSetChanged(); } dabc::SocketThread::~SocketThread() { // !!!!!! we should stop thread before destroying anything while // thread may use some structures in the MainLoop !!!!!!!! DOUT3("~~~~~~~~~~~~~~ SOCKThread %s destructor with timeout %3.1fs", GetName(), GetStopTimeout()); Stop(GetStopTimeout()); // JAM 6.7.2017 - try with larger timeout for ltsm if (fPipe[0] != 0) { close(fPipe[0]); fPipe[0] = 0; } if (fPipe[1] != 0) { close(fPipe[1]); fPipe[1] = 0; } if (f_ufds) { delete[] f_ufds; delete[] f_recs; f_ufds = nullptr; f_recs = nullptr; f_sizeufds = 0; } #ifdef SOCKET_PROFILING DOUT1("Thrd:%s Wait called %ld done %ld ratio %5.3f %s Pipe:%ld", GetName(), fWaitCalls, fWaitDone, (fWaitCalls>0 ? 100.*fWaitDone/fWaitCalls : 0.) ,"%", fPipeCalled); if (fWaitDone>0) DOUT1("Aver times fill:%5.1f microsec wait:%5.1f microsec", fFillTime*1e6/fWaitDone, fWaitTime*1e6/fWaitDone); #endif } bool dabc::SocketThread::CompatibleClass(const std::string &clname) const { if (Thread::CompatibleClass(clname)) return true; return clname == typeSocketThread; } bool dabc::SocketThread::SetNonBlockSocket(int fd) { int opts = fcntl(fd, F_GETFL); if (opts < 0) { EOUT("fcntl(F_GETFL) failed"); return false; } opts = (opts | O_NONBLOCK); if (fcntl(fd, F_SETFL,opts) < 0) { EOUT("fcntl(F_SETFL) failed"); return false; } return true; } bool dabc::SocketThread::SetNoDelaySocket(int fd) { int one = 1; int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); return res == 0; } std::string dabc::SocketThread::DefineHostName(bool force) { std::string host = dabc::Configuration::GetLocalHost(); if (host.empty() && force) { char sbuf[500]; if (gethostname(sbuf, sizeof(sbuf))) { EOUT("Error to get local host name"); host = "localhost"; } else host = sbuf; } return host; } dabc::SocketServerAddon* dabc::SocketThread::CreateServerAddon(const std::string &host, int nport, int portmin, int portmax) { char nameinfo[1024], serviceinfo[1024]; int numtests = 1; // at least test value of nport if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1); const char *hostname = host.empty() ? nullptr : host.c_str(); SocketServerAddon *addon = nullptr; for (int ntest = 0; ntest < numtests; ntest++) { int serviceid = (ntest == 0) ? nport : portmin - 1 + ntest; if (serviceid < 0) continue; struct addrinfo hints, *info = nullptr; memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_UNSPEC; //AF_INET; hints.ai_socktype = SOCK_STREAM; char service[100]; snprintf(service, sizeof(service), "%d", serviceid); int n = getaddrinfo(hostname, service, &hints, &info); DOUT2("GetAddrInfo %s:%s res = %d", host.c_str(), service, n); if (n < 0) { EOUT("Cannot get addr info for service %s:%s", host.c_str(), service); continue; } int sockfd = -1; for (struct addrinfo *t = info; t; t = t->ai_next) { sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol); if (sockfd < 0) continue; int opt = 1; setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); if (bind(sockfd, t->ai_addr, t->ai_addrlen) == 0) { int ni = getnameinfo(t->ai_addr, t->ai_addrlen, nameinfo, sizeof(nameinfo), serviceinfo, sizeof(serviceinfo), /*NI_NUMERICHOST | NI_NUMERICSERV */ NI_NOFQDN | NI_NUMERICSERV); if (host.empty() && (ni == 0) && (strcmp(nameinfo,"0.0.0.0") != 0)) hostname = nameinfo; if (dabc::SocketThread::SetNonBlockSocket(sockfd)) { addon = new SocketServerAddon(sockfd, hostname ? hostname : "localhost", serviceid, t); break; } } close(sockfd); sockfd = -1; } freeaddrinfo(info); if (addon) return addon; } EOUT("Cannot bind server socket to port %d or find its in range %d:%d", nport, portmin, portmax); return nullptr; } int dabc::SocketThread::StartClient(const std::string &host, int nport, bool nonblocking) { char service[100]; snprintf(service, sizeof(service), "%d", nport); struct addrinfo hints, *info = nullptr; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; int sockfd = -1; if (getaddrinfo(host.c_str(), service, &hints, &info) != 0) return sockfd; for (struct addrinfo *t = info; t; t = t->ai_next) { sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol); if (sockfd<=0) { sockfd = -1; continue; } if (connect(sockfd, t->ai_addr, t->ai_addrlen) == 0) { if (!nonblocking) break; if (dabc::SocketThread::SetNonBlockSocket(sockfd)) break; // socket is initialized - one could return it else EOUT("Cannot set non-blocking flag for client socket"); } close(sockfd); sockfd = -1; } // always must be called freeaddrinfo(info); return sockfd; } int dabc::SocketThread::SendBuffer(int fd, void* buf, int len) { return send(fd, buf, len, MSG_NOSIGNAL); } int dabc::SocketThread::RecvBuffer(int fd, void* buf, int len) { return recv(fd, buf, len, MSG_NOSIGNAL); } bool dabc::SocketThread::AttachMulticast(int socket_descriptor, const std::string &host) { if (host.empty() || (socket_descriptor<=0)) { EOUT("Multicast address or socket handle not specified"); return false; } struct hostent *server_host_name = gethostbyname(host.c_str()); if (!server_host_name) { EOUT("Cannot get host information for %s", host.c_str()); return false; } struct ip_mreq command; // Allow to use same port by many processes // Allow to receive broadcast to this port int loop = 1; if (setsockopt (socket_descriptor, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop)) < 0) { EOUT("Fail setsockopt IP_MULTICAST_LOOP"); return false; } // Join the multicast group command.imr_multiaddr.s_addr = inet_addr (host.c_str()); command.imr_interface.s_addr = htonl (INADDR_ANY); if (command.imr_multiaddr.s_addr == (in_addr_t)-1) { EOUT("%s is not valid address", host.c_str()); return false; } if (setsockopt(socket_descriptor, IPPROTO_IP, IP_ADD_MEMBERSHIP, &command, sizeof (command)) < 0) { EOUT("File setsockopt IP_ADD_MEMBERSHIP"); return false; } return true; } void dabc::SocketThread::DettachMulticast(int handle, const std::string &host) { if ((handle<0) || host.empty()) return; struct ip_mreq command; command.imr_multiaddr.s_addr = inet_addr (host.c_str()); command.imr_interface.s_addr = htonl (INADDR_ANY); // Remove socket from multicast group if (setsockopt (handle, IPPROTO_IP, IP_DROP_MEMBERSHIP, &command, sizeof (command)) < 0 ) { EOUT("Fail setsockopt:IP_DROP_MEMBERSHIP"); } } int dabc::SocketThread::CreateUdp() { int fd = socket(PF_INET, SOCK_DGRAM, 0); if (fd<0) return -1; if (SetNonBlockSocket(fd)) return fd; close(fd); return -1; } void dabc::SocketThread::CloseUdp(int fd) { if (fd>0) close(fd); } int dabc::SocketThread::BindUdp(int fd, int nport, int portmin, int portmax) { if (fd < 0) return -1; struct sockaddr_in m_addr; int numtests = 1; // at least test value of portnum if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1); for(int ntest=0;ntest 0) nport = portmin - 1 + ntest; memset(&m_addr, 0, sizeof(m_addr)); m_addr.sin_family = AF_INET; // m_addr.s_addr = htonl (INADDR_ANY); m_addr.sin_port = htons(nport); if (bind(fd, (struct sockaddr *)&m_addr, sizeof(m_addr)) == 0) return nport; } return -1; } int dabc::SocketThread::ConnectUdp(int fd, const std::string &remhost, int remport) { if (fd < 0) return fd; struct hostent *host = gethostbyname(remhost.c_str()); if (!host || (host->h_addrtype != AF_INET) || remhost.empty()) { EOUT("Cannot get host information for %s", remhost.c_str()); close(fd); return -1; } struct sockaddr_in address; memset (&address, 0, sizeof (address)); address.sin_family = AF_INET; memcpy(&address.sin_addr.s_addr, host->h_addr_list[0], host->h_length); address.sin_port = htons (remport); if (connect(fd, (struct sockaddr *) &address, sizeof (address)) < 0) { EOUT("Fail to connect to host %s port %d", remhost.c_str(), remport); close(fd); return -1; } return fd; } dabc::SocketClientAddon* dabc::SocketThread::CreateClientAddon(const std::string &serverid, int dflt_port) { if (serverid.empty()) return nullptr; std::string host, service; size_t pos = serverid.find(':'); if (pos != std::string::npos) { host = serverid.substr(0, pos); service = serverid.substr(pos+1, serverid.length()-pos); } else if (dflt_port > 0) { host = serverid; service = dabc::format("%d", dflt_port); } else { return nullptr; } SocketClientAddon *addon = nullptr; DOUT5("CreateClientAddon %s:%s", host.c_str(), service.c_str()); struct addrinfo *info; struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; if (getaddrinfo(host.c_str(), service.c_str(), &hints, &info) == 0) { for (struct addrinfo *t = info; t; t = t->ai_next) { int sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol); if (sockfd<=0) continue; addon = new SocketClientAddon(t, sockfd); break; } freeaddrinfo(info); } DOUT5("CreateClientAddon %s:%s done res = %p", host.c_str(), service.c_str(), addon); return addon; } void dabc::SocketThread::_Fire(const dabc::EventId& arg, int nq) { DOUT5("SocketThread::_Fire %s nq:%d numq:%d waiting:%s", GetName(), nq, fNumQueues, DBOOL(fWaitFire)); _PushEvent(arg, nq); if (fWaitFire && !fPipeFired) { auto res = write(fPipe[1], "w", 1); (void) res; // suppress compiler warnings fPipeFired = true; #ifdef SOCKET_PROFILING fPipeCalled++; #endif } } bool dabc::SocketThread::WaitEvent(EventId& evnt, double tmout_sec) { // first check, if we have already event, which must be processed #ifdef SOCKET_PROFILING fWaitCalls++; TimeStamp tm1 = dabc::Now(); #endif #ifdef DABC_EXTRA_CHECKS unsigned sizebefore = 0, sizeafter = 0; #endif { dabc::LockGuard lock(ThreadMutex()); #ifdef DABC_EXTRA_CHECKS sizebefore = _TotalNumberOfEvents(); #endif // if we already have events in the queue, // check if we take them out or first check if new sockets events there if (_TotalNumberOfEvents()>0) { if (!fCheckNewEvents) return _GetNextEvent(evnt); // we have events in the queue, therefore do not wait - just check new events tmout_sec = 0.; } if (!f_ufds) return false; fWaitFire = true; } // here we wait for next event from any socket, including pipe int numufds = 1; f_ufds[0].fd = fPipe[0]; f_ufds[0].events = POLLIN; f_ufds[0].revents = 0; for(unsigned n=1; naddon; if (addon->Socket()<=0) continue; short events = 0; if (addon->IsDoingInput()) events |= POLLIN; if (addon->IsDoingOutput()) events |= POLLOUT; if (events == 0) continue; f_ufds[numufds].fd = addon->Socket(); f_ufds[numufds].events = events; f_ufds[numufds].revents = 0; f_recs[numufds].indx = n; // this is for dereferencing of the value numufds++; } int tmout = tmout_sec < 0. ? -1 : int(tmout_sec*1000.); #ifdef SOCKET_PROFILING fWaitDone++; TimeStamp tm2 = dabc::Now(); fFillTime += (tm2-tm1); #endif // DOUT2("SOCKETTHRD: start waiting %d", tmout); // DOUT0("SocketThread %s (%d) wait with timeout %d ms numufds %d", GetName(), entry_cnt, tmout, numufds); int poll_res = poll(f_ufds, numufds, tmout); #ifdef SOCKET_PROFILING TimeStamp tm3 = dabc::Now(); fWaitTime += (tm3-tm2); #endif dabc::LockGuard lock(ThreadMutex()); // DOUT2("SOCKETTHRD: did waiting %d numevents %u", tmout, _TotalNumberOfEvents()); fWaitFire = false; // cleanup pipe in bigger steps if (fPipeFired) { char sbuf; auto res = read(fPipe[0], &sbuf, 1); (void) res; // suppress compiler warnings fPipeFired = false; } bool isany = false; // if we really has any events, analyze all of them and push in the queue if (poll_res>0) for (int imn=1; imnaddon; Worker* worker = fWorkers[f_recs[n].indx]->work; if (!addon || !worker) { EOUT("Something went wrong - socket addon=%p worker = %p, something is gone", addon, worker); exit(543); } if (f_ufds[n].revents & (POLLERR | POLLHUP | POLLNVAL)) { // EOUT("Error on the socket %d", f_ufds[n].fd); _PushEvent(EventId(SocketAddon::evntSocketError, f_recs[n].indx), 0); addon->SetDoingInput(false); addon->SetDoingOutput(false); IncWorkerFiredEvents(worker); isany = true; } if (f_ufds[n].revents & (POLLIN | POLLPRI)) { _PushEvent(EventId(SocketAddon::evntSocketRead, f_recs[n].indx), addon->fIOPriority); addon->SetDoingInput(false); IncWorkerFiredEvents(worker); isany = true; } if (f_ufds[n].revents & POLLOUT) { _PushEvent(EventId(SocketAddon::evntSocketWrite, f_recs[n].indx), addon->fIOPriority); addon->SetDoingOutput(false); IncWorkerFiredEvents(worker); isany = true; } } // DOUT0("SocketThread %s (%d) did wait with res %d isany %s", GetName(), entry_cnt, poll_res, DBOOL(isany)); // we put additional event to enable again sockets checking if (isany) { fCheckNewEvents = false; _PushEvent(evntEnableCheck, 1); fBalanceCnt = (fBalanceCnt + 1) % 100000; // just use big number, fWorker.size() is not proper here } #ifdef DABC_EXTRA_CHECKS sizeafter = _TotalNumberOfEvents(); // if (sizeafter-sizebefore > 1) DOUT0("Thread:%s before:%u after:%u diff:%u", GetName(), sizebefore, sizeafter, sizeafter - sizebefore); #endif return _GetNextEvent(evnt); } void dabc::SocketThread::ProcessExtraThreadEvent(const EventId& evid) { if (evid.GetCode() == evntEnableCheck) { fCheckNewEvents = true; return; } dabc::Thread::ProcessExtraThreadEvent(evid); } void dabc::SocketThread::WorkersSetChanged() { unsigned new_sz = fWorkers.size(); if (new_sz > f_sizeufds) { delete[] f_ufds; delete[] f_recs; f_ufds = new pollfd [new_sz]; f_recs = new ProcRec [new_sz]; f_sizeufds = new_sz; } memset(f_ufds, 0, sizeof(pollfd) * f_sizeufds); memset(f_recs, 0, sizeof(ProcRec) * f_sizeufds); f_recs[0].use = true; f_recs[0].indx = 0; fIsAnySocket = false; // DOUT0("SocketThread %s WorkersNumberChanged size %u", GetName(), fWorkers.size()); for (unsigned indx=1;indx (fWorkers[indx]->addon); f_recs[indx].use = addon != nullptr; if (addon) { fIsAnySocket = true; // DOUT0("Socket %d doing input=%s output=%s", addon->Socket(), DBOOL(addon->IsDoingInput()), DBOOL(addon->IsDoingOutput())); } } // any time new processor is added, check for new socket events fCheckNewEvents = fIsAnySocket; // DOUT0("SocketThread %s WorkersNumberChanged %u done", GetName(), fWorkers.size()); }