Search This Blog

19.3.17

Thread Pool

#ifndef TPM_H
#define TPM_H

#include <iostream>
#include <queue>
#include <windows.h>
#include "RingBuffer.hpp"


//class T
//{
//public:
// T(){}
// virtual ~T(){};
//
// virtual void process()
// {
// std::cout<<"I am processed by thread number : " << std::endl;
// }
//
//};



template <class T> class WorkQueueManager
{
public:
WorkQueueManager(int numTask) {
rb.Init(numTask);
}

virtual ~WorkQueueManager()
{
}

bool EnqueueWork(T* t) {
//lock wait

return rb.set(t);
}

T* DequeueWork() {
//lock wait
T *t =NULL;
t = rb.get();
if (t == NULL)
return NULL;
return t;
}

private:
RingBuffer<T> rb;
};



template <class T> class ThreadPoolManager
{
public:
ThreadPoolManager(int numThread, int numTask = 10, bool run = true) {
mNumThread = numThread;
mWQM = new WorkQueueManager<T>(numTask);

if (run) {
runThreads();
}
}

virtual ~ThreadPoolManager(){
for (int i=0; i<mNumThread; i++) {
WaitForSingleObject(threadPool.back(), INFINITE);
CloseHandle(threadPool.back());
threadPool.pop();
}
}

void runThreads()
{
for (int i=0; i<mNumThread; i++) {
HANDLE tH=CreateThread (NULL, 0,
(LPTHREAD_START_ROUTINE)Process, this, 0, NULL);
threadPool.push(tH);
}
}

static void Process(void* arg) {
ThreadPoolManager *tpm = (ThreadPoolManager*)arg;
while (true) {
T *task = tpm->mWQM->DequeueWork();
if (NULL != task)
task->process();
}
}

void AddTask(T* t) {
mWQM->EnqueueWork(t);
}


public:
WorkQueueManager<T> *mWQM;
int mNumThread;
std::queue<HANDLE> threadPool;
};


#endif // TPM_H
_____________________________________________________________________________

#include <iostream>
#include "ThreadPoolManager.hpp"


class Task//1 : public Task
{
public:
Task():mTNum(0){}
virtual ~Task(){}
void set(int i) { mTNum = i;}
virtual void process()
{
std::cout<<"I am processed by thread number : " << mTNum << std::endl;
}
private:
int mTNum;
};


int main()
{
ThreadPoolManager<Task> tpm(1, 100, false);

Task t[25];

for (int i=0; i<5; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
}

tpm.runThreads();

for (int i=0; i<2500; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
Sleep(1);
}

return 0;
}



No comments: