#ifndef multithread_h
#define multithread_h

#include <map>
#include <iostream>

#include <pthread.h>
#include <cerrno>
#include <cstdio>
#include <cstdlib>

#include "tbb/blocked_range.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/spin_mutex.h"
#include "tbb/tick_count.h"

using std::map;
using std::cout;
using std::endl;

// L1Algo, L1AlgoInter::nCores and L1AlgoInter::maxNEvents are expected to be
// visible at the point where this header is included.

// #define DEBUG_THREADS

using namespace tbb;

typedef spin_mutex TMyMutex;
TMyMutex mutex;

#ifdef DEBUG_THREADS
typedef spin_mutex TMyMutex2;
TMyMutex2 mutex2;
typedef spin_mutex TMyMutex3;
TMyMutex3 mutex3;
#endif // DEBUG_THREADS

int threads_counter = -1;             // counts the threads created so far; needed to build the cpu-thread correspondence
tick_count time0 = tick_count::now(); // reference time

map<pthread_t, int> threadToCpuMap;   // get cpuId by threadId
map<int, int> threadNumberToCpuMap;   // get cpuId by threadNumber (see threads_counter)

class TMyObserver : public task_scheduler_observer {
 public:
  void FInit(); // set the cpu - thread correspondence

 protected:
  void on_scheduler_entry(bool Is_worker); // run at the beginning of each thread's execution
  void on_scheduler_exit(bool Is_worker);  // run at the end of each thread's execution
};

/// set the cpu - thread correspondence
void TMyObserver::FInit()
{
  for (int i = 0; i < 8; i++) {
    threadNumberToCpuMap[2 * i + 0] = i;
    threadNumberToCpuMap[2 * i + 1] = i + 8;
  }

  // Other pinnings that were tried; each pattern is repeated over the 16 thread
  // numbers, and the measured scheduling overhead is given where it was recorded:
  //   15,14,...,1,0                      ("default", reversed order)
  //   2,3,6,7,10,11,14,15
  //   4,5,6,7,12,13,14,15
  //   4,5,6,7                            (70% overhead)
  //   4,5,12,13                          (70% overhead)
  //   4,5,14,15                          (70% overhead)
  //   4,6,13,15                          (70% overhead)
  //   4,5,10,11                          (25% overhead)
  //   4,11                               (11% overhead)
  //   4,3                                (11% overhead)
  //   4,9                                (10% overhead)
  //   4,15                               ( 7% overhead)
  //   4,7                                ( 7% overhead)
  //   4,12
  //   2,3                                ( 7% overhead)
  //   2,4                                ( 7% overhead)
} // TMyObserver::FInit()
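// The loop in FInit() interleaves consecutive TBB thread numbers across physical
// cores: threads 0,1,2,3,... are pinned to CPUs 0,8,1,9,... This assumes a node
// with 16 hardware threads on which CPU i and CPU i+8 are the two hyper-thread
// siblings of core i; on a different topology the map has to be refilled.
// Below is a minimal sketch of the same round-robin fill for an arbitrary (even)
// number of hardware threads; the function name and the sibling numbering are
// assumptions, and nothing in this header calls it.
inline void FFillRoundRobinCpuMap(int nHWThreads)
{
  for (int i = 0; i < nHWThreads / 2; ++i) {
    threadNumberToCpuMap[2 * i + 0] = i;                   // first hyper-thread of core i
    threadNumberToCpuMap[2 * i + 1] = i + nHWThreads / 2;  // assumed second hyper-thread of core i
  }
}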
/// redefine the function which is run at the beginning of each thread's execution
#define handle_error_en(en, msg) \
  do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

void TMyObserver::on_scheduler_entry(bool Is_worker)
{
  pthread_t I = pthread_self();

  TMyMutex::scoped_lock lock;
  lock.acquire(mutex);
  ++threads_counter;
  int cpuId = threadNumberToCpuMap[threads_counter % /*L1Algo::nthreads*/ L1AlgoInter::nCores];
  // int cpuId = 4 + ((threads_counter / 4) % 2) * 8 + threads_counter % 4;
  // int cpuId = 2 + ((threads_counter / 2) % 4) * 4 + threads_counter % 2;
  // int cpuId = 15 - threads_counter % 8;
#ifdef DEBUG_THREADS
  cout << "ThrId=" << I << " thread has been created as the " << threads_counter << "-th.";
  cout << " It was run on cpu " << cpuId << endl;
#endif // DEBUG_THREADS
  lock.release();

  threadToCpuMap[I] = cpuId;

  int s;
  cpu_set_t cpuset;
  pthread_t thread = I;
  CPU_ZERO(&cpuset);
  CPU_SET(cpuId, &cpuset);

  // cout << "before" << endl; // FIXME: segmentation fault somewhere.
  s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
  if (s != 0) {
    cout << " pthread_setaffinity_np " << endl;
    handle_error_en(s, "pthread_setaffinity_np");
  }
  // cout << "end set for " << cpuId << endl;

  // s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
  // if (s != 0) handle_error_en(s, "pthread_getaffinity_np");
  // printf("Set returned by pthread_getaffinity_np() contained:\n");
  // for (int j = 0; j < CPU_SETSIZE; j++)
  //   if (CPU_ISSET(j, &cpuset))
  //     printf("  CPU %d is used\n", j);
} // TMyObserver::on_scheduler_entry(bool Is_worker)

/// run at the end of each thread's execution
void TMyObserver::on_scheduler_exit(bool Is_worker) // FIXME: doesn't run
{
  // pthread_t I = pthread_self();
  // cout << "Thread with number " << I << " was ended " << threads_counter;
  // --threads_counter;
} // TMyObserver::on_scheduler_exit(bool Is_worker)

// inline void FFF(){};
/// pin the calling thread by hand, analogously to on_scheduler_entry()
inline void FFF(int thrId)
{
  pthread_t I = pthread_self();

  TMyMutex::scoped_lock lock;
  lock.acquire(mutex);
  ++threads_counter;
  int cpuId = thrId + threadNumberToCpuMap[threads_counter % L1Algo::nthreads];
  lock.release();

  // threadToCpuMap[I] = cpuId;

  int s;
  cpu_set_t cpuset;
  pthread_t thread = I;
  CPU_ZERO(&cpuset);
  CPU_SET(cpuId, &cpuset);
  s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
  if (s != 0) {
    cout << " pthread_setaffinity_np " << endl;
    handle_error_en(s, "pthread_setaffinity_np");
  }
}

/// TBB body: run CATrackFinder() on the events assigned to each thread
class ApplyReco {
  L1Algo *const algos;
  int nEvPerThr;

 public:
  void operator()(const blocked_range<int> &range) const
  {
    for (int i = range.begin(); i != range.end(); ++i) {
      for (int j = 0; j < nEvPerThr; j++) {
        algos[i * L1AlgoInter::maxNEvents + j].CATrackFinder();
        // cout << i << " " << j << endl;
      }
    }
  }

  ApplyReco(L1Algo *const _algos, const int _nEvPerThr) : algos(_algos), nEvPerThr(_nEvPerThr) {}
  ~ApplyReco() {}
};

#undef DEBUG_THREADS

#endif // multithread_h
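// Usage sketch (an assumption about the calling code, not part of this header):
// the observer has to be activated before the parallel loop starts so that every
// TBB worker thread is pinned when it enters the scheduler. nThreads, algos and
// nEvPerThr are placeholders supplied by the caller.
//
//   #include "tbb/task_scheduler_init.h"
//   #include "tbb/parallel_for.h"
//
//   tbb::task_scheduler_init init(nThreads);
//   TMyObserver observer;
//   observer.FInit();        // fill threadNumberToCpuMap
//   observer.observe(true);  // enable the on_scheduler_entry callbacks
//   tbb::parallel_for(tbb::blocked_range<int>(0, nThreads, 1),
//                     ApplyReco(algos, nEvPerThr));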