Thread Classes
--------------

To make it easier to program with threads in C++, I started with the pthread library and wrapped it in a few classes. Some aspects of the pthread library don't work well with object-oriented programming, but using these classes is still pretty straightforward. The classes used as the basis for multithreaded programming in Poco are:

Class Summary
-------------

CThread - The base class from which all thread classes are derived. It is primarily a wrapper around pthread library functions, but it also holds some basic thread information (the thread ID number) and is responsible for creating and starting all threads.

CMutex - This class defines a mutual exclusion lock, or mutex. A mutex is an invaluable tool for controlling when threads can access shared information.

CCondVarMutex - This class defines a condition variable and its associated mutex. Condition variables are used when a thread must wait for some other event to occur. For example, a CEventThread often has an empty queue of events. When the queue is empty, the CEventThread waits on its CCondVarMutex. This class is derived from CMutex.

CEventThread - This class defines an event-driven thread. It contains a message queue and an event loop. If there is a message on the queue, the event loop removes the message, handles it, and then waits for the next message. CEventThread is derived from CThread and depends heavily on the CCondVarMutex and CMsgQueue classes.

CMsgQueue - This class contains a queue and methods for adding and removing CQMsg items from the queue.

CQMsg - This class defines the basic form of a message for CEventThread. An instance of CQMsg (or of a class derived from CQMsg) is the only type that can be added to a CMsgQueue.

CQData - This is the base class for defining data that can be part of a CQMsg. It is an abstract class and can't be used directly. Other classes (like CQDataArray) must be derived from it.

CQDataArray - This class is a message container.
When it is defined, it can store some number of items of type X. The number of items it can store is set when an instance is created. CQDataArray is a template class derived from CQData. As a template, CQDataArray can store any data type or structure. The predefined instantiations are CQDataChar, CQDataDouble, and CQDataInt.

CQDataPQData - This class is a message container built to store CQDataArrays. It is very useful when one message must contain multiple types in its data, and this information is not used often enough to warrant defining a structure to hold it. CQDataPQData is derived from CQData.

CThread (thread.h)
------------------

This class is mostly a wrapper around several of the core pthread functions, but it also contains a few vital connections that allow thread-based classes to work. It also stores the pthread ID number, which is used to identify the target thread when calling pthread functions. CThread is only meant to be used as a base for other classes.

While using the CThread class as a basis for other classes is fairly simple, there are some unusual issues arising from the mixing of C and C++ code. The first unpleasant piece of the pthread library is that it uses one group of functions to set up a thread's attributes before the thread starts, and a different group of functions to change the attributes after the thread has started. To make this clear in the class, all member functions that deal with thread attributes before the thread starts begin with attr_. Functions that change the state of the thread after it has started have no prefix. Overall this isn't a great inconvenience, as these functions are rarely called, but I'm not particularly pleased with it.

Of greater concern is the pthread library function that starts a thread:

    int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                       void *(*start_routine)(void *), void *arg);

The first problem is that C++ is much stricter than C about pointers to void.
This generates a warning that is very difficult to remove. This is a nuisance, but has no effect on program execution.

The second problem is that an ordinary C++ function pointer can only point to a function that is not a member of any class, or to a static member function of a class. A static member function is one that does not use any non-static member variables of its class (i.e., it can be called directly without any association to an instance of the class). This makes for an interesting hurdle. To get around the problem, CThread and every class derived from CThread must have a static member function called connector(void *thisPointer), and must set the m_coreProcConnector member variable to point at its own connector function. Here is a brief example of what a derived class must contain.

header_file

    #include "thread.h"

    // This class creates a timer that writes 'a'
    // to the file descriptor m_fd every m_fDelay milliseconds
    class CTimerThread : public CThread
    {
    public:
        CTimerThread();
        virtual ~CTimerThread() {}
        void timerLoop();
        void stop();

        int m_fd;
        double m_fDelay;

        static void connector(void *thisPointer);

    protected:
        bool m_bStop;
        CMutex m_mtxStop;   // mutex to protect m_bStop
    };

main_file

    #include <unistd.h>   // write(), usleep(), sleep(), STDOUT_FILENO

    void CTimerThread::connector(void *thisPointer)
    {
        ((CThread*)thisPointer)->updateThreadId();
        ((CTimerThread*)thisPointer)->timerLoop();
        CThread::selfDelete((CThread*)thisPointer);
    }

    CTimerThread::CTimerThread()
    {
        m_fd = STDOUT_FILENO;   // m_fd is a file descriptor, not a FILE*
        m_fDelay = 20.0;        // milliseconds
        m_bStop = false;
        m_coreProcConnector = connector;
    }

    void CTimerThread::stop()
    {
        m_mtxStop.lock();
        m_bStop = true;
        m_mtxStop.unlock();
    }

    void CTimerThread::timerLoop()
    {
        m_mtxStop.lock();
        while (!m_bStop) {
            m_mtxStop.unlock();
            usleep((int)(m_fDelay * 1000));   // milliseconds -> microseconds
            write(m_fd, "a", 1);
            m_mtxStop.lock();
        }
        m_mtxStop.unlock();   // the loop exits with the mutex still held
    }

    int main()
    {
        CTimerThread *pTimerThread = new CTimerThread();
        pTimerThread->m_fDelay = 1000;   // 1 sec delay
        pTimerThread->run();
        pTimerThread->detach();
        sleep(7);                        // let the timer run; sleep() takes seconds
        pTimerThread->stop();
        sleep(7);                        // give selfDelete() time to finish
    }

Looking at main(), we
see that pTimerThread is created. The CTimerThread constructor sets m_coreProcConnector to CTimerThread::connector(void *thisPointer). The default m_fDelay set by the constructor is no good, so main() sets it to 1000 milliseconds and then calls CThread::run(). CThread::run() calls

    pthread_create(&m_Thread, &m_Attr, m_coreProcConnector, (void*)this);

When the thread is created, its ID is placed in m_Thread, it uses the attributes in m_Attr, and it calls the function pointed at by m_coreProcConnector (in this case CTimerThread::connector) with 'this' as the argument to connector. ('this' is part of the C++ language, and in this case 'this' = pTimerThread.) connector then uses thisPointer to call CTimerThread::timerLoop(), which runs until main() stops sleeping and calls pTimerThread->stop(). Please notice that connector casts thisPointer to (CTimerThread*), which ensures that we get the CTimerThread::timerLoop function and not an error.

Using a mutex here isn't really essential on a single-processor computer. Only one thread writes to m_bStop while all other threads simply read it, which doesn't cause conflicts. It can be important on a multiprocessor system, because locking and unlocking a mutex forces each processor's private memory to sync.

pTimerThread->detach() means the thread will not report back to main through a join when it completes. Detaching is not always a good idea, because joining makes it much easier to delete threads; however, with a join the main thread would block until pTimerThread finished.

When the thread exits the timerLoop function, it calls CThread::selfDelete((CThread*)thisPointer) from connector. selfDelete causes the thread to sleep for 5 seconds and then deletes the memory it was using. The thread then terminates.

It is worth noting that deleting threads is a tricky business. If a thread object is deleted while its thread is still running, it causes a segmentation fault.
If you cancel a thread and delete it immediately, it is difficult to inform all other threads that the thread no longer exists, and it is easy to get more segmentation faults. In this example there is a small possibility that main() will finish before selfDelete(), which isn't cause for alarm here, but it can be very important in other programs. Such cases need some form of barrier to make sure selfDelete completes before main.

Another option for stopping the timer thread would be to replace the final stop() and sleep() calls in main() with

    pTimerThread->cancel();
    sleep(1);
    delete pTimerThread;

In this case selfDelete is not called.

CThread Public functions
------------------------

CThread() - constructor

virtual ~CThread() - destructor

void initialize() - Sets initial values; called by the constructor.

int run() - This function creates the thread and starts it running.

int detach() - Lets the system know that the thread's resources can be freed when it terminates, without waiting for a join.

//virtual int join(void*); - Not implemented, but a local version must exist for a class derived from CThread if the thread is not detached.

int cancel(); - This function stops execution of the thread at its next cancellation point, typically the first point where the thread blocks (sleep, I/O call, condition variable wait, etc.).

pthread_t getThreadID(); - This function returns the thread ID.

static void connector(void *thisPointer); - This function connects the thread class to the thread created in the run() function.

static void selfDelete(CThread*); - This function deletes the object that is passed to it as an argument after 5 seconds.

CMutex (mutex.h)
----------------

This class is essentially a wrapper around the pthread mutex functions, and is used to keep multiple threads from accessing the same data at the same time. If used correctly, it allows only one thread to access the data at any given time. If used improperly, mutexes can cause deadlock and degrade system performance.
Each CMutex should be associated with a specific variable or group of variables, as m_mtxStop is associated with the m_bStop variable in the CThread example. If multiple mutexes must be locked at the same time, it is very important to establish a lock hierarchy (always lock mutex A before mutex B) or a way to 'back out' of a lock; otherwise there is a risk of deadlock. CMutex uses a fast mutex, which is not recursive, so a thread should avoid locking the same mutex a second time before unlocking it. The number of mutexes and how often they are locked should be kept to a minimum, as locking and unlocking mutexes can eat up a lot of system time.

CMutex Public Functions
-----------------------

CMutex() - constructor, creates the mutex

~CMutex() - destructor, destroys the mutex

void initialize() - This function initializes the mutex and is called by the constructor.

int lock(); - This function blocks the thread until it can lock the mutex.

int tryLock(); - This function locks the mutex if it is unlocked; otherwise it returns EBUSY and lets execution continue.

int unlock(); - This function releases the mutex, allowing another thread to lock it.

CCondVarMutex (mutex.h)
-----------------------

This class is derived from CMutex and is used to send signals between different threads. For example, thread A needs to wait for information from thread B. Thread A could sit there and poll thread B until the information is ready (and waste a great deal of CPU time), or it can wait on a condition variable.

In the simplest case, one thread is just waiting for another thread to signal:

    Waiting thread         Signaling thread
    CondVar->lock()        CondVar->lock()
    CondVar->wait()        CondVar->signal()
    CondVar->unlock()      CondVar->unlock()

A better example is this simplified code from CEventThread. If a message is on m_MsgQ, it is handled immediately; otherwise the thread waits on the condition variable m_pCdMtxQ until que_Event is called by another thread.
m_pCdMtxQ is of type CCondVarMutex, and is meant to protect and signal for the message queue m_MsgQ.

    int CEventThread::checkEvents()
    {
        CQMsg *pQMsg;
        int status = 0;

        detach();   // release system resources when this function completes
        while (!m_bExit) {
            // oddly enough, we need the mutex lock more for the signal
            // than to protect the queue's integrity
            m_pCdMtxQ->lock();
            if (m_MsgQ.getMsg(&pQMsg) >= 0) {
                m_pCdMtxQ->unlock();
                status = handleQMsg(pQMsg);
            } else {
                status = m_pCdMtxQ->wait();   // wait forever
                m_pCdMtxQ->unlock();
                if (status != 0) {
                    if (status == ETIMEDOUT && m_nSecToWait != 0)
                        vTimedOut();
                    else
                        threadErr("CEventThread::checkEvents, wait failed");
                }
            }
        }
        return 0;
    }

    int CEventThread::que_Event(CQMsg *pqmsg)
    {
        m_pCdMtxQ->lock();
        m_MsgQ.queMsg(pqmsg);
        int status = m_pCdMtxQ->signal();
        m_pCdMtxQ->unlock();
        return status;
    }

The CEventThread::checkEvents() function loops until m_bExit is true. Each time through the loop it locks the condition variable m_pCdMtxQ. If there is a message to be handled, it unlocks the condition variable and then handles the message. If there is no message to handle, it calls wait() on m_pCdMtxQ and blocks until another thread calls signal() on m_pCdMtxQ.

The CEventThread::que_Event function adds a message to the queue. While it still holds the lock, it sends a signal on m_pCdMtxQ. This signal is picked up by the wait() function, and the checkEvents thread resumes.

CCondVarMutex Public Functions (based on CMutex)
------------------------------------------------

CCondVarMutex(); - constructor (calls initialize)

~CCondVarMutex(); - destructor

void initialize(); - Initializes the condition variable.

int wait(); - Waits for a signal() forever.

int timedWait(struct timespec *t_limit); - Waits for a signal until the time limit t_limit.

int signal(); - Sends a signal that restarts one thread waiting on this condition variable.

int broadcast(); - Sends a signal that restarts all threads waiting on this condition variable.
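
The wait/signal pattern used by checkEvents() and que_Event() can also be sketched directly against the pthread calls. This is only a minimal illustration, not Poco code: IntQueue, consumer, and runDemo are hypothetical names made up for this sketch, and it assumes that lock(), wait(), and signal() correspond to pthread_mutex_lock, pthread_cond_wait, and pthread_cond_signal. The point it tries to show is that the waiting thread re-checks the queue in a loop while holding the mutex, so a signal sent before the wait begins is not lost and a spurious wakeup does no harm.

```cpp
#include <pthread.h>
#include <cassert>
#include <queue>

// Hypothetical stand-in for a queue guarded by a condition variable and
// its mutex. The names are illustrative only; this is not the Poco class.
class IntQueue {
public:
    IntQueue() {
        pthread_mutex_init(&m_mtx, NULL);
        pthread_cond_init(&m_cond, NULL);
    }
    ~IntQueue() {
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mtx);
    }
    // Producer side: add an item and signal while the mutex is held.
    void push(int value) {
        pthread_mutex_lock(&m_mtx);
        m_items.push(value);
        pthread_cond_signal(&m_cond);
        pthread_mutex_unlock(&m_mtx);
    }
    // Consumer side: wait until an item is available, then remove it.
    int pop() {
        pthread_mutex_lock(&m_mtx);
        while (m_items.empty())                   // re-check after every wakeup
            pthread_cond_wait(&m_cond, &m_mtx);   // releases m_mtx while blocked
        assert(!m_items.empty());
        int value = m_items.front();
        m_items.pop();
        pthread_mutex_unlock(&m_mtx);
        return value;
    }
private:
    pthread_mutex_t m_mtx;
    pthread_cond_t m_cond;
    std::queue<int> m_items;
};

struct ConsumerArgs {
    IntQueue *queue;
    int count;
    long sum;
};

// Thread entry point: a non-member function, as pthread_create requires.
void *consumer(void *arg) {
    ConsumerArgs *a = static_cast<ConsumerArgs *>(arg);
    for (int i = 0; i < a->count; ++i)
        a->sum += a->queue->pop();
    return NULL;
}

// Pushes 1..count from the calling thread while a second thread pops
// and sums them. Returns the sum, or -1 if the thread can't be created.
long runDemo(int count) {
    IntQueue q;
    ConsumerArgs args = { &q, count, 0 };
    pthread_t tid;
    if (pthread_create(&tid, NULL, consumer, &args) != 0)
        return -1;
    for (int i = 1; i <= count; ++i)
        q.push(i);
    pthread_join(tid, NULL);   // join makes args.sum safe to read
    return args.sum;
}
```

Calling runDemo(5) pushes the values 1 through 5 and returns their sum once the consumer thread has popped all of them. The sketch joins the consumer rather than detaching it so that the result can be read safely.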

CEventThread (eventthread.h based on CThread)
---------------------------------------------

The CEventThread class builds on the CThread class to provide a base for event-driven threads. It provides a message queue, signal, mutex, functions to help add messages to the queue, and functions to help handle those messages. It also provides some hooks so that the thread can be started and stopped in a reasonably clean fashion.

While this is one of the more complex classes, most of the complexity is within the class itself. The CEventThread class handles the event loop, the message queue, and most of the issues caused by multiple threads. Child classes have to define the messages they need and the functions to handle them.

To help keep the code clear and manageable, all functions that add a message to the event queue start with 'que' and are public. Each 'que' function has a corresponding 'do' function, which is protected. For example, there could be a queBar(int x) function with a corresponding doBar(CQMsg *pqmsg). The queBar function would create a message with the appropriate message ID and an integer data member and place it on the message queue. When the message reaches the front of the queue, the local vMsgHandler function recognizes the message ID and passes it to the doBar function, which deciphers the rest of the message and then handles it.

There are a few advantages to using the 'que' and 'do' convention. Since the 'do' functions are protected, they provide a modest barrier against other threads accessing values without going through a mutex or some other method of protecting the data's integrity. Message construction also occurs only in the 'que' function, so errors are localized and duplicate code is reduced (both common sources of errors). Lastly, it is easy to tell when the code is contacting another thread.

Here is an example of a very simple CEventThread-based class. It writes characters to a file when it gets a message.
header file

    #include "eventthread.h"

    class CFileWriter : public CEventThread
    {
    public:
        CFileWriter(int FD, CEventThread *pMaster);
        int queWrite(char *buf, int length);

        enum {
            WRITE_TO_FILE = 100,
            LAST_ENUM   // always last enum
        };

    protected:
        virtual int vMsgHandler(CQMsg *pqmsg);
        virtual int vLastRequests();
        int doWrite(CQMsg *pqmsg);

        int m_fd;
        CEventThread *m_pMaster;
    };

program file

    int CFileWriter::queWrite(char *buf, int lngth)
    {
        CQData *pData = new CQDataChar(buf, lngth);
        CQMsg *pqmsg = new CQMsg(WRITE_TO_FILE, pData);
        return que_Event(pqmsg);
    }

    int CFileWriter::doWrite(CQMsg *pqmsg)
    {
        CQDataChar *pData = (CQDataChar*)(pqmsg->getData());
        char *buf = pData->getArray();
        int lngth = pData->getLength();

        int bytes = write(m_fd, buf, lngth);
        if (bytes < lngth) {
            // report a short or failed write to the master thread, if any
            if (m_pMaster)
                m_pMaster->vQueFileErr(m_fd, this, 0);
            return -1;
        }
        return 0;
    }

    int CFileWriter::vMsgHandler(CQMsg *pqmsg)
    {
        switch (pqmsg->getType()) {
        case WRITE_TO_FILE:
            doWrite(pqmsg);
            break;
        default:
            printf("unknown message\n");
            return -1;
        }
        return 0;
    }

    int CFileWriter::vLastRequests()
    {
        close(m_fd);
        return 0;
    }

This simple sample class writes text to a file descriptor that is already open. If it encounters an error while writing, it sends an error to its master (if m_pMaster != NULL, that is).

Another thread that wants to write to the file writer would call

    pFileWriter->queWrite("hello", 5);

queWrite places the text in a CQData structure, puts the data structure in a message with the message number WRITE_TO_FILE, and places the message on the queue using the CEventThread::que_Event(CQMsg*) function. The CEventThread::checkEvents() function eventually pulls the message from the queue. Since WRITE_TO_FILE is greater than 99, it passes the message to the CFileWriter::vMsgHandler function. vMsgHandler recognizes the message number and passes the message to doWrite. doWrite pulls the string out of the message's data member and writes it to the file.
When doWrite and vMsgHandler finish, CEventThread::checkEvents() DELETES THE MESSAGE.

When pFileWriter->que_Halt() is called, checkEvents() catches the message and calls the vLastRequests function, which closes the file descriptor in this case. The class then deletes itself after a few seconds.

CEventThread uses several virtual functions to allow child classes to alter its behavior. Each child class should override the virtual functions it needs to get the desired behavior. Every child must define the vMsgHandler virtual function.

Message ID numbers from 0 to 99 are reserved for the CEventThread class. Child classes can define their own message numbers starting with 100. The message ID numbers used by different child classes can overlap in most cases.

CEventThread Public Functions (based on CThread)
------------------------------------------------

CEventThread(); - constructor, calls initialize()

virtual ~CEventThread(); - destructor

void initialize(); - Initializes member variables; called by CEventThread().

static void connector(void *thisPointer); - Connects the new thread to its class.

int que_Event(CQMsg *qmsg); - This function is called to add another message to the message queue. It also handles signaling for the message queue.

int que_Halt(); - This function causes the checkEvents() function to stop checking events, which causes the thread to stop executing.

CEventThread Protected Virtual Functions
----------------------------------------

virtual int vMsgHandler(CQMsg *pqmsg) - This function must be defined in any child class, as only the child class knows how to handle its own messages.

virtual int vTimedOut(); - This optional virtual function can define an appropriate course of action if no messages are received in a reasonable time frame.

virtual int vStartupActions(); - This optional virtual function can be used to take actions before the main event loop starts.
virtual int vLastRequests(); - This optional virtual function can be used to take certain actions after the main event loop stops but before the thread terminates.

CEventThread Optional Public Virtual Functions
----------------------------------------------

virtual int vQueSockMsg(CEventThread*, CQData*); - An optional virtual function that can be used to handle socket messages.

virtual int vQueFileRead(int fd, CQData *pData); - An optional virtual function that can be used to handle file descriptor read events.

virtual int vQueSockAcceptEvent(CQDataPQData *pData); - An optional virtual function that can be used to handle socket accepted events.

virtual int vQueFileErr(int fd, CEventThread *pFileThread, int err); - An optional virtual function that can be used to handle file error events.

CMsgQueue (msgq.h)
------------------

This class is a container for a queue of CQMsg pointers.

CMsgQueue Public Functions
--------------------------

CMsgQueue() - constructor

~CMsgQueue() - destructor

queMsg(CQMsg*) - Adds a message pointer to the back of the queue.

getMsg(CQMsg**) - Gets a message pointer from the front of the queue, removing the message from the queue in the process.

getSize() - Returns the number of message pointers in the queue.

CQMsg (qmsg.h)
--------------

This class is used to store information related to a single message between threads. Each message has a type, a subtype, and a pointer to a CQData instance. Type is normally set to the message ID number, and subtype can be used for whatever purpose the user sees fit. These default to type = 99, subtype = 0, and data = NULL.

CQMsg Public Functions
----------------------

CQMsg() - Constructor, sets m_nType = 99, m_nSubType = 0, and m_pData = NULL.

CQMsg(int type) - Constructor

CQMsg(int type, int subtype) - Constructor

CQMsg(int type, CQData *pData) - Constructor

CQMsg(int type, int subtype, CQData *pData) - Constructor

~CQMsg() - Destructor

int getType() - Returns the message type, usually the message ID number.
int getSubtype() - Returns the message subtype.

CQData *getData() - Returns a pointer to the message's CQData object.

void getString(string *str) - Returns a string containing the type and subtype.

CQData (qmsg.h)
---------------

CQData is a very simple class whose only function is to provide a common base class for all the other classes used to store data when passing messages. Using the CQData family of classes is not as efficient in CPU time as some other methods, but it benefits from a great deal of type checking and valid-memory checks, which prove to be very powerful in avoiding memory leaks and segmentation faults.

CQData Public Functions
-----------------------

CQData() - Constructor

virtual ~CQData() - Destructor

CQDataArray