// $Id$ //----------------------------------------------------------------------- // The GSI Online Offline Object Oriented (Go4) Project // Experiment Data Processing at EE department, GSI //----------------------------------------------------------------------- // Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH // Planckstr. 1, 64291 Darmstadt, Germany // Contact: http://go4.gsi.de //----------------------------------------------------------------------- // This software can be used under the license agreements as stated // in Go4License.txt file which is part of the distribution. //----------------------------------------------------------------------- #include "TGo4ServerTask.h" #include "TMutex.h" #include "TApplication.h" #include "TSystem.h" #include "TObjArray.h" #include "TGo4Log.h" #include "TGo4LockGuard.h" #include "TGo4Socket.h" #include "TGo4BufferQueue.h" #include "TGo4Thread.h" #include "TGo4ThreadHandler.h" #include "TGo4Status.h" #include "TGo4CommandInvoker.h" #include "TGo4TaskManager.h" #include "TGo4TaskHandler.h" #include "TGo4ConnectorRunnable.h" #include "TGo4Slave.h" #include "TGo4TaskConnectorTimer.h" const Int_t TGo4ServerTask::fgiOPENWAITCYCLES=100; // wait cycles (100) const UInt_t TGo4ServerTask::fguOPENWAITCYCLETIME=100; // time in ms (20) const Int_t TGo4ServerTask::fgiCLOSEWAITCYCLES=100; // wait cycles (100) const UInt_t TGo4ServerTask::fguCLOSEWAITCYCLETIME=100; // time in ms (20) const Int_t TGo4ServerTask::fgiCONNECTWAITCYCLES=20; // wait cycles (20) const UInt_t TGo4ServerTask::fguCONNECTWAITCYCLETIME=200; // time in ms (200) const UInt_t TGo4ServerTask::fguCONNECTTIMERPERIOD=100; // time in ms (50) const char* TGo4ServerTask::fgcLAUNCHPREFSFILE = "etc/Go4LaunchPrefs.txt"; TGo4ServerTask::TGo4ServerTask(const char* name, UInt_t negotiationport, Bool_t blockingmode, Bool_t standalone, Bool_t autostart, Bool_t autocreate, Bool_t ismaster) : TGo4Task(name,blockingmode, autostart,autocreate,ismaster), fxTaskManager(0),fxCurrentTaskHandler(0), fxConnectTransport(0), fxDisConnectTransport(0), fuConnectPort(0), fbKeepServerSocket(kFALSE), fbConnectRequest(kFALSE), fbDisConnectRequest(kFALSE), fbConnectIsOpen(kFALSE),fbConnectIsDone(kFALSE), fbConnectIsClose(kFALSE), fxConnectorTimer(0) { TString nomen("TaskManager of "); nomen += name; fxTaskManager= new TGo4TaskManager(nomen.Data(), this, negotiationport); if(negotiationport!=42) { // connector runnable: nomen.Form("ConnectorRunnable of %s", name); TGo4ConnectorRunnable* conny = new TGo4ConnectorRunnable(nomen.Data(),this); nomen.Form("CONNECTOR-%s", name); fxConnectorName=nomen; fxWorkHandler->NewThread(GetConnectorName(), conny); } else {} // do not start connector port in local mode <- nameservice problems without wan! TGo4CommandInvoker::Instance(); // make sure a command invoker exists TGo4CommandInvoker::Register("ServerTask", this); fxConnectorTimer= new TGo4TaskConnectorTimer(this,fguCONNECTTIMERPERIOD); fxConnectorTimer->TurnOn(); if(standalone) { Launch(); // create threads, start application control timer } else { // subclass must call Launch at end of its ctor } } TGo4ServerTask::~TGo4ServerTask() { if (GetWorkHandler()) GetWorkHandler()->CancelAll(); delete fxConnectorTimer; delete fxTaskManager; TGo4CommandInvoker::UnRegister(this); } Bool_t TGo4ServerTask::RemoveClient(const char* name, Bool_t clientwait, Bool_t isterminating) { Bool_t rev=kTRUE; TGo4TaskHandler* taskhandler=0; if(name && strstr(name,"current")) taskhandler=GetCurrentTaskHandler(); else taskhandler=GetTaskHandler(name); if(taskhandler==0) { // no such taskhandler for name TGo4Log::Debug(" ServerTask -- RemoveClient FAILED, no client %s !!! ", name); rev=kFALSE; } else // if(taskhandler==0) { TGo4Log::Debug(" ServerTask -- removing client task %s ",name); // first stop all user threads waiting on or writing into the data queues StopWorkThreads(); if(clientwait) { // normal mode: handshake with client to be removed // send quit command to client, client will send dummy objects back // to release waiting sockets // then client will request a disconnect action from the connector runnable if(IsMaster()) SubmitEmergencyCommand(kComQuit); // master quits client when removing else SubmitEmergencyData(kComQuit, taskhandler->GetName()); // client data runnable must handle quit request of server! TGo4Log::Debug(" Server Task -- Waiting for client %s disconnection...",taskhandler->GetName()); Int_t removeresult=fxTaskManager->WaitForClientRemoved(); // disconnection is done by connector // thread using task manager methods switch(removeresult) { case -1: // timeout TGo4Log::Debug(" !!! Server Task -- client remove wait TIMEOUT !!! "); rev=fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE); // do not wait for client ok break; case -2: // we are terminating the server TGo4Log::Debug(" !!! Server Task -- client remove aborted for TERMINATION MODE !!! "); rev=kFALSE; break; default: // all right TGo4Log::Debug(" Server Task -- waited %d cycles until client was removed. ",removeresult); rev=kTRUE; break; } } // if(clientwait) else { // no communication with client, just disconnect without waiting TGo4Log::Debug(" !!! Server Task -- removing client %s without waiting... ", taskhandler->GetName()); SendStopBuffers(taskhandler->GetName()); rev= (fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE)==0); // do not wait } if(!isterminating) StartWorkThreads(); } // else if(taskhandler==0) return rev; } Int_t TGo4ServerTask::RemoveAllClients(Bool_t force) { Int_t rev=0; // return value is number of removed clients //std::cout <<"TTTTTTTT TGo4ServerTask::RemoveAllClients" << std::endl; //// new: first figure out all existing names, then remove one by one: TGo4TaskHandler* taskhandler=0; TObjArray names; Bool_t reset=kTRUE; while((taskhandler=fxTaskManager->NextTaskHandler(reset)) !=0) { reset=kFALSE; //std::cout <<"adding name "<GetName() << std::endl; names.AddLast(new TNamed(taskhandler->GetName(), "title")); } TIter niter(&names); TObject* nomen=0; while((nomen =niter.Next()) !=0) { //std::cout <<"removing th "<GetName() << std::endl; RemoveClient(nomen->GetName(),!force,kTRUE); rev++; } names.Delete(); // end iteration return rev; } Bool_t TGo4ServerTask::RemoveCurrentClient() { Bool_t rev=kTRUE; TGo4TaskHandler* taskhandler=GetCurrentTaskHandler(); if(taskhandler!=0) { // we have a current client, remove it TGo4Log::Debug(" Server task -- removing current client %s ",taskhandler->GetName()); rev = RemoveClient(taskhandler->GetName()); } else { rev=kFALSE; } return rev; } void TGo4ServerTask::SetCurrentTask(const char* name) { //std::cout <<"server task setting current task to "<GetLastTaskHandler(); //std::cout << "**** set current th from get lastth:"<< fxCurrentTaskHandler << std::endl; } else // if(name==0) { // name specified, search for it han=fxTaskManager->GetTaskHandler(name); if(han) { fxCurrentTaskHandler=han; //std::cout << "**** set current th from name:"<< fxCurrentTaskHandler << std::endl; } else // if(han) { TGo4Log::Debug(" ServerTask: FAILED setting current task to %s-- no such client! ",name); } // else if(han) } // else if(name==0) } // TGo4LockGuard // finally, start user threads again StartWorkThreads(); } // else if(fxTaskManager==0) } TGo4TaskHandler* TGo4ServerTask::GetTaskHandler(const char* name) { return (fxTaskManager->GetTaskHandler(name)); } TGo4TaskHandler* TGo4ServerTask::GetTaskHandler() { return (GetCurrentTaskHandler()); } TGo4TaskHandler* TGo4ServerTask::GetCurrentTaskHandler() { return fxCurrentTaskHandler; } TGo4TaskManager* TGo4ServerTask::GetTaskManager() { return fxTaskManager; } void TGo4ServerTask::SetConnect(TGo4Socket * trans, const char* host, UInt_t port, Bool_t keepserv) { fxConnectTransport=trans; fxConnectHost=host; fuConnectPort=port; fbConnectRequest=kTRUE; fbKeepServerSocket=keepserv; } void TGo4ServerTask::SetDisConnect(TGo4Socket * trans) { fxDisConnectTransport=trans; fbDisConnectRequest=kTRUE; } Int_t TGo4ServerTask::TimerConnect() { Int_t rev=0; /////////////////////////////////////// //// handle the disconnection first: if(fbDisConnectRequest) { if(fxDisConnectTransport!=0) { // we have a transport instance to disconnect fxDisConnectTransport->Close(); //delete fxDisConnectTransport; // new //std::cout << "++++++++Timer closed transport"<< std::endl; fbConnectIsClose=kTRUE; fbDisConnectRequest=kFALSE; // we served the request, reset it rev+=1; } else { // error, zero pointer given rev+=32; } } else //// if(fbDisConnectRequest) { // no open request, continue rev+=2; } ///////////////////////////////////// //// then handle the connection: if(fbConnectRequest) { // timer shall open a transport as server if(fxConnectTransport!=0) { if(!fxConnectTransport->IsOpen()) { // transport is not open, so do it // std::cout << "++++++++Timer will open transport"<< std::endl; fbConnectIsOpen=kTRUE; // tell connector thread that we try to open Int_t result=fxConnectTransport->Open(GetConnectHost(), fuConnectPort, fbKeepServerSocket); if(result==0) { fbConnectIsDone=kTRUE; // tell connector thread we returned from open fbConnectRequest=kFALSE; // we served the request, reset it fbKeepServerSocket=kFALSE; // reset keep server socket flag rev+=4; } else { rev=-4; // open was not finished, we poll once again... } } else { // transport was already open // std::cout <<"OOOOOOOOOOOO TimerConnect transport already open!" << std::endl; fbConnectIsOpen=kTRUE; fbConnectIsDone=kTRUE; // tell connector thread we returned from open fbConnectRequest=kFALSE; // we served the request, reset it fbKeepServerSocket=kFALSE; // reset keep server socket flag rev+=8; } } // if(fxConnectTransport!=0) else { rev+=64; // no Transport specified: create raw server for negotiation port //fxConnectTransport=new TGo4Socket("Server",3); //std::cout << "(((((( timer created new raw server transport"<< std::endl; } } //// if(fbConnectRequest) else { // no open request, continue rev+=16; } return rev; } Int_t TGo4ServerTask::WaitForOpen() { Int_t count=0; while(!fbConnectIsOpen) { if(count>TGo4ServerTask::fgiOPENWAITCYCLES) { count = -1; // timeout break; } else { TGo4Thread::Sleep(TGo4ServerTask::fguOPENWAITCYCLETIME); ++count; } // std::cout << "*****WaitForOpen()"<< std::endl; } fbConnectIsOpen=kFALSE; // reset for next time return count; } Int_t TGo4ServerTask::WaitForClose() { Int_t count=0; while(!fbConnectIsClose) { if(count>TGo4ServerTask::fgiCLOSEWAITCYCLES) { count = -1; // timeout break; } else { TGo4Thread::Sleep(TGo4ServerTask::fguCLOSEWAITCYCLETIME); ++count; } //std::cout << "*****WaitForClose() "<TGo4ServerTask::fgiCONNECTWAITCYCLES) // { // count = -1; // timeout // break; // } else { TGo4Thread::Sleep(TGo4ServerTask::fguCONNECTWAITCYCLETIME); ++count; } // std::cout << "*****WaitForConnection()"<< std::endl; } fbConnectIsDone=kFALSE; // reset for next time return count; } TGo4Socket* TGo4ServerTask::GetConnectTransport() { return fxConnectTransport; } TGo4BufferQueue* TGo4ServerTask::GetCommandQueue(const char* name) { TGo4BufferQueue* queue=0; TGo4TaskHandler* currenttask=0; if(name==0 || strstr(name,"current")) currenttask=GetCurrentTaskHandler(); // find out the current client else currenttask=GetTaskHandler(name); // find out destination client by name if(currenttask) queue=dynamic_cast (currenttask->GetCommandQueue()); return queue; } TGo4BufferQueue * TGo4ServerTask::GetStatusQueue(const char* name) { TGo4BufferQueue* queue=0; TGo4TaskHandler* currenttask=0; if(name==0) currenttask=GetCurrentTaskHandler(); // find out the current client else currenttask=GetTaskHandler(name); // find out destination client by name if(currenttask) queue=dynamic_cast (currenttask->GetStatusQueue()); return queue; } TGo4BufferQueue * TGo4ServerTask::GetDataQueue(const char* name) { TGo4BufferQueue* queue=0; TGo4TaskHandler* currenttask=0; if(name==0 || strstr(name,"current")) currenttask=GetCurrentTaskHandler(); // find out the current client else currenttask=GetTaskHandler(name); // find out destination client by name if(currenttask) queue=dynamic_cast (currenttask->GetDataQueue()); return queue; } TGo4Command* TGo4ServerTask::NextCommand() { if(IsMaster()) return 0; TGo4Command* com=0; TGo4TaskHandler* han=0; Bool_t reset=kTRUE; TGo4LockGuard taskmutex(fxTaskManager->GetMutex()); while((han=fxTaskManager->NextTaskHandler(reset))!=0) { reset=kFALSE; TGo4BufferQueue * comq=dynamic_cast (han->GetCommandQueue()); if(comq==0) continue; //NEVER COME HERE! if(!comq->IsEmpty()) // prevent waiting on queue { com= dynamic_cast(comq->WaitObjectFromBuffer()); if(com) { com->SetTaskName(han->GetName()); com->SetMode(han->GetRole()); return com; } } } // while return com; } void TGo4ServerTask::SendStatus(TGo4Status * stat, const char* receiver) { if(IsMaster()) return; if(stat==0) return; if(receiver!=0) { TGo4Task::SendStatus(stat,receiver); return; } // send status to all TGo4LockGuard taskmutex(fxTaskManager->GetMutex()); TGo4TaskHandler* han = 0; Bool_t reset = kTRUE; while((han = fxTaskManager->NextTaskHandler(reset))!=0) { reset = kFALSE; TGo4BufferQueue * statq=dynamic_cast (han->GetStatusQueue()); if(statq==0) continue; //NEVER COME HERE! TGo4Log::Debug(" Task - sending status %s to task %s", stat->ClassName(), han->GetName()); statq->AddBufferFromObject(stat); }// while } void TGo4ServerTask::SendStatusBuffer() { if(IsMaster()) return; TGo4LockGuard statguard(fxStatusMutex); // do not send during buffer update TGo4LockGuard taskmutex(fxTaskManager->GetMutex()); // protect task list TGo4TaskHandler* han=0; Bool_t reset=kTRUE; while((han=fxTaskManager->NextTaskHandler(reset))!=0) { reset=kFALSE; TGo4BufferQueue * statq=dynamic_cast (han->GetStatusQueue()); if(statq==0) continue; //NEVER COME HERE! TGo4Log::Debug(" Task - sending status buffer to task %s", han->GetName()); statq->AddBuffer(fxStatusBuffer,kTRUE); }// while } Bool_t TGo4ServerTask::StartConnectorThread() { return GetWorkHandler()->Start( GetConnectorName() ); } Bool_t TGo4ServerTask::StopConnectorThread() { Bool_t rev = GetWorkHandler()->Stop( GetConnectorName() ); // unset running flag // now establish a dummy connection to our own server to release the listen socket: const char* host = gSystem->HostName(); Int_t negotiationport = fxTaskManager->GetNegotiationPort(); TGo4Socket* connector = new TGo4Socket(kTRUE); // raw socket transport //std::cout << "host:"<Open(host,negotiationport); // open connection to server's connector runnable connector->Send(TGo4TaskHandler::Get_fgcERROR()); // force server to stop connector->Close(); delete connector; return rev; } Bool_t TGo4ServerTask::ConnectorThreadIsStopped() { TGo4Thread* conny = GetWorkHandler()->GetThread(GetConnectorName()); return conny->IsWaiting(); } void TGo4ServerTask::Quit() { TGo4Log::Debug(" ServerTask Quit -- removing all connected clients "); SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName()); TGo4Slave* slave=GetSlave(); if(slave) { TGo4Log::Debug(" ServerTask Quit is stopping slave..."); slave->Stop(); // to execute analysis postloop if still running } if(!IsMaster()) { //std::cout <<"mmmmmmmmm quit is unlocking taskmanager mutex" << std::endl; fxTaskManager->GetMutex()->UnLock(); // JAM avoid deadlocking of analysis server main thread with connector thread that actually performs the remove } RemoveAllClients(); //StopWorkThreads(); // are re-started after last client is removed... WakeCommandQueue(TGo4Task::Get_fgiTERMID()); // will stop local command thread, and remote Terminate(!IsMaster()); // terminate only slave server here! // if(!IsMaster()) // fxTaskManager->GetMutex()->Lock(); // avoid conflicts with lockguard outside } void TGo4ServerTask::Shutdown() { TGo4Log::Debug(" ServerTask Shutdown without disconnect waiting"); SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName()); fxTaskManager->GetMutex()->UnLock(); // avoid possible deadlock between main thread and connector thread TGo4Thread::Sleep(10*fguCONNECTTIMERPERIOD); // wait 1 s to broadcast shutdown message before terminating... StopWorkThreads(); WakeCommandQueue(TGo4Task::Get_fgiTERMID()); // will stop local command thread, and remote RemoveAllClients(true); TGo4Slave* slave=GetSlave(); if(slave) { TGo4Log::Debug(" ServerTask Shutdown stopping slave..."); slave->Stop(); // to execute analysis postloop. // We are within main thread here, i.e. it never stops before termination! slave->SetTask(0,kFALSE); // otherwise owner dtor will delete us... delete slave; //call dtors of analysis framework //SetOwner(0); } fxTaskManager->GetMutex()->Lock(); gApplication->Terminate(); // do not wait until appctrl timer terminates us } const char* TGo4ServerTask::Get_fgcLAUNCHPREFSFILE() { return fgcLAUNCHPREFSFILE; }