// ************************************************************************** // This file is property of and copyright by the ALICE HLT Project * // ALICE Experiment at CERN, All rights reserved. * // * // Primary Authors: Sergey Gorbunov * // Ivan Kisel * // for The ALICE HLT Project. * // * // Developed by: Igor Kulakov * // Maksym Zyzak * // * // Permission to use, copy, modify and distribute this software and its * // documentation strictly for non-commercial purposes is hereby granted * // without fee, provided that the above copyright notice appears in all * // copies and that both the copyright notice and this permission notice * // appear in the supporting documentation. The authors make no claims * // about the suitability of this software for any purpose. It is * // provided "as is" without express or implied warranty. * // * //*************************************************************************** #include #include "KFParticleTopoReconstructor.h" #include "KFParticle.h" #include "KFParticleSIMD.h" #include "KFPHistogram/KFPHistogram.h" #ifdef MAIN_DRAW #include "AliHLTTPCCADisplay.h" #endif #include #include #include #include #include using namespace std; #ifndef NVALGRIND #include #endif #include #ifndef HLTCA_STANDALONE #include "TStopwatch.h" typedef TStopwatch Stopwatch; #else #include "Stopwatch.h" #endif #include "pthread.h" #include #include "dirent.h" #include "omp.h" #include /* handle_error_en(en, msg): store the error code 'en' in errno, print 'msg' together with the errno text via perror(), then terminate the whole process with EXIT_FAILURE. Wrapped in do { } while (0) so it behaves like a single statement. */ #define handle_error_en(en, msg) do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0) #define PAGE_SIZE 0x1000 //4KB /* SetAffinity(cpuId): pin the calling thread (pthread_self()) to exactly one logical CPU, 'cpuId'. It builds a cpu_set_t containing only that CPU and applies it with pthread_setaffinity_np(). On a non-zero return it prints a message and terminates the process through handle_error_en, so no error code ever reaches the caller. NOTE(review): cpuId is passed to CPU_SET without a range check. It presumably must lie in [0, CPU_SETSIZE) and name an online CPU -- confirm that the thread-to-CPU maps used by callers guarantee this. */ inline void SetAffinity(const int cpuId) { int s; cpu_set_t cpuset; pthread_t thread = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(cpuId, &cpuset); s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); if (s != 0){ cout << " pthread_setaffinity_np " << endl; handle_error_en(s, "pthread_setaffinity_np");} } /* main: command-line options handled by the loop below are -h / --help / -help (print usage and exit), -nThreads N (worker thread count, default 12) and -HLT N (index of the HLT instance, default 0). The HLT index offsets the SCIF ports that are used afterwards. */ int main(int argc, char **argv) { int nThreads = 12; int iHLT = 0; for( int i=1; i < argc; i++ ){ if ( !std::strcmp( argv[i], "-h" )
|| !std::strcmp( argv[i], "--help" ) || !std::strcmp( argv[i], "-help" ) ) { std::cout << " -HLT - specify the index of the corresponding HLT instance." << std::endl; return 0; } else if ( !std::strcmp( argv[i], "-nThreads" ) && ++i < argc ) { nThreads = atoi( argv[i] ); } else if ( !std::strcmp( argv[i], "-HLT" ) && ++i < argc ) { iHLT = atoi( argv[i] ); } } //establish SKIF connection scif_epd_t endpoint, endpoint_sender; struct scif_portID portSenderId; portSenderId.node = 0; //the host node is always assumed to be CPU portSenderId.port = 2000 + iHLT; const uint16_t portRecieverId = 3000 + iHLT; endpoint = scif_open(); if( endpoint == SCIF_OPEN_FAILED ) { std::cout << "Fail openning SCIF endpoint." << std::endl; return 0 ; } int ret = 0; ret = scif_bind(endpoint, portRecieverId); if( ret == -1 ) { std::cout << "Fail binding SCIF. " << std::endl; return 0; } ret = scif_listen (endpoint, 5); if (ret == -1) { std::cout << "Fail listening SCIF." << std::endl; return 0; } scif_accept(endpoint, &portSenderId, &endpoint_sender, SCIF_ACCEPT_SYNC); if( ret == -1 ) { std::cout << "Fail accepting SCIF." << std::endl; return 0; } float Bz = 0; ret = scif_recv(endpoint_sender, &Bz, sizeof(Bz), SCIF_RECV_BLOCK); if (ret==-1) std::cout << "Fail receiving magnetic field." << std::endl; KFParticle::SetField(Bz); KFParticleSIMD::SetField(Bz); int runnumber = 0; ret = scif_recv(endpoint_sender, &runnumber, sizeof(runnumber), SCIF_RECV_BLOCK); if (ret==-1) std::cout << "Fail receiving runnumber field." << std::endl; int maxEventSize = PAGE_SIZE * 1024 * 3; // 12MB void* buffer; posix_memalign(&buffer, 0x1000, maxEventSize); off_t offsetServer = scif_register(endpoint_sender, buffer, maxEventSize, 0, SCIF_PROT_READ | SCIF_PROT_WRITE,0); if( offsetServer < -1 ) { std::cout << "Fail registerring SCIF." 
<< std::endl; return 0; } scif_send(endpoint_sender, &offsetServer, sizeof(offsetServer), SCIF_SEND_BLOCK); scif_send(endpoint_sender, &maxEventSize, sizeof(maxEventSize), SCIF_SEND_BLOCK); int controlSignal = -1; ret = scif_recv(endpoint_sender, &controlSignal, sizeof(int), SCIF_RECV_BLOCK); if (ret==-1) std::cout << "Fail receiving control signal." << std::endl; if(controlSignal != portSenderId.port) { std::cout << "Wrong control signal." << std::endl; std::cout << "controlSignal " << controlSignal << " portSenderId.port " << portSenderId.port << std::endl; exit(0); } std::cout << "B " << Bz << " run number " << runnumber << std::endl; #ifdef FORMIC const int NMaxThreads = 244; int threadNumberToCpuMap[244]; for (int i=1; i<244; i++){ threadNumberToCpuMap[i-1] = i; } threadNumberToCpuMap[243] = 0; #else const int NMaxThreads = 40; int threadNumberToCpuMap[40]; for (int i=0; i<20; i++){ threadNumberToCpuMap[2*i+0] = i; threadNumberToCpuMap[2*i+1] = i+20; } #endif KFPLinkedList** data = new KFPLinkedList*[nThreads]; KFPLinkedList* currentEventPointer[nThreads]; int nQuedTasks[nThreads]; for(int iThread=0; iThread= NMaxThreads ) cpuId -= int(cpuId/NMaxThreads)*NMaxThreads; SetAffinity(threadNumberToCpuMap[cpuId]); string histoFileName = "KFPHistograms/KFPHistogram"; histoFileName += std::to_string(iThread); histoFileName += ".txt"; histograms[iThread] = new KFPHistogram; histograms[iThread]->SetOutFileName(histoFileName); } // std::cout << "Histos are ready " << std::endl; Stopwatch timer; timer.Start(); omp_lock_t *dataManipulationLock = new omp_lock_t[nThreads]; for(int iThread=0; iThread= NMaxThreads ) cpuId -= int(cpuId/NMaxThreads)*NMaxThreads; SetAffinity(threadNumberToCpuMap[cpuId]); // #pragma omp critical // std::cout << "Cpu Id reader " << cpuId << std::endl; while(1) { int msgSize; ret = scif_recv(endpoint_sender, &msgSize, sizeof(int), SCIF_RECV_BLOCK); if (ret==-1) { std::cout << "Fail receiving message with array size." 
<< std::endl; scif_close(endpoint); stop = 1; break; } int mark = 0; controlSignal = portSenderId.port; scif_send(endpoint_sender, &controlSignal, sizeof(controlSignal), SCIF_SEND_BLOCK); ret = scif_fence_mark(endpoint_sender, SCIF_FENCE_INIT_PEER, &mark); if (ret==-1) { std::cout << "SCIF fence mark failed. Error: " << errno << std::endl; } ret = scif_fence_wait(endpoint_sender, mark); if (ret==-1) { std::cout << "SCIF fence wait failed. Error: " << std::endl; } ret = scif_recv(endpoint_sender, &controlSignal, sizeof(int), SCIF_RECV_BLOCK); if (ret==-1) std::cout << "Fail receiving control signal." << errno << std::endl; if(controlSignal != portSenderId.port) { std::cout << "Wrong control signal." << std::endl; continue; } KFPLinkedList* newInputData = new KFPLinkedList; newInputData->data.ReadDataFromVector(reinterpret_cast(buffer)); newInputData->next = 0; iThread = 0; int nMinTasks=nQuedTasks[0]; for(int iT=1; iTnext = newInputData; data[iThread] = newInputData; if(currentEventPointer[iThread] == 0) currentEventPointer[iThread] = newInputData; nQuedTasks[iThread]++; } #pragma omp flush omp_unset_lock(&dataManipulationLock[iThread]); // iThread++; // if(iThread == nThreads) // iThread = 0; nEvents++; } std::cout << nEvents << std::endl; } //run one instance of KF Particle Finder per core #pragma omp for nowait for(int iThread=0; iThread= NMaxThreads ) cpuId -= int(cpuId/NMaxThreads)*NMaxThreads; SetAffinity(threadNumberToCpuMap[cpuId]); // #pragma omp critical // std::cout << "Cpu Id iThread " << iThread << ": " << cpuId << std::endl; int nEventsLocal = 0; while(1) { bool isDataAvailable = 0; omp_set_lock(&dataManipulationLock[iThread]); isDataAvailable = (currentEventPointer[iThread] != NULL); omp_unset_lock(&dataManipulationLock[iThread]); if(isDataAvailable) { KFParticleTopoReconstructor topoReconstructor; topoReconstructor.Init(currentEventPointer[iThread]->data.GetTracks(), currentEventPointer[iThread]->data.GetPV() ); topoReconstructor.SetNThreads(1); 
topoReconstructor.ReconstructParticles(); histograms[iThread]->Fill(topoReconstructor); topoReconstructor.DeInit(); #pragma omp flush omp_set_lock(&dataManipulationLock[iThread]); #pragma omp flush { KFPLinkedList* processedEvent = currentEventPointer[iThread]; currentEventPointer[iThread] = currentEventPointer[iThread]->next; if(data[iThread] == processedEvent) data[iThread] = 0; delete processedEvent; nQuedTasks[iThread]--; } #pragma omp flush omp_unset_lock(&dataManipulationLock[iThread]); nEventsLocal++; } if(nEventsLocal == 100) { histograms[iThread]->Save(); nEventsLocal = 0; } if(stop && (data[iThread] == 0)) { histograms[iThread]->Save(); std::cout << "thread " << iThread << " " << nEventsLocal << std::endl; break; } } } } } timer.Stop(); for(int iThread=0; iThread