#ifndef multithread_h #define multithread_h #include using std::map; #include "tbb/blocked_range.h" #include "tbb/task_scheduler_init.h" #include "tbb/task_scheduler_observer.h" #include "tbb/parallel_for.h" #include "tbb/spin_mutex.h" #include "tbb/tick_count.h" #include "pthread.h" #include "Performance.h" #include "InputDataArray.h" #include "L1AlgoInputMCData.h" #include "L1Algo.h" // #define DEBUG_THREADS using namespace tbb; typedef spin_mutex TMyMutex; TMyMutex mutex; #ifdef DEBUG_THREADS typedef spin_mutex TMyMutex2; TMyMutex2 mutex2; typedef spin_mutex TMyMutex3; TMyMutex3 mutex3; #endif // DEBUG_THREADS tick_count time0 = tick_count::now(); // reference time //map threadToCpuMap; // let get cpuId by threadId //map threadNumberToCpuMap; // let get cpuId by threadNumber (see threads_counter) class TMyObserver: public task_scheduler_observer { public: TMyObserver():threads_counter(-1) {}; void FInit(); // set cpu - thread correspondence void InitThreadsCounter() { threads_counter = -1; } protected: void on_scheduler_entry(bool Is_worker); // run at begin of each thread execution void on_scheduler_exit(bool Is_worker); // run at end of each thread execution int threads_counter; // for count number of current threads. Need for cpu-thread correspondence making. 
}; /// set cpu - thread correspondence void TMyObserver::FInit() { // default // for (int i=0; i<16; i+=1){ // threadNumberToCpuMap[i+0] = 15-i; // }; // for (int i=0; i<48; i+=1){ // threadNumberToCpuMap[i+0] = 47-i; // }; #if 0 //lxir039 for (int i=0; i<8; i++){ threadNumberToCpuMap[2*i+0] = 15-i; threadNumberToCpuMap[2*i+1] = 15-(i+8); } #endif #if 1 //lxir075 // for(int iProc=0; iProc<4; iProc++) // { // for(int i=0; i<8; i++){ // threadNumberToCpuMap[2*i+0 + iProc*20] = 4*i + iProc; // threadNumberToCpuMap[2*i+1 + iProc*20] = 4*i + 32 + iProc; // } // for(int i=0; i<2; i++){ // threadNumberToCpuMap[2*i+0 + 16 + iProc*20] = 4*i + iProc + 64; // threadNumberToCpuMap[2*i+1 + 16 + iProc*20] = 4*i + 8 + iProc + 64; // } // } #endif /* threadNumberToCpuMap[0] = 0; */ /* threadNumberToCpuMap[1] = 8; */ /* threadNumberToCpuMap[2] = 4; */ /* threadNumberToCpuMap[3] = 12; */ /* threadNumberToCpuMap[4] = 1; */ /* threadNumberToCpuMap[5] = 9; */ /* threadNumberToCpuMap[6] = 5; */ /* threadNumberToCpuMap[7] = 13; */ /* threadNumberToCpuMap[8] = 2; */ /* threadNumberToCpuMap[9] = 10; */ /* threadNumberToCpuMap[10] = 6; */ /* threadNumberToCpuMap[11] = 14; */ /* threadNumberToCpuMap[12] = 3; */ /* threadNumberToCpuMap[13] = 11; */ /* threadNumberToCpuMap[14] = 7; */ /* threadNumberToCpuMap[15] = 15; */ // // // for (int i=0; i<16; i+=8){ // threadNumberToCpuMap[i+0] = 2; // threadNumberToCpuMap[i+1] = 3; // threadNumberToCpuMap[i+2] = 6; // threadNumberToCpuMap[i+3] = 7; // threadNumberToCpuMap[i+4] = 10; // threadNumberToCpuMap[i+5] = 11; // threadNumberToCpuMap[i+6] = 14; // threadNumberToCpuMap[i+7] = 15; // }; // // for (int i=0; i<16; i+=8){ // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 5; // threadNumberToCpuMap[i+2] = 6; // threadNumberToCpuMap[i+3] = 7; // threadNumberToCpuMap[i+4] = 12; // threadNumberToCpuMap[i+5] = 13; // threadNumberToCpuMap[i+6] = 14; // threadNumberToCpuMap[i+7] = 15; // }; // // for (int i=0; i<16; i+=4){ //70% overhead // 
threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 5; // threadNumberToCpuMap[i+2] = 6; // threadNumberToCpuMap[i+3] = 7; // }; // // for (int i=0; i<16; i+=4){ //70% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 5; // threadNumberToCpuMap[i+2] = 12; // threadNumberToCpuMap[i+3] = 13; // }; // // for (int i=0; i<16; i+=4){ //70% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 5; // threadNumberToCpuMap[i+2] = 14; // threadNumberToCpuMap[i+3] = 15; // }; // // for (int i=0; i<16; i+=4){ //70% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 6; // threadNumberToCpuMap[i+2] = 13; // threadNumberToCpuMap[i+3] = 15; // }; // // for (int i=0; i<16; i+=4){ //25% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 5; // threadNumberToCpuMap[i+2] = 10; // threadNumberToCpuMap[i+3] = 11; // }; // // for (int i=0; i<16; i+=2){ //11% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 11; // }; // // for (int i=0; i<16; i+=2){ //11% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 3; // }; // // for (int i=0; i<16; i+=2){ //10% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 9; // }; // for (int i=0; i<16; i+=2){ //7% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 15; // }; // // for (int i=0; i<16; i+=2){ //7% overhead // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 7; // }; // for (int i=0; i<16; i+=2){ // threadNumberToCpuMap[i+0] = 4; // threadNumberToCpuMap[i+1] = 12; // }; // // for (int i=0; i<16; i+=2){ //7% overhead // threadNumberToCpuMap[i+0] = 2; // threadNumberToCpuMap[i+1] = 3; // }; // for (int i=0; i<16; i+=2){ //7% overhead // threadNumberToCpuMap[i+0] = 2; // threadNumberToCpuMap[i+1] = 4; // }; } // TMyObserver::FInit() /// redefine function, which will be run at begining of execution of each thread #define handle_error_en(en, msg) 
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0) void TMyObserver::on_scheduler_entry(bool Is_worker) { pthread_t I = pthread_self(); TMyMutex::scoped_lock lock; lock.acquire(mutex); ++threads_counter; int cpuId = 1;//threadNumberToCpuMap[threads_counter%/*L1Algo::nthreadsL1AlgoInter::nCores*/ 80]; // int cpuId = 4+((threads_counter/4)%2)*8+threads_counter%4; // int cpuId = 2+((threads_counter/2)%4)*4+threads_counter%2; // int cpuId = 15-threads_counter%8; #ifdef DEBUG_THREADS cout << "ThrId=" << I << " thread have been created " << threads_counter << "-th."; cout << " And was run on cpu " << cpuId << endl; #endif // DEBUG_THREADS //threadToCpuMap[I] = cpuId; lock.release(); int s; cpu_set_t cpuset; pthread_t thread = I; CPU_ZERO(&cpuset); CPU_SET(cpuId, &cpuset); // cout << "before" << endl; //FIXME: segmentation fault somethere. s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); if (s != 0){ cout << " pthread_setaffinity_np " << endl; handle_error_en(s, "pthread_setaffinity_np");} // cout << "end set for " << cpuId << endl; // s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset); // // if (s != 0) handle_error_en(s, "pthread_getaffinity_np"); // // // printf("Set returned by pthread_getaffinity_np() contained:\n"); // for (int j = 0; j < CPU_SETSIZE; j++) // if (CPU_ISSET(j, &cpuset)) // printf(" CPU %d uses\n ", j); } // TMyObserver::on_scheduler_entry(bool Is_worker) /// run at end of each thread execution void TMyObserver::on_scheduler_exit(bool Is_worker) //FIXME: don't run { // pthread_t I = pthread_self(); // cout << "Thread with number " << I << " was ended " << threads_counter; // --threads_counter; } // TMyObserver::on_scheduler_exit(bool Is_worker) // inline void FFF(){}; // inline void FFF(int thrId){ // pthread_t I = pthread_self(); // TMyMutex::scoped_lock lock; // lock.acquire(mutex); // ++threads_counter; // int cpuId = thrId + threadNumberToCpuMap[threads_counter%L1AlgoInter::nCores]; // lock.release(); // // 
// (tail of the commented-out FFF helper begun above; its leading "//" markers
// were scattered by the loss of physical line breaks and are restored here)
// threadToCpuMap[I] = cpuId;
// int s;
// cpu_set_t cpuset;
// pthread_t thread = I;
// CPU_ZERO(&cpuset);
// CPU_SET(cpuId, &cpuset);
// s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
// if (s != 0){ cout << " pthread_setaffinity_np " << endl; handle_error_en(s, "pthread_setaffinity_np");}
// }

/// TBB parallel_for body: for each slice element i, pins the calling worker to
/// threadNumberToCpuMap[i] and runs the CA track finder over that thread's
/// events.
/// NOTE(review): "blocked_range" appears without template arguments — the
/// "<...>" text was lost in extraction (presumably blocked_range<unsigned int>
/// or <size_t>, matching the loop variable); confirm against version control.
class ApplyReco{
  InputDataArray *const InpData;  // per-thread input arrays, indexed by range element
  int nEvPerThr;                  // number of events each thread processes
  int* threadNumberToCpuMap;      // owned 20-entry copy of the thread->cpu table
                                  // NOTE(review): raw owning pointer with no copy
                                  // ctor/assignment; TBB copies body functors, so the
                                  // default copy shares this pointer and ~ApplyReco
                                  // double-deletes. Needs Rule of Three — confirm.
 public:
  void operator()(const blocked_range &range) const
  {
    for(unsigned int i = range.begin(); i != range.end(); ++i){
      TMyMutex::scoped_lock lock;
      lock.acquire(mutex);
      // pin this worker to its assigned CPU before doing any work
      pthread_t I = pthread_self();
      int s;
      cpu_set_t cpuset;
      pthread_t thread = I;
      CPU_ZERO(&cpuset);
      CPU_SET(threadNumberToCpuMap[i], &cpuset);
      s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
      int nEv = nEvPerThr;
      lock.release();
      L1Algo Tracker;
      Tracker.Init( InpData[i].fSettings->GetSettings() );
      lock.acquire(mutex);
      std::cout << threadNumberToCpuMap[i] << std::endl;  // report which CPU this slice runs on
      lock.release();
      // for(int uuu=0; uuu<100; ++uuu)
      // feed each event's hit data to the tracker and run the CA track finder
      for (int j = 0; j < nEv; j++){
        Tracker.SetData( InpData[i].fInput[j].GetStsHits(), InpData[i].fInput[j].GetStsStrips(), InpData[i].fInput[j].GetStsStripsB(), InpData[i].fInput[j].GetStsZPos(), InpData[i].fInput[j].GetSFlag(), InpData[i].fInput[j].GetSFlagB(), InpData[i].fInput[j].GetStsHitsStartIndex(), InpData[i].fInput[j].GetStsHitsStopIndex());
        Tracker.CATrackFinder();
      }
    }
  }

  /// Copies the first 20 entries of threadNumberToCpuMap_ into an owned array.
  ApplyReco(InputDataArray* const InpD_, const int nEvPerThr_, int* threadNumberToCpuMap_): InpData(InpD_),nEvPerThr(nEvPerThr_)
  {
    threadNumberToCpuMap = new int[20];
    for(int i=0; i<20; i++)
      threadNumberToCpuMap[i] = threadNumberToCpuMap_[i];
  }
  ~ApplyReco()
  {
    if(threadNumberToCpuMap) delete [] threadNumberToCpuMap;
  }
};

/// Same as ApplyReco but without CPU pinning, and with a Performance object
/// per slice fed the Monte-Carlo truth after each event's reconstruction.
/// NOTE(review): blocked_range template arguments lost here too — confirm.
class ApplyRecoWithPerf{
  InputDataArray *const InpData;      // reconstructed-input arrays per thread
  InputMCDataArray *const InpMCData;  // MC-truth arrays per thread
  Performance * const Perf;           // per-slice performance evaluators (not owned)
  int nEvPerThr;                      // events per thread
  int iVerbose;                       // verbosity level (stored, not read in this block)
 public:
  void operator()(const blocked_range &range) const
  {
    for(unsigned int i = range.begin(); i != range.end(); ++i){
      L1Algo Tracker;
      Tracker.Init( InpData[i].fSettings->GetSettings() );
      Perf[i].Init("",&Tracker,nEvPerThr);
      for (int j = 0; j < nEvPerThr; j++){
        Tracker.SetData( InpData[i].fInput[j].GetStsHits(), InpData[i].fInput[j].GetStsStrips(), InpData[i].fInput[j].GetStsStripsB(), InpData[i].fInput[j].GetStsZPos(), InpData[i].fInput[j].GetSFlag(), InpData[i].fInput[j].GetSFlagB(), InpData[i].fInput[j].GetStsHitsStartIndex(), InpData[i].fInput[j].GetStsHitsStopIndex());
        Tracker.CATrackFinder();
        // evaluate the reconstruction of this event against MC truth
        Perf[i].SetData(InpMCData[i].fMCInput[j].GetHitStore(), InpMCData[i].fMCInput[j].GetStsHits(), InpMCData[i].fMCInput[j].GetMCPoints(), InpMCData[i].fMCInput[j].GetMCTracks(), InpMCData[i].fMCInput[j].GetHitMCRef() );
        Perf[i].RunEv();
      }
    }
  }
  ApplyRecoWithPerf(InputDataArray* const InpD_, InputMCDataArray* const InpMCD_, Performance * const Perf_, const int nEvPerThr_, const int iVerbose_): InpData(InpD_), InpMCData(InpMCD_), Perf(Perf_), nEvPerThr(nEvPerThr_), iVerbose(iVerbose_) {}
  ~ApplyRecoWithPerf() {}
};

/// Per-thread memory setup: pins the worker, then allocates and copies that
/// thread's private settings and event-input arrays from a shared template.
class ApplyMem{
  const InputDataArray& InpDataPerThread;  // shared source data to replicate
  InputDataArray* const InpData;           // per-thread destinations
  int nEvPerThr;
  int* threadNumberToCpuMap;
 public:
  void operator()(const blocked_range &range) const
  {
    const int NEvLocal = nEvPerThr;
    for(unsigned int i = range.begin(); i != range.end(); ++i){
      pthread_t I = pthread_self();
      int s;
      cpu_set_t cpuset;
      pthread_t thread = I;
      CPU_ZERO(&cpuset);
      CPU_SET(threadNumberToCpuMap[i], &cpuset);
      s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
      InpData[i].fSettings = new L1AlgoInputSettings;
      InpData[i].fInput = new L1AlgoInputData[NEvLocal];
      *(InpData[i].fSettings) = *(InpDataPerThread.fSettings);
      // NOTE(review): the source text is CORRUPTED from here — everything
      // between "for(int j=0; j" and a later ">" was lost in extraction
      // (likely the copy loop body, the end of ApplyMem including its
      // ctor/dtor, and the declaration/name of the following MC-memory
      // functor class whose body continues below). Recover from version
      // control; as written this does not compile.
      for(int j=0; j &range) const {
        const int NEvLocal = nEvPerThr;
        for(unsigned int i = range.begin(); i != range.end(); ++i){
          // pthread_t I = pthread_self();
          // int s;
          // cpu_set_t cpuset;
          // pthread_t thread = I;
          // CPU_ZERO(&cpuset);
          // CPU_SET(threadNumberToCpuMap[i], &cpuset);
          // s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
          // (body of the lost MC-memory functor: allocates per-thread
          //  Performance and MC-input arrays)
          Perf[i] = Performance();
          InpMCData[i].fMCInput = new L1AlgoInputMCData[NEvLocal];
          // NOTE(review): CORRUPTED again — text lost between "for(int j=0; j"
          // and a ">" somewhere inside the commented-out block that follows.
          for(int j=0; j &range) const
          // {
          //   for(unsigned int i = range.begin(); i != range.end(); ++i){
          //     algos[i].algo = new L1Algo[L1AlgoInter::maxNEvents];
          //     for (int j = 0; j < nEvPerThr; j++){
          //       algos[i].algo[j] = algoForOneCore.algo[j];
          //       // cout << i << " " << j << endl;
          //     }
          //   }
          // }
          // ApplyMem(const AlgoArray & _algoFOC, AlgoArray *const _algos, const int _nEvPerThr): algoForOneCore(_algoFOC), algos(_algos),nEvPerThr(_nEvPerThr){}
          // ~ApplyMem()
          // {
          // }
          // };

#undef DEBUG_THREADS
#endif // multithread_h