00001 #ifndef __THREAD_H__ 00002 #define __THREAD_H__ 00003 00004 #include <stdint.h> 00005 #include <queue> 00006 #include <map> 00007 #include <set> 00008 00009 #include "stm_export.h" 00010 #include "threadutils.h" 00011 00012 namespace thread { 00013 00014 class Condition; 00015 class Farmer; 00016 class ThreadPool; 00017 00019 00020 class STM_EXPORT Job 00021 { 00022 public: 00023 Job() 00024 : m_processed(false) 00025 { 00026 } 00027 00028 virtual ~Job() 00029 { 00030 } 00031 00032 bool Processed(void) const 00033 { 00034 return m_processed; 00035 } 00036 00037 void DoProcess(void) 00038 { 00039 Process(); 00040 m_processed = true; 00041 } 00042 00043 protected: 00044 virtual void Process(void) = 0; 00045 00046 protected: 00047 bool m_processed; 00048 }; 00049 00051 00052 class STM_EXPORT JobQueue : public Fifo<Job> 00053 { 00054 }; 00055 00057 00058 class STM_EXPORT WorkerThread : public Thread 00059 { 00060 public: 00061 WorkerThread(JobQueue &); 00062 ~WorkerThread(); 00063 00064 void Push(Job *); 00065 void Terminate(void); 00066 00067 protected: 00068 virtual void *MainLoop(void); 00069 00070 volatile bool m_terminate; 00071 JobQueue m_input; 00072 JobQueue &m_output; 00073 }; 00074 00076 00077 class STM_EXPORT ThreadPool : public Thread 00078 { 00079 public: 00080 00081 static ThreadPool *GetInstance(void); 00082 static void Shutdown(void); 00083 static bool CanUsePool(void); 00084 00085 void Push(Job *, Farmer *); 00086 00087 void RegisterFarmer(Farmer *); 00088 void UnRegisterFarmer(Farmer *); 00089 00090 void SetNumThreads(size_t); 00091 00092 private: 00093 bool _CanUsePool(void); 00094 virtual void *MainLoop(void); 00095 00096 ThreadPool(); 00097 ~ThreadPool(); 00098 void Resize(size_t); 00099 void Terminate(void); 00100 void Join(void); 00101 00102 void AssignJobs(void); 00103 void TrimThreads(void); 00104 00105 STM_THREAD_TYPE m_thread; 00106 00107 Mutex m_lock; 00108 bool m_allowJobs; 00109 00110 std::map<Job *, Farmer *> m_destinations; 00111 std::map<Job *, size_t> m_jobs; 00112 00113 std::set<Farmer *> m_farmers; 00114 std::set<size_t> m_activeThreads; 00115 std::set<size_t> m_idleThreads; 00116 std::map<size_t, WorkerThread*> m_threads; 00117 00118 std::queue<Job *> m_pending; 00119 size_t m_targetThreadCount; 00120 size_t m_nextThreadId; 00121 JobQueue m_queue; 00122 volatile bool m_terminate; 00123 }; 00124 00126 00127 class STM_EXPORT Farmer 00128 { 00129 friend class WorkerThread; 00130 friend class ThreadPool; 00131 public: 00132 Farmer(); 00133 virtual ~Farmer(); 00134 00135 void Push(Job *); 00136 void Process(void); 00137 protected: 00138 virtual void ProcessCompletedJob(Job *); 00139 JobQueue &GetPending(void) {return m_pending;} 00140 JobQueue &GetComplete(void){return m_complete;} 00141 bool GetTerminate(void) const {return m_terminate;} 00142 00143 size_t GetNumSubmitted(void) const{return m_numSubmitted;} 00144 size_t GetNumCompleted(void) const{return m_numCompleted;} 00145 00146 private: 00147 00148 JobQueue m_pending, m_complete; 00149 size_t m_numSubmitted, m_numCompleted; 00150 bool m_terminate; 00151 ThreadPool *m_pool; 00152 }; 00153 00154 00155 } 00156 #endif