// $Id$ /************************************************************ * The Data Acquisition Backbone Core (DABC) * ************************************************************ * Copyright (C) 2009 - * * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Planckstr. 1, 64291 Darmstadt, Germany * * Contact: http://dabc.gsi.de * ************************************************************ * This software can be used under the GPL license * * agreements as stated in LICENSE.txt file * * which is part of the distribution. * ************************************************************/ #include "dabc/threads.h" #include #include #include #include #if defined(__MACH__) /* Apple OSX section */ // try to provide dummy wrapper for all using functions around affinity void CPU_ZERO(cpu_set_t *arg) { arg->flag = 0; } void CPU_SET(int cpu, cpu_set_t *arg) { arg->flag |= (1<flag & (1<flag = arg->flag & ~(1<flag = 0xFFFF; return 0; } int sched_setaffinity(int, int, cpu_set_t*) { return 0; } #endif dabc::Mutex::Mutex(bool recursive) { if (recursive) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&fMutex, &attr); pthread_mutexattr_destroy(&attr); } else { pthread_mutex_init(&fMutex, nullptr); } } bool dabc::Mutex::TryLock() { return pthread_mutex_trylock(&fMutex) != EBUSY; } bool dabc::Mutex::IsLocked() { int res = pthread_mutex_trylock(&fMutex); if (res==EBUSY) return true; pthread_mutex_unlock(&fMutex); return false; } //_____________________________________________________________ bool dabc::MutexPtr::TryLock() { return fMutex ? 
pthread_mutex_trylock(fMutex) != EBUSY : false; } bool dabc::MutexPtr::IsLocked() { if (!fMutex) return false; int res = pthread_mutex_trylock(fMutex); if (res == EBUSY) return true; pthread_mutex_unlock(fMutex); return false; } //_____________________________________________________________ dabc::Condition::Condition(Mutex* ext_mtx) : fInternCondMutex(), fCondMutex(ext_mtx ? ext_mtx : &fInternCondMutex), fFiredCounter(0), fWaiting(false) { pthread_cond_init(&fCond, nullptr); } dabc::Condition::~Condition() { pthread_cond_destroy(&fCond); } bool dabc::Condition::_DoWait(double wait_seconds) { // mutex must be already locked at this point // meaning of argument // wait_seconds > 0 - time to wait // wait_seconds = 0 - do not wait, just check if condition was fired // wait_seconds < 0 - wait forever until condition is fired if (fFiredCounter == 0) { if (wait_seconds < 0.) { fWaiting = true; pthread_cond_wait(&fCond, &fCondMutex->fMutex); fWaiting = false; } else if (wait_seconds > 0.) { struct timeval tp; gettimeofday(&tp, nullptr); long wait_microsec = long(wait_seconds*1e6); tp.tv_sec += (wait_microsec + tp.tv_usec) / 1000000; tp.tv_usec = (wait_microsec + tp.tv_usec) % 1000000; struct timespec tsp = { tp.tv_sec, tp.tv_usec*1000 }; fWaiting = true; pthread_cond_timedwait(&fCond, &fCondMutex->fMutex, &tsp); fWaiting = false; } } if (fFiredCounter > 0) { fFiredCounter--; return true; } return false; } //_____________________________________________________________ dabc::Runnable::~Runnable() { DOUT5("dabc::Runnable::~Runnable destructor %p", this); } extern "C" void CleanupRunnable(void* abc) { dabc::Runnable *run = (dabc::Runnable*) abc; if (run) run->RunnableCancelled(); } extern "C" void* StartTRunnable(void* abc) { dabc::Runnable *run = (dabc::Runnable*) abc; void* res = nullptr; if (run) { pthread_cleanup_push(CleanupRunnable, abc); res = run->MainLoop(); pthread_cleanup_pop(1); } pthread_exit(res); } // 
====================================================================
// NOTE(review): the bare '=' run above is the remainder of a separator
// comment; its leading "//" sits at the end of the preceding line.
// NOTE(review): several spans of this section were unreadable in the reviewed
// text (everything between a '<' and the following '>' is missing). The code
// tokens are left untouched; damaged places are marked individually and must
// be restored from the original file.

// Definitions of the static CPU sets of PosixThread;
// both are cleared and refilled by SetDfltAffinity()
cpu_set_t dabc::PosixThread::fSpecialSet;
cpu_set_t dabc::PosixThread::fDfltSet;

// Configures fDfltSet and fSpecialSet from the string aff.
// Null pointer or empty string - both sets stay cleared, returns true.
// "-<number>" - see first branch below.
// String starting with 'o', 'x' or 's' - handling not readable here.
// Returns false when the string cannot be parsed or a sched_* call fails.
bool dabc::PosixThread::SetDfltAffinity(const char *aff)
{
   CPU_ZERO(&fSpecialSet);
   CPU_ZERO(&fDfltSet);

   if (!aff || (*aff == 0)) return true;

   if ((*aff == '-') && (strlen(aff) > 1)) {
      // number after '-' is parsed into numspecial, zero is rejected
      unsigned numspecial = 0;
      if (!str_to_uint(aff+1, &numspecial) || (numspecial == 0)) {
         EOUT("Wrong default affinity format %s", aff);
         return false;
      }

      // start from the affinity mask currently reported for pid argument 0
      int res = sched_getaffinity(0, sizeof(fDfltSet), &fDfltSet);

      if (res != 0) {
         EOUT("sched_getaffinity res = %d", res);
         return false;
      }

      unsigned numset = 0;
      // NOTE(review): text is missing between "cpu" and "numset-numspecial" -
      // loop condition, the use of numset and the test which selects the CPUs
      // are lost. Selected CPUs are removed from fDfltSet and added to
      // fSpecialSet.
      for (int cpu=0;cpunumset-numspecial) {
            CPU_CLR(cpu, &fDfltSet);
            CPU_SET(cpu, &fSpecialSet);
         }
      }

      // apply the reduced default set (pid argument 0)
      res = sched_setaffinity(0, sizeof(fDfltSet), &fDfltSet);
      if (res != 0) { EOUT("sched_setaffinity failed res = %d", res); return false; }

      return true;
   }

   if ((*aff == 'o') || (*aff == 'x') || (*aff == 's')) {
      unsigned cpu = 0;
      const char *curr = aff;
      bool isany = false;

      // NOTE(review): text is missing between "cpu" and "1)) {" - the rest of
      // SetDfltAffinity and the beginning of the following definition(s) are
      // lost. The code after this point reads a special-processor id from
      // aff+1 and therefore presumably belongs to another function - confirm.
      while ((*curr != 0) && (cpu1)) {
      unsigned specialid = 0, numspecial = 0;
      if (!str_to_uint(aff+1, &specialid)) {
         EOUT("Wrong affinity format %s", aff);
         return false;
      }

      // NOTE(review): text is missing between "cpu" and "= numspecial" -
      // loop body and the comparison which precedes this error are lost
      for (unsigned cpu=0;cpu= numspecial) {
         EOUT("Where are only %u special processors, cannot assigned id %u", numspecial, specialid);
         return false;
      }

      return true;
   }

   if ((*aff=='o') || (*aff=='x')) {
      unsigned cpu = 0;
      const char *curr = aff;
      bool isany = false;

      // NOTE(review): text is missing between "cpu" and "= 200112L" - the rest
      // of this function, the beginning of the function which starts a
      // Runnable and the "#if" line matching the "#endif" below are lost.
      // Visible remainder: fCpuSet is stored in the thread attributes, thread
      // is created with StartTRunnable as entry function and run as argument,
      // then the attributes are destroyed.
      while ((*curr != 0) && (cpu= 200112L
      pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &fCpuSet);
#endif
      pthread_create(&fThrd, &attr, StartTRunnable, run);
      pthread_attr_destroy(&attr);
   }
}

// Starts the thread with plain function func and argument args.
// Default thread attributes are used; nothing is done when func is null.
// Return code of pthread_create is not checked.
void dabc::PosixThread::Start(StartRoutine* func, void* args)
{
   if (!func) return;

   pthread_create(&fThrd, nullptr, func, args);
}

// Waits until the thread fThrd terminates; exit value of the thread is discarded
void dabc::PosixThread::Join()
{
   void* res = nullptr;
   pthread_join(fThrd, &res);
}

// Sets scheduling priority of the thread.
// prio > 0  - policy SCHED_FIFO with priority prio
// otherwise - policy SCHED_OTHER
// Failure is only logged.
void dabc::PosixThread::SetPriority(int prio)
{
   struct sched_param thread_param;
   int ret = 0;
   memset(&thread_param, 0, sizeof(thread_param));
   thread_param.sched_priority = prio;
   ret = pthread_setschedparam(fThrd, (prio>0) ? SCHED_FIFO : SCHED_OTHER, &thread_param);
   // output contains return code and flags which known error code it matches
   if (ret != 0)
      EOUT("pthread_setschedparam ret = %d %d %d %d %d\n", ret, (ret==EPERM), (ret==ESRCH), (ret==EINVAL), (ret==EFAULT));
}

// Sends signal sig to the thread
void dabc::PosixThread::Kill(int sig)
{
   pthread_kill(fThrd, sig);
}

// Requests cancellation of the thread
void dabc::PosixThread::Cancel()
{
   pthread_cancel(fThrd);
}

// Assigns name thrdname to the thread; does nothing when __MACH__ is defined
void dabc::PosixThread::SetThreadName(const char *thrdname)
{
#if !defined(__MACH__)
   pthread_setname_np(fThrd, thrdname);
#endif
}

// Writes default affinity into buf, which has maxbuf bytes;
// returns false when maxbuf is zero
bool dabc::PosixThread::GetDfltAffinity(char* buf, unsigned maxbuf)
{
   unsigned last = 0;

   if (maxbuf == 0) return false;

   // NOTE(review): text is missing between "cpu" and "= 200112L" - the rest of
   // GetDfltAffinity, the beginning of the next function and the "#if" line
   // matching "#else"/"#endif" below are lost. Visible remainder: attributes
   // of the calling thread are queried, their affinity is copied into mask
   // and arg is pointed to mask; in the "#else" branch buf is set to an empty
   // string and false is returned.
   for (unsigned cpu=0;cpu= 200112L
      int s;
      pthread_attr_t attr;
      s = pthread_getattr_np(pthread_self(), &attr);
      if (s != 0) { EOUT("pthread_getattr_np failed"); return false; }

      // failures of the next two calls are only logged
      s = pthread_attr_getaffinity_np(&attr, sizeof(cpu_set_t), &mask);
      if (s != 0) EOUT("pthread_attr_getaffinity_np failed");

      s = pthread_attr_destroy(&attr);
      if (s != 0) EOUT("pthread_attr_destroy failed");
      arg = &mask;
#else
      buf[0] = 0;
      return false;
#endif
   }

   unsigned last = 0;

   // NOTE(review): reviewed text ends in the middle of this loop header
   for (unsigned cpu=0;cpu