\section{Memory management}
\label{prog_services_memory}

\figpdf{memorypool}{Schema of \dabc\ \class{MemoryPool} with {\em blocks} and {\em subblocks}, \class{Buffers} with {\em segments}, and \class{Pointer} object. A \class{PoolHandle} is used to access the pool from within a \class{Module}. See text for details.}{htb}{0}{.9}

\subsection{Zero-copy approach}

The \dabc~ framework is based on a dataflow concept: data buffers flow through many components, such as \class{Modules}, \class{Transports}, and \class{Devices}. Copying the data content at each step of such a transfer chain would reduce performance drastically. Therefore \dabc~ has a central memory management that provides global memory \strong{Buffers} from a \strong{Memory Pool}. All components use only references to this memory, and these references can be passed on without copying the content. This technique is called the \strong{zero-copy approach} and is fully supported by \dabc~.

\subsection{Memory pool}
\label{prog_services_memory_pool}

The memory in \class{dabc::MemoryPool} is organized in big {\em blocks} of contiguous virtual memory. Each {\em block} is divided into memory pieces of equal size, the {\em subblocks}; the size of each {\em subblock} is a power of 2 (e.~g.~ 4096 bytes). A \class{MemoryPool} can have several memory {\em blocks} with different {\em subblock} sizes.

Usually a \class{MemoryPool} has a fixed structure: the memory is allocated once and does not change during the complete run. This is the preferable mode of operation, because any memory allocation may take an unpredictable amount of time, or may even fail if the system has too few resources. Nevertheless, one can configure a \class{MemoryPool} to be extendable: the \class{MemoryPool} then allocates new {\em blocks} whenever it has no more memory available to provide a requested \class{Buffer}.
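To illustrate how a request might be mapped onto {\em blocks} with power-of-2 {\em subblock} sizes, the following standalone C++ sketch models a pool as a list of {\em blocks}. The selection policy (take the smallest {\em subblock} size that fits and still has a free {\em subblock}; in an extendable pool, append a new {\em block} rounded up to the next power of 2 when nothing fits) and all names ({\tt ToyBlock}, {\tt SelectBlock()}, {\tt RoundUpPow2()}) are assumptions for illustration, not the actual \class{dabc::MemoryPool} code.

\begin{small}
\begin{verbatim}
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of one memory block: all subblocks have the same size.
struct ToyBlock {
    std::size_t subblockSize = 0;  // a power of 2, e.g. 4096
    std::size_t numSubblocks = 0;  // number of subblocks in this block
    std::size_t used = 0;          // subblocks currently handed out
};

// Round n up to the next power of 2 (RoundUpPow2(3000) == 4096).
std::size_t RoundUpPow2(std::size_t n) {
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}

// Assumed policy: choose the block with the smallest subblock size that can
// hold `request` and still has a free subblock. If none is found and the pool
// is extendable, append a new block; otherwise return -1 (pool exhausted).
int SelectBlock(std::vector<ToyBlock>& blocks, std::size_t request,
                bool extendable, std::size_t newBlockSubblocks = 16) {
    int best = -1;
    for (std::size_t i = 0; i < blocks.size(); ++i) {
        const ToyBlock& b = blocks[i];
        if (b.subblockSize < request || b.used >= b.numSubblocks) continue;
        if (best < 0 || b.subblockSize < blocks[best].subblockSize)
            best = static_cast<int>(i);
    }
    if (best < 0 && extendable) {
        ToyBlock b;
        b.subblockSize = RoundUpPow2(request);
        b.numSubblocks = newBlockSubblocks;
        blocks.push_back(b);
        best = static_cast<int>(blocks.size()) - 1;
    }
    if (best >= 0) blocks[best].used++;  // one more subblock handed out
    return best;
}
\end{verbatim}
\end{small}

With a fixed layout the function simply reports exhaustion; only the extendable variant ever grows the pool, which is the run-time allocation the text recommends avoiding.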
Each {\em subblock} of the \class{MemoryPool} has a 32-bit reference counter which counts how many references to this memory region are held by \class{Buffers}. This is necessary for the book-keeping of available memory, since several \class{Buffer} objects can refer to the same {\em subblock}; only a {\em subblock} whose counter has returned to zero can be handed out again. The user can request a new \class{Buffer} from the \class{MemoryPool} with method \func{TakeBuffer()}. This method returns a \class{dabc::Buffer} instance with an internal reference to a formerly unused {\em subblock} of the appropriate size; the reference counter of this {\em subblock} is then incremented. To release a \class{Buffer}, one should call the static method \func{dabc::Buffer::Release()}. This deletes the \class{Buffer} object and decrements the {\em subblock} reference counter again.

\subsection{Buffer}
\label{prog_services_memory_buffer}

In the general case, a \class{dabc::Buffer} contains a list of {\em segments} (gather list). Each {\em segment} (represented by class \class{dabc::MemSegment}) refers to a different part of a {\em subblock} in the \class{MemoryPool} (compare section \ref{prog_services_memory_pool}). A \class{dabc::MemSegment} contains a unique buffer id, the pointer to the beginning of the {\em segment}, and the size of the {\em segment}. Usually, a \class{dabc::Buffer} contains just one {\em segment}, which fully covers a complete {\em subblock} of the \class{MemoryPool} (for instance, when a new \class{Buffer} is requested with method \func{MemoryPool::TakeBuffer()}). Methods \func{NumSegments()} and \func{Segment(unsigned)} provide access to the list of {\em segments}. One can also directly access the pointer to and the size of each {\em segment} via methods \func{GetDataLocation(unsigned)} and \func{GetDataSize(unsigned)}, respectively.
For instance, filling a complete \class{Buffer} with zeros looks like this:

\begin{small}
\begin{verbatim}
#include <cstring>
#include "dabc/Buffer.h"

void UserModule::ProcessOutputEvent(dabc::Port* port)
{
   // request a new buffer of 2048 bytes from the pool
   dabc::Buffer* buf = Pool()->TakeBuffer(2048);
   // a fresh buffer has one segment covering the whole subblock
   memset(buf->GetDataLocation(), 0, buf->GetDataSize());
   port->Send(buf);
}
\end{verbatim}
\end{small}

It is also possible to create a \class{Buffer} object that references the same memory as another \class{Buffer}, by means of method \func{Buffer::MakeReference()}. This delivers the pointer to a new \class{dabc::Buffer} instance with the same list of {\em segments} as the original instance. It also increments the reference counter of all used {\em subblocks} in the \class{MemoryPool}. This method should be used, e.~g.~, to send the same data over several \class{Ports}: one just makes as many reference \class{Buffers} as required and sends them to all destinations independently, without copying the data. For instance, a simplified version of \func{dabc::Module::SendToAllOutputs()} looks like:

\begin{small}
\begin{verbatim}
void dabc::Module::SendToAllOutputs(const dabc::Buffer& buf)
{
   // every output port gets its own reference to the same memory
   for (unsigned n = 0; n < NumOutputs(); n++)
      Output(n)->Send(buf.Duplicate());
}
\end{verbatim}
\end{small}

The \class{dabc::Buffer} object has a 32-bit type identifier which can be set with method \func{SetTypeId()} and retrieved with \func{GetTypeId()}. Its purpose is to identify the type of the buffer content. The value of this identifier is application specific; for instance, the \mbs~ plugin defines its own type, which is then used by the transports to recognize whether the buffer contains data in \mbs~ event format.

Each \class{dabc::Buffer} can be supplied with an additional \strong{header}. This is a piece of memory which is allocated and managed by the pool separately from the main payload memory, and which in general \strong{should be smaller} than the payload memory.
The idea of the buffer header is to add user-specific information to an already existing \class{Buffer} without changing the contained payload data, and even without touching the \class{Buffer} type identifier. The header size can be set with method \func{SetHeaderSize()}; the pointer to the header can be obtained with method \func{GetHeader()}. The main difference between header memory and payload memory concerns the behaviour when the \class{Buffer} is sent via a ``zero-copy'' network transport implementation, like {\em InfiniBand verbs}: in contrast to the payload data, which is transferred directly from the \class{Buffer} memory by DMA, the header contents are explicitly copied first.

\subsection{Pointer}
\label{prog_services_memory_pointer}

Class \class{dabc::Pointer} provides virtual contiguous access to the segmented data referenced by a \class{dabc::Buffer} object. With \class{dabc::Pointer}, one does not need to care how many {\em segments} are referenced by the \class{Buffer}, and how big they are.
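The following standalone C++ sketch illustrates how the {\em subblock} reference counters, the {\em segment} list, \func{MakeReference()}, \func{Release()} and a \class{Pointer}-like cursor fit together. All types in it ({\tt ToySubblock}, {\tt ToySegment}, {\tt ToyBuffer}, {\tt ToyPointer}) are hypothetical stand-ins which only mimic the behaviour described in this section; they are not the \dabc~ classes, whose signatures and internals may differ.

\begin{small}
\begin{verbatim}
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-ins for subblock, segment, buffer and pointer.
struct ToySubblock {
    std::vector<unsigned char> mem;
    uint32_t refcnt = 0;  // how many buffer segments refer to this subblock
    explicit ToySubblock(std::size_t size) : mem(size) {}
};

struct ToySegment {  // one entry of the gather list
    ToySubblock* sub;
    std::size_t offset;
    std::size_t size;
    unsigned char* ptr() const { return sub->mem.data() + offset; }
};

struct ToyBuffer {
    std::vector<ToySegment> segments;

    // Add a segment and count the new reference to its subblock.
    void Append(ToySubblock& sub, std::size_t offset, std::size_t size) {
        segments.push_back(ToySegment{&sub, offset, size});
        sub.refcnt++;
    }
    // Like TakeBuffer(): one segment covering the complete subblock.
    static ToyBuffer* Take(ToySubblock& sub) {
        ToyBuffer* buf = new ToyBuffer;
        buf->Append(sub, 0, sub.mem.size());
        return buf;
    }
    // Like MakeReference(): same segment list, no data copied,
    // reference counter of every used subblock incremented.
    ToyBuffer* MakeReference() const {
        ToyBuffer* ref = new ToyBuffer(*this);
        for (const ToySegment& s : segments) s.sub->refcnt++;
        return ref;
    }
    // Like Buffer::Release(): decrement the counters, delete the object.
    static void Release(ToyBuffer* buf) {
        if (!buf) return;
        for (const ToySegment& s : buf->segments) s.sub->refcnt--;
        delete buf;
    }
};

// Pointer-like cursor over all segments; assumes no empty segments.
class ToyPointer {
public:
    explicit ToyPointer(const ToyBuffer& buf) : fBuf(buf), fSeg(0), fPos(0) {}

    unsigned char* ptr() const { return fBuf.segments[fSeg].ptr() + fPos; }

    // contiguous bytes left in the current segment
    std::size_t rawsize() const {
        return fSeg < fBuf.segments.size() ? fBuf.segments[fSeg].size - fPos : 0;
    }
    // bytes left in the whole buffer
    std::size_t fullsize() const {
        std::size_t n = rawsize();
        for (std::size_t i = fSeg + 1; i < fBuf.segments.size(); ++i)
            n += fBuf.segments[i].size;
        return n;
    }
    // advance by len bytes, moving to the next segment when needed
    void shift(std::size_t len) {
        while (len > 0 && fSeg < fBuf.segments.size()) {
            std::size_t step = len < rawsize() ? len : rawsize();
            fPos += step;
            len -= step;
            if (fPos == fBuf.segments[fSeg].size) { fSeg++; fPos = 0; }
        }
    }
    // copy len bytes from src into the buffer memory at the current
    // position, crossing segment borders; the pointer itself does not move
    void copyfrom(const void* src, std::size_t len) const {
        ToyPointer p(*this);
        const unsigned char* s = static_cast<const unsigned char*>(src);
        while (len > 0 && p.rawsize() > 0) {
            std::size_t step = len < p.rawsize() ? len : p.rawsize();
            std::memcpy(p.ptr(), s, step);
            s += step;
            len -= step;
            p.shift(step);
        }
    }

private:
    const ToyBuffer& fBuf;
    std::size_t fSeg, fPos;
};
\end{verbatim}
\end{small}

For example, a {\tt ToyPointer} placed 2 bytes before the end of the first {\em segment} writes a 4-byte value half into one {\em subblock} and half into the next, which is the kind of bookkeeping that \class{dabc::Pointer} hides from the user.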
One can use the following methods:
\bbul
\item[\func{Pointer()} or \func{reset()}] initialize or reset the pointer as a reference to a \class{dabc::Buffer}, to another \class{dabc::Pointer}, or just to a simple memory region
\item[\func{ptr()} or \func{operator()}] the current memory pointer
\item[\func{rawsize()}] size of the contiguous system memory from the current pointer position
\item[\func{fullsize()}] size of the full memory from the current pointer position
\item[\func{shift()}] shift the pointer
\item[\func{copyfrom()}] set the pointed memory content from a \class{dabc::Pointer}, or just from a memory region
\item[\func{copyto()}] copy the pointed memory content into a specified memory region
\ebul

Example of pointer usage:

\begin{small}
\begin{verbatim}
#include <stdint.h>
#include "dabc/Buffer.h"
#include "dabc/Pointer.h"

void UserModule::ProcessOutputEvent(dabc::Port* port)
{
   if (!Input(0)->CanRecv()) return;

   dabc::Buffer* buf = Input(0)->Recv();

   // fill the whole buffer, over all its segments, with a counter;
   // stop when less than one full value fits into the remaining memory
   dabc::Pointer ptr(buf);
   uint32_t v = 0;
   while (ptr.fullsize() >= sizeof(v)) {
      ptr.copyfrom(&v, sizeof(v));
      ptr.shift(sizeof(v));
      v++;
   }

   Output(0)->Send(buf);
}
\end{verbatim}
\end{small}

\subsection{Buffer guard}
\label{prog_services_memory_bufferguard}

Class \class{dabc::BufferGuard} is the equivalent of a \class{LockGuard} for threads (see section \ref{prog_services_threads_mutex}); it prevents memory leaks due to unreleased \class{Buffers}. It automatically releases a \class{Buffer} whenever the function scope is left, both by returning regularly and by throwing an exception. To avoid such an automatic release in the normal situation, one should \strong{explicitly} take the \class{Buffer} out of the guard with \func{BufferGuard::Take()}. A typical usage of \class{dabc::BufferGuard} is shown here:

\begin{small}
\begin{verbatim}
   ...
   // buf is released automatically if the scope is left before Take()
   dabc::BufferGuard buf = pool->TakeBuffer(2048);
   ...
   // ownership passes to the port; the guard no longer releases it
   port->Send(buf.Take());
   ...
\end{verbatim}
\end{small}

Class \class{dabc::ModuleSync} provides several methods which work directly with \class{dabc::BufferGuard}; this ensures that a \class{Buffer} is correctly released in case of any exception, which otherwise might not be handled correctly by the user.

\subsection{Allocation}
\label{prog_services_memory_allocation}

There are several ways to create a \class{MemoryPool}:
\bbul
\item automatically, when the user accesses it via a \class{PoolHandle} for the first time
\item with the \func{dabc::Manager::CreateMemoryPool()} method
\item with the \comm{dabc::CmdCreateMemoryPool} command
\ebul

Automatic creation is useful for simple applications with a few modules. In this case the parameters specified by the \class{PoolHandle} (size and number of buffers) are used. In many situations, however, it is better to create a memory pool explicitly, setting all its parameters directly or from a configuration file. Typically, the memory pool is created by the user's \class{Application} class in method \func{CreateAppModules()}, which is called by the state change command \strong{DoConfigure}. In the simplest case:

\begin{small}
\begin{verbatim}
bool UserApplication::CreateAppModules()
{
   ...
   // pool "WorkPool" with 100 buffers of 8192 bytes each
   dabc::mgr()->CreateMemoryPool("WorkPool", 8192, 100);
   ...
}
\end{verbatim}
\end{small}

One can call the \func{CreateMemoryPool()} method several times to create memory {\em blocks} for different buffer sizes. Alternatively, one can create and configure a command object \comm{dabc::CmdCreateMemoryPool}, where all possible settings can be done via the following methods:
\bbul
\item[\func{AddMem()}] add configuration for the specified buffer size
\item[\func{AddRef()}] add configuration for the number of references and header sizes
\item[\func{AddCfg()}] set generic configuration like cleanup timeout or size limit
\ebul

For instance, one can do the following:

\begin{small}
\begin{verbatim}
bool UserApplication::CreateAppModules()
{
   ...
   dabc::CmdCreateMemoryPool cmd("WorkPool");
   cmd.AddMem(8192, 100);    // 100 buffers of 8K
   cmd.AddMem(2048, 500);    // 500 buffers of 2K
   cmd.AddRef(2048, 64);     // references with 64-byte headers
   cmd.AddCfg(true);         // set fixed layout
   dabc::mgr.Execute(cmd);
   ...
}
\end{verbatim}
\end{small}

All parameters configured for the command can also be set up in the configuration file. In this case one should just call {\tt dabc::mgr.CreateMemoryPool("WorkPool")}.

\section{Threads organization}
\label{prog_services_threads}

Class \class{dabc::WorkingThread} organizes a working loop and executes runnable jobs, represented by the \class{dabc::Worker} class.

\subsection{Working loop}
\label{prog_services_threads_workloop}

The implementation of \class{dabc::WorkingThread} is based on the {\em pthreads} library. The main task of \class{dabc::WorkingThread} is to wait for events (using the \func{pthread\_cond\_wait()} function), and then to execute the event callback in the corresponding \class{Worker}. This functionality is implemented in \func{dabc::WorkingThread::MainLoop()}.

Usually events are produced by calling the \func{dabc::Worker::FireEvent()} method, which can be invoked from any thread. The event is queued and a {\em pthread condition} is fired. The thread waiting for this condition wakes up, and the next event from the queue is delivered to the \class{Worker} by calling the virtual method \func{dabc::Worker::ProcessEvent()}. Here any user-specific code can be implemented.

Another task of \class{dabc::WorkingThread} is \strong{timeout handling}. Some \class{Workers} need to be invoked not only by events, but also after specified time intervals. Method \func{dabc::Worker::ActivateTimeout()} requests the thread to execute \func{dabc::Worker::ProcessTimeout()} after the specified time interval. This virtual method may also be implemented by the user.
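The queue-plus-condition pattern of the working loop, including the timeout handling, can be sketched as follows. This sketch uses the C++11 {\tt std::thread}, {\tt std::mutex} and {\tt std::condition\_variable} classes in place of the raw {\em pthreads} calls, and the hypothetical class {\tt ToyWorkingThread} dispatches plain integer events to callbacks instead of to \class{Worker} objects; it is an illustration, not the actual \func{dabc::WorkingThread::MainLoop()}.

\begin{small}
\begin{verbatim}
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical working loop: std::condition_variable stands in for
// pthread_cond_wait(), callbacks stand in for ProcessEvent()/ProcessTimeout().
class ToyWorkingThread {
public:
    using Clock = std::chrono::steady_clock;

    ToyWorkingThread(std::function<void(int)> processEvent,
                     std::function<void()> processTimeout)
        : fProcessEvent(std::move(processEvent)),
          fProcessTimeout(std::move(processTimeout)),
          fThread([this] { MainLoop(); }) {}

    ~ToyWorkingThread() {
        { std::lock_guard<std::mutex> lk(fMutex); fStop = true; }
        fCond.notify_one();
        fThread.join();  // queued events are still processed before exit
    }

    // Can be called from any thread: queue the event, wake the loop.
    void FireEvent(int evid) {
        { std::lock_guard<std::mutex> lk(fMutex); fQueue.push_back(evid); }
        fCond.notify_one();
    }

    // Request one call of the timeout callback after `interval`.
    void ActivateTimeout(std::chrono::milliseconds interval) {
        {
            std::lock_guard<std::mutex> lk(fMutex);
            fDeadline = Clock::now() + interval;
            fTimeoutActive = true;
        }
        fCond.notify_one();
    }

private:
    void MainLoop() {
        std::unique_lock<std::mutex> lk(fMutex);
        for (;;) {
            if (!fQueue.empty()) {            // deliver the next queued event
                int evid = fQueue.front();
                fQueue.pop_front();
                lk.unlock();                  // user code runs without the lock
                fProcessEvent(evid);
                lk.lock();
                continue;
            }
            if (fStop) break;
            if (fTimeoutActive && Clock::now() >= fDeadline) {
                fTimeoutActive = false;
                lk.unlock();
                fProcessTimeout();
                lk.lock();
                continue;
            }
            // sleep until an event is fired or the timeout expires;
            // spurious wake-ups are harmless because the loop re-checks
            if (fTimeoutActive) fCond.wait_until(lk, fDeadline);
            else fCond.wait(lk);
        }
    }

    std::function<void(int)> fProcessEvent;
    std::function<void()> fProcessTimeout;
    std::mutex fMutex;
    std::condition_variable fCond;
    std::deque<int> fQueue;
    bool fStop = false;
    bool fTimeoutActive = false;
    Clock::time_point fDeadline;
    std::thread fThread;  // declared last: starts after all members exist
};
\end{verbatim}
\end{small}

The callbacks are invoked with the mutex unlocked, so user code may call {\tt FireEvent()} again from inside a callback without deadlocking the loop.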
\subsection{Sockets handling}
\label{prog_services_threads_sockets}

The POSIX sockets library allows all socket operations to be handled in an event-like manner, using the \func{select()} function. This approach is used in the \class{dabc::SocketThread} and \class{dabc::SocketWorker} classes to handle several sockets in parallel from a single thread. Each \class{dabc::SocketWorker} is associated with a socket descriptor, which can deliver events like ``can read next portion of data from socket'', ``sending over socket will not block'', ``socket is broken'', and so on. The main loop of \class{dabc::SocketThread} is modified such that, instead of waiting for the {\em pthread condition}, the thread waits for the next event from the sockets. Handling these events allows data to be sent and received via sockets in a non-blocking manner, i.~e.~ one can run several socket operations in parallel within one thread. At the same time, the \class{dabc::SocketThread} class can run normal jobs, implemented with base class \class{dabc::Worker}. So within a \class{SocketThread} one can mix socket processors (like some \class{Transports}) with normal processors (like \class{Modules}). A similar approach is used to support the {\em InfiniBand verbs} API in \dabc.

\subsection{Mutex usage}
\label{prog_services_threads_mutex}

All methods of \class{dabc::WorkingThread} and \class{dabc::Worker} are thread safe (except for those whose names start with an underscore ``\_''). Therefore user code can usually avoid mutexes completely. However, if data is shared between \class{Processors} which run in different threads, one must protect it with mutexes. Here it is recommended to work with a \class{dabc::LockGuard}. This class ensures that the mutex is unlocked automatically whenever the current function scope is left.
For instance, if one has a global static variable associated with a mutex, one should implement a thread-safe setter method like this:

\begin{small}
\begin{verbatim}
#include "dabc/threads.h"

int GlobalVariable = 0;
dabc::Mutex GlobalMutex;

void SetGlobalVariable(int newvalue)
{
   // GlobalMutex is locked here and unlocked when guard leaves the scope
   dabc::LockGuard guard(GlobalMutex);
   GlobalVariable = newvalue;
}
\end{verbatim}
\end{small}

\section{Command execution}
\label{prog_services_commands}

The idea of command execution is to invoke user-specific code from any part of the system. There are several reasons to prefer a command interface over direct calls of class methods:
\bbul
\item The command is executed not in the context of the calling thread, but in the thread to which the command receiver object is assigned. This avoids unnecessary mutex locking.
\item A command can be executed synchronously or asynchronously to the calling thread, and one can easily specify a timeout for the command execution.
\item A command can be submitted to any object in the system, including objects on remote nodes.
\item The code that invokes the command execution does not strongly depend on the code that executes the command: the invoking client library must know a command base class and some common parameter names, but not the implementation of the execution itself. This allows the required libraries on different nodes to be decoupled.
\item A command object can contain an arbitrary number of argument values, and can also be used to return any number of result values.
\ebul

\subsection{Command class}

Class \class{dabc::Command} is a container for argument and result values. The name of the \class{Command} is the main identifier for the command action which is executed in the \class{CommandReceiver} object.
There are a number of methods to set and get command parameters:

\begin{tabular}{l|l|l}
Type & Getter & Setter \\ \hline
string & GetStr() & SetStr() \\
int & GetInt() & SetInt() \\
unsigned int & GetUInt() & SetUInt() \\
bool & GetBool() & SetBool() \\
double & GetDouble() & SetDouble() \\
\end{tabular}

In all setter methods, the first argument is the name of a command parameter, and the second is the new parameter value of the corresponding type. In all getter methods, the first argument is again the parameter name, and the second is an optional default value. This default value is returned if the \class{Command} does not contain a parameter of that name. To instantiate a command, one should do:

\begin{small}
\begin{verbatim}
   ...
   dabc::Command cmd("UserCommand");
   cmd.SetInt("UserArg", 5);
   ...
\end{verbatim}
\end{small}

Usually, the name of a \class{Command} defines the action which will be performed. There are several subclasses of \class{dabc::Command} (for instance, in file \decl{dabc/Manager.h}), but these subclasses are only used to set the command name and command-specific parameters. It makes no sense to define extra methods in a subclass, since \class{dabc::Command} is designed as a mere container for parameters.

With method \func{ConvertToString()} one can convert a \class{Command} and all contained parameters into a plain string. Method \func{ReadFromString()} is used to reconstruct the \class{Command} object from such a string. This feature is useful to transfer a \class{Command} over a network connection, or to store it in a file.

\subsection{Command receiver}

Class \class{dabc::CommandReceiver} provides the interface for all classes which should execute a \class{Command}. The main place for user code is the virtual method \func{ExecuteCommand()}, which gets a \class{Command} object as argument.
A typical implementation of this method looks like:

\begin{small}
\begin{verbatim}
int UserModule::ExecuteCommand(dabc::Command cmd)
{
   if (cmd.IsName("UserCommand")) {
      int v = cmd.GetInt("UserArg", 0);
      DOUT1(("Execute UserCommand with argument = %d", v));
      return dabc::cmd_true;
   } else
   if (cmd.IsName("UserGetCommand")) {
      DOUT1(("Execute UserGetCommand without arguments"));
      // return a result value via the command object itself
      cmd.SetInt("UserRes", fCounter);
      return dabc::cmd_true;
   }

   // all other commands are handled by the base class
   return dabc::ModuleAsync::ExecuteCommand(cmd);
}
\end{verbatim}
\end{small}

Method \func{ExecuteCommand()} should analyse the command name and perform command-specific actions. It should return \keyw{dabc::cmd\_true} if the command has been executed successfully, or \keyw{dabc::cmd\_false} otherwise.

The default implementation of the \class{dabc::CommandReceiver} methods performs the command execution in the calling thread. However, most command actions may access resources which are also used by another working thread assigned to the \class{CommandReceiver} object. In this case all command execution code would have to protect these resources by mutex locks (see section \ref{prog_services_threads_mutex}), which would decrease performance. Because of this, class \class{dabc::Worker} inherits from \class{dabc::CommandReceiver} and implements several virtual methods (like \func{IsExecutionThread()} and \func{Submit()}) which are necessary to deliver and execute a command in the thread context of the assigned \class{WorkingThread}. The user \strong{must not reimplement} these methods in derived classes. In the \dabc~ subclasses of \class{dabc::Worker}, like \class{dabc::Module} and \class{dabc::Application}, the custom commands are therefore executed in the appropriate thread context.

With method \func{Execute()} of class \class{dabc::CommandReceiver} one can execute a command directly in the receiving object. Here one can specify a \class{dabc::Command} object as argument, or just a command name if the command has no arguments.
Method \func{Execute()} blocks until the command is executed; this is called the \strong{synchronous} mode of command execution. Optionally, one can set a timeout, i.~e.~ how long the calling thread waits for the command to be executed.

Method \func{Execute()} can be used not only for command execution, but also to access result values. First of all, the result of the command execution is returned as an integer. Any non-negative value is a regular result of execution: 0 (\keyw{dabc::cmd\_false}) indicates an error, 1 (\keyw{dabc::cmd\_true}) indicates success, and a command can also return any other positive value as its normal result. In addition, any number of result parameters can be delivered via the command object itself. For example, the result of the ``UserGetCommand'' execution from the previous example can be obtained like this:

\begin{small}
\begin{verbatim}
   ...
   dabc::ModuleRef m = dabc::mgr.FindModule("Module1");
   dabc::Command cmd("UserGetCommand");
   if (m.Execute(cmd)) {
      // result parameter set by UserModule::ExecuteCommand()
      int res = cmd.GetInt("UserRes");
   }
   ...
\end{verbatim}
\end{small}

There is another way to execute a command: submit the \class{Command} with method \func{Submit()} (see also section \ref{prog_manager_framework_commands}). In this case the command is executed \strong{asynchronously} to the calling thread; therefore one cannot get any direct information about the result of the command execution from the return value of \func{Submit()}.

\subsection{Command client}

FIXME: This part is out of date and should be rewritten.

To really work with asynchronous command execution, one must nevertheless be able to analyse the result of such commands. This can be done with class \class{dabc::CommandClient}. Before being submitted for execution, commands should be assigned to a \class{dabc::CommandClient} object. In this case, the \class{CommandClient} gets a callback from the \class{Command} when the execution is done, and can react to this callback. One can assign more than one \class{Command} to a \class{CommandClient}.
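The difference between synchronous \func{Execute()} and asynchronous \func{Submit()} can be illustrated with a small standalone model. In this sketch, a hypothetical {\tt ToyReceiver} owns one thread and executes queued {\tt ToyCommand} records there; its {\tt Execute()} submits a command and waits for the reply with a timeout (returning $-1$ on timeout, an arbitrary choice for this sketch), while {\tt Submit()} returns immediately and the {\tt onReply} callback plays the role of the \class{CommandClient} notification. None of these names belong to the \dabc~ API.

\begin{small}
\begin{verbatim}
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical command record: name, integer parameters, result.
struct ToyCommand {
    std::string name;
    std::map<std::string, int> params;
    int result = -1;
    std::function<void(ToyCommand&)> onReply;  // like a CommandClient callback
};
using ToyCommandPtr = std::shared_ptr<ToyCommand>;

// Hypothetical receiver that executes all commands in its own thread.
class ToyReceiver {
public:
    explicit ToyReceiver(std::function<int(ToyCommand&)> exec)
        : fExec(std::move(exec)), fThread([this] { Loop(); }) {}

    ~ToyReceiver() {
        { std::lock_guard<std::mutex> lk(fMutex); fStop = true; }
        fCond.notify_one();
        fThread.join();
    }

    // Asynchronous: queue the command and return immediately.
    void Submit(ToyCommandPtr cmd) {
        { std::lock_guard<std::mutex> lk(fMutex); fQueue.push(std::move(cmd)); }
        fCond.notify_one();
    }

    // Synchronous: submit and block until the reply arrives or the timeout
    // expires; -1 signals a timeout in this sketch. Replaces any onReply.
    int Execute(ToyCommandPtr cmd, std::chrono::milliseconds timeout) {
        auto done = std::make_shared<std::promise<int>>();
        std::future<int> fut = done->get_future();
        cmd->onReply = [done](ToyCommand& c) { done->set_value(c.result); };
        Submit(cmd);
        if (fut.wait_for(timeout) != std::future_status::ready) return -1;
        return fut.get();
    }

private:
    void Loop() {
        std::unique_lock<std::mutex> lk(fMutex);
        for (;;) {
            fCond.wait(lk, [this] { return fStop || !fQueue.empty(); });
            if (fQueue.empty()) break;  // stop requested, nothing left to do
            ToyCommandPtr cmd = std::move(fQueue.front());
            fQueue.pop();
            lk.unlock();
            cmd->result = fExec(*cmd);  // runs in the receiver's thread
            if (cmd->onReply) cmd->onReply(*cmd);
            lk.lock();
        }
    }

    std::function<int(ToyCommand&)> fExec;
    std::mutex fMutex;
    std::condition_variable fCond;
    std::queue<ToyCommandPtr> fQueue;
    bool fStop = false;
    std::thread fThread;  // declared last: starts after all members exist
};
\end{verbatim}
\end{small}

Because all commands run in the single receiver thread, the execution function itself needs no mutex, which mirrors the first argument for commands given at the beginning of this section.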
A first use case for the \class{CommandClient} is the execution of many commands at once. With the \func{Execute()} method, commands can only be executed \strong{sequentially}. By means of the \class{CommandClient}, however, one can first submit many commands, and then wait for all of them to be executed. If the associated \class{CommandReceivers} run in different threads, the commands are executed \strong{in parallel}. For instance:

\begin{small}
\begin{verbatim}
   ...
   dabc::CommandClient cli;
   for (unsigned n = 0; n < 10; n++) {
      dabc::ModuleRef m = dabc::mgr.FindModule(FORMAT(("Module%u", n)));
      dabc::Command cmd("UserCommand");
      cli.Assign(cmd);   // client gets a callback when cmd is executed
      m.Submit(cmd);     // returns immediately
   }
   // block until all assigned commands are executed
   bool res = cli.WaitCommands();
   ...
\end{verbatim}
\end{small}

This example submits 10 commands to 10 different modules, and waits at one place until all commands are executed.

Another use case for the \class{CommandClient}: it can keep the \class{Command} objects after execution, so that the contained result values can be analysed. For instance, each of the 10 commands from the previous example may return several values. If one instantiates the \class{CommandClient} with \keyw{true} as constructor argument, a list of all commands is available at the end via the \func{ReplyedCmds()} method:

\begin{small}
\begin{verbatim}
   ...
   dabc::CommandClient cli(true);
   ...
   bool res = cli.WaitCommands();
   DOUT1(("One has %u commands in replyed queue", cli.ReplyedCmds().Size()));
   ...
\end{verbatim}
\end{small}

FIXME: This part is out of date and should be rewritten.

One more use case of the command client interface is the \class{dabc::CommandsSet}. This class inherits from \class{dabc::CommandClientBase}, the abstract base class for all command clients. It is useful if the execution of a ``master'' command should cause the execution of several other commands.
For instance, when the execution of a command in one module should be distributed to two other modules, one should do:

\begin{small}
\begin{verbatim}
int UserModule::ExecuteCommand(dabc::Command cmd)
{
   if (cmd.IsName("MasterCommand")) {
      dabc::CommandsSet* set = new dabc::CommandsSet(thread());

      dabc::ModuleRef m1 = dabc::mgr.FindModule("Module1");
      m1.Submit(set->Assign(dabc::Command("UserCommand1")));

      dabc::ModuleRef m2 = dabc::mgr.FindModule("Module2");
      m2.Submit(set->Assign(dabc::Command("UserCommand2")));

      // all slaves are submitted; they should be ready within 10 s
      dabc::CommandsSet::Completed(set, 10.);

      // the master command is replied later by the CommandsSet
      return dabc::cmd_postponed;
   }

   return dabc::ModuleAsync::ExecuteCommand(cmd);
}
\end{verbatim}
\end{small}

Here one creates a \class{CommandsSet} for a ``master'' command and submits two ``slave'' commands, assigned to this set, to two other modules. Method \func{dabc::CommandsSet::Completed()} informs the framework that all commands are submitted and should be ready within 10 seconds. The return value \keyw{dabc::cmd\_postponed} indicates that the master command may not be ready yet when \func{ExecuteCommand()} returns. Therefore \class{dabc::CommandsSet} takes care of the correct reply of the master command, either when all slaves are ready, or when the master command timeout has expired.
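The bookkeeping which a commands set has to perform can be sketched as a small state machine: count the assigned slave commands, and reply the master command exactly once, either when all slaves have replied or with a failure when the deadline passes first. The class {\tt ToyCommandsSet} and its methods are hypothetical; in particular, combining the slave results with a logical AND is an assumption of this sketch, not documented \dabc~ behaviour.

\begin{small}
\begin{verbatim}
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical model of a commands set: collects the replies of the slave
// commands and replies the master command exactly once.
class ToyCommandsSet {
public:
    using Clock = std::chrono::steady_clock;

    explicit ToyCommandsSet(std::function<void(bool)> replyMaster)
        : fReplyMaster(std::move(replyMaster)) {}

    // Register one slave command; call this before the slave is submitted.
    void Assign() { ++fPending; }

    // All slaves are submitted: start the deadline for the master command.
    void Completed(double timeout_sec) {
        fCompleted = true;
        fDeadline = Clock::now() + std::chrono::duration_cast<Clock::duration>(
                                       std::chrono::duration<double>(timeout_sec));
        TryFinish();
    }

    // Called once per slave when its reply arrives; late replies are ignored.
    void SlaveReplied(bool ok) {
        if (fPending > 0) --fPending;
        fAllOk = fAllOk && ok;
        TryFinish();
    }

    // Called periodically, e.g. from a timeout handler.
    void CheckTimeout(Clock::time_point now) {
        if (fCompleted && !fReplied && now >= fDeadline) Finish(false);
    }

    bool Replied() const { return fReplied; }

private:
    void TryFinish() {
        if (fCompleted && !fReplied && fPending == 0) Finish(fAllOk);
    }
    void Finish(bool ok) {
        fReplied = true;
        fReplyMaster(ok);  // false on timeout, else AND of all slave results
    }

    std::function<void(bool)> fReplyMaster;
    unsigned fPending = 0;
    bool fAllOk = true;
    bool fCompleted = false;
    bool fReplied = false;
    Clock::time_point fDeadline;
};
\end{verbatim}
\end{small}

Waiting for {\tt Completed()} before replying the master matters: otherwise a fast first slave could bring the pending count to zero while further slaves are still being assigned.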