#ifndef TPM_H
#define TPM_H
#include <iostream>
#include <queue>
#include <windows.h>
#include "RingBuffer.hpp"
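// Example of a type usable as the template argument T: the pool's worker
// loop calls process() on every item it dequeues, so T must provide it.
//class T
//{
//public:
// T(){}
// virtual ~T(){};
//
// virtual void process()
// {
// std::cout<<"I am processed by thread number : " << std::endl;
// }
//
//};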
/**
 * Thin wrapper that keeps pending work items (raw pointers to T) in a
 * RingBuffer<T>.
 *
 * This class only forwards pointers to and from the ring buffer; it contains
 * no delete of the items it is handed.
 *
 * NOTE(review): no locking happens at this level (the "lock wait" markers were
 * never implemented). Whether concurrent producers/consumers are safe depends
 * entirely on RingBuffer -- confirm before using more than one thread.
 */
template <class T> class WorkQueueManager
{
public:
    /// Prepares the ring buffer by calling Init(numTask) on it.
    WorkQueueManager(int numTask)
    {
        rb.Init(numTask);
    }

    virtual ~WorkQueueManager() {}

    /// Passes t to the ring buffer and returns what RingBuffer::set reports.
    bool EnqueueWork(T* t)
    {
        // TODO: lock wait
        const bool stored = rb.set(t);
        return stored;
    }

    /// Returns the pointer produced by RingBuffer::get (null when it yields none).
    T* DequeueWork()
    {
        // TODO: lock wait
        T* next = rb.get();
        return next;
    }

private:
    RingBuffer<T> rb;
};
/**
 * Pool of Win32 worker threads that pull T* items from a WorkQueueManager<T>
 * and call process() on each of them.
 *
 * Lifetime rules:
 *  - The pool owns its WorkQueueManager (created in the constructor, deleted
 *    in the destructor) and every thread handle stored in threadPool.
 *  - Task objects are NOT owned; callers must keep them alive until they have
 *    been processed or the pool has been destroyed.
 *  - The destructor asks the workers to stop, lets them finish whatever is
 *    still queued, joins them, and only then releases the queue.
 *  - Copying is disabled because a copy would share (and double-release) the
 *    queue pointer and the thread handles.
 *
 * NOTE(review): the queue is accessed from the producer and from every worker
 * without any lock in this file; with numThread > 1 this is only safe if
 * RingBuffer itself is thread-safe -- confirm.
 */
template <class T> class ThreadPoolManager
{
public:
    /**
     * @param numThread number of worker threads started by each runThreads() call
     * @param numTask   value forwarded to the WorkQueueManager constructor
     * @param run       when true the workers are started immediately;
     *                  otherwise the caller starts them with runThreads()
     */
    ThreadPoolManager(int numThread, int numTask = 10, bool run = true)
        : mWQM(new WorkQueueManager<T>(numTask)),
          mNumThread(numThread),
          mStopRequested(0)
    {
        if (run) {
            runThreads();
        }
    }

    /// Stops and joins every started worker, then frees the work queue.
    virtual ~ThreadPoolManager()
    {
        // Without this flag the workers would loop forever and the INFINITE
        // wait below could never return.
        InterlockedExchange(&mStopRequested, 1);

        // Walk the handles that were actually created (not mNumThread, which
        // is wrong when runThreads() was never called or called twice).
        // front() must be paired with pop(): std::queue::pop removes the
        // FRONT element, so reading back() would wait on/close the wrong one.
        while (!threadPool.empty()) {
            HANDLE worker = threadPool.front();
            WaitForSingleObject(worker, INFINITE);
            CloseHandle(worker);
            threadPool.pop();
        }

        // Safe only now that no worker can touch the queue any more.
        delete mWQM;
        mWQM = NULL;
    }

    /// Starts mNumThread additional worker threads running Process(this).
    void runThreads()
    {
        for (int i = 0; i < mNumThread; i++) {
            HANDLE tH = CreateThread(NULL, 0,
                                     &ThreadPoolManager::ThreadEntry, this,
                                     0, NULL);
            if (tH == NULL) {
                // Never store an invalid handle: the destructor would wait on
                // and close it.
                std::cerr << "ThreadPoolManager: CreateThread failed, error "
                          << GetLastError() << std::endl;
                continue;
            }
            threadPool.push(tH);
        }
    }

    /**
     * Worker loop. arg must point to the owning ThreadPoolManager.
     *
     * Processes items as long as the queue yields them. When the queue is
     * empty it returns if the pool is shutting down, otherwise it sleeps
     * briefly instead of spinning at full CPU.
     */
    static void Process(void* arg)
    {
        ThreadPoolManager* tpm = static_cast<ThreadPoolManager*>(arg);
        for (;;) {
            T* task = tpm->mWQM->DequeueWork();
            if (NULL != task) {
                task->process();
                continue;
            }
            if (tpm->stopRequested()) {
                break;  // queue drained and shutdown requested
            }
            Sleep(1);   // idle: yield rather than busy-wait
        }
    }

    /**
     * Queues t for processing. The pointer is stored as-is; t must outlive
     * its processing.
     * NOTE(review): the bool returned by EnqueueWork is discarded here, so a
     * rejected item goes unnoticed; the signature is kept void for existing
     * callers.
     */
    void AddTask(T* t)
    {
        mWQM->EnqueueWork(t);
    }

public:
    WorkQueueManager<T>* mWQM;      // owned; released in the destructor
    int mNumThread;                 // threads created per runThreads() call
    std::queue<HANDLE> threadPool;  // handles of the started workers

private:
    /// Entry point with the exact signature CreateThread requires
    /// (DWORD WINAPI (LPVOID)); forwards to Process().
    static DWORD WINAPI ThreadEntry(LPVOID arg)
    {
        Process(arg);
        return 0;
    }

    /// Reads the shutdown flag with a full memory barrier.
    bool stopRequested()
    {
        return InterlockedCompareExchange(&mStopRequested, 0, 0) != 0;
    }

    // Non-copyable (declared, intentionally not defined).
    ThreadPoolManager(const ThreadPoolManager&);
    ThreadPoolManager& operator=(const ThreadPoolManager&);

    volatile LONG mStopRequested;   // set to 1 by the destructor
};
#endif // TPM_H
_____________________________________________________________________________
#include <iostream>
#include "ThreadPoolManager.hpp"
/**
 * Sample work item: stores a number and prints it when processed.
 */
class Task
{
public:
    /// A new task reports the number 0 until set() is called.
    Task() : mNumber(0) {}

    virtual ~Task() {}

    /// Stores i as the number this task will print.
    void set(int i)
    {
        mNumber = i;
    }

    /// Writes the fixed message followed by the stored number to std::cout,
    /// then ends the line and flushes the stream.
    virtual void process()
    {
        std::ostream& out = std::cout;
        out << "I am processed by thread number : ";
        out << mNumber;
        out << std::endl;
    }

private:
    int mNumber;  // value supplied through set()
};
int main()
{
ThreadPoolManager<Task> tpm(1, 100, false);
Task t[25];
for (int i=0; i<5; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
}
tpm.runThreads();
for (int i=0; i<2500; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
Sleep(1);
}
return 0;
}
#define TPM_H
#include <iostream>
#include <queue>
#include <windows.h>
#include "RingBuffer.hpp"
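// Example of a type usable as the template argument T: the pool's worker
// loop calls process() on every item it dequeues, so T must provide it.
//class T
//{
//public:
// T(){}
// virtual ~T(){};
//
// virtual void process()
// {
// std::cout<<"I am processed by thread number : " << std::endl;
// }
//
//};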
/**
 * Thin wrapper that keeps pending work items (raw pointers to T) in a
 * RingBuffer<T>.
 *
 * This class only forwards pointers to and from the ring buffer; it contains
 * no delete of the items it is handed.
 *
 * NOTE(review): no locking happens at this level (the "lock wait" markers were
 * never implemented). Whether concurrent producers/consumers are safe depends
 * entirely on RingBuffer -- confirm before using more than one thread.
 */
template <class T> class WorkQueueManager
{
public:
    /// Prepares the ring buffer by calling Init(numTask) on it.
    WorkQueueManager(int numTask)
    {
        rb.Init(numTask);
    }

    virtual ~WorkQueueManager() {}

    /// Passes t to the ring buffer and returns what RingBuffer::set reports.
    bool EnqueueWork(T* t)
    {
        // TODO: lock wait
        const bool stored = rb.set(t);
        return stored;
    }

    /// Returns the pointer produced by RingBuffer::get (null when it yields none).
    T* DequeueWork()
    {
        // TODO: lock wait
        T* next = rb.get();
        return next;
    }

private:
    RingBuffer<T> rb;
};
/**
 * Pool of Win32 worker threads that pull T* items from a WorkQueueManager<T>
 * and call process() on each of them.
 *
 * Lifetime rules:
 *  - The pool owns its WorkQueueManager (created in the constructor, deleted
 *    in the destructor) and every thread handle stored in threadPool.
 *  - Task objects are NOT owned; callers must keep them alive until they have
 *    been processed or the pool has been destroyed.
 *  - The destructor asks the workers to stop, lets them finish whatever is
 *    still queued, joins them, and only then releases the queue.
 *  - Copying is disabled because a copy would share (and double-release) the
 *    queue pointer and the thread handles.
 *
 * NOTE(review): the queue is accessed from the producer and from every worker
 * without any lock in this file; with numThread > 1 this is only safe if
 * RingBuffer itself is thread-safe -- confirm.
 */
template <class T> class ThreadPoolManager
{
public:
    /**
     * @param numThread number of worker threads started by each runThreads() call
     * @param numTask   value forwarded to the WorkQueueManager constructor
     * @param run       when true the workers are started immediately;
     *                  otherwise the caller starts them with runThreads()
     */
    ThreadPoolManager(int numThread, int numTask = 10, bool run = true)
        : mWQM(new WorkQueueManager<T>(numTask)),
          mNumThread(numThread),
          mStopRequested(0)
    {
        if (run) {
            runThreads();
        }
    }

    /// Stops and joins every started worker, then frees the work queue.
    virtual ~ThreadPoolManager()
    {
        // Without this flag the workers would loop forever and the INFINITE
        // wait below could never return.
        InterlockedExchange(&mStopRequested, 1);

        // Walk the handles that were actually created (not mNumThread, which
        // is wrong when runThreads() was never called or called twice).
        // front() must be paired with pop(): std::queue::pop removes the
        // FRONT element, so reading back() would wait on/close the wrong one.
        while (!threadPool.empty()) {
            HANDLE worker = threadPool.front();
            WaitForSingleObject(worker, INFINITE);
            CloseHandle(worker);
            threadPool.pop();
        }

        // Safe only now that no worker can touch the queue any more.
        delete mWQM;
        mWQM = NULL;
    }

    /// Starts mNumThread additional worker threads running Process(this).
    void runThreads()
    {
        for (int i = 0; i < mNumThread; i++) {
            HANDLE tH = CreateThread(NULL, 0,
                                     &ThreadPoolManager::ThreadEntry, this,
                                     0, NULL);
            if (tH == NULL) {
                // Never store an invalid handle: the destructor would wait on
                // and close it.
                std::cerr << "ThreadPoolManager: CreateThread failed, error "
                          << GetLastError() << std::endl;
                continue;
            }
            threadPool.push(tH);
        }
    }

    /**
     * Worker loop. arg must point to the owning ThreadPoolManager.
     *
     * Processes items as long as the queue yields them. When the queue is
     * empty it returns if the pool is shutting down, otherwise it sleeps
     * briefly instead of spinning at full CPU.
     */
    static void Process(void* arg)
    {
        ThreadPoolManager* tpm = static_cast<ThreadPoolManager*>(arg);
        for (;;) {
            T* task = tpm->mWQM->DequeueWork();
            if (NULL != task) {
                task->process();
                continue;
            }
            if (tpm->stopRequested()) {
                break;  // queue drained and shutdown requested
            }
            Sleep(1);   // idle: yield rather than busy-wait
        }
    }

    /**
     * Queues t for processing. The pointer is stored as-is; t must outlive
     * its processing.
     * NOTE(review): the bool returned by EnqueueWork is discarded here, so a
     * rejected item goes unnoticed; the signature is kept void for existing
     * callers.
     */
    void AddTask(T* t)
    {
        mWQM->EnqueueWork(t);
    }

public:
    WorkQueueManager<T>* mWQM;      // owned; released in the destructor
    int mNumThread;                 // threads created per runThreads() call
    std::queue<HANDLE> threadPool;  // handles of the started workers

private:
    /// Entry point with the exact signature CreateThread requires
    /// (DWORD WINAPI (LPVOID)); forwards to Process().
    static DWORD WINAPI ThreadEntry(LPVOID arg)
    {
        Process(arg);
        return 0;
    }

    /// Reads the shutdown flag with a full memory barrier.
    bool stopRequested()
    {
        return InterlockedCompareExchange(&mStopRequested, 0, 0) != 0;
    }

    // Non-copyable (declared, intentionally not defined).
    ThreadPoolManager(const ThreadPoolManager&);
    ThreadPoolManager& operator=(const ThreadPoolManager&);

    volatile LONG mStopRequested;   // set to 1 by the destructor
};
#endif // TPM_H
_____________________________________________________________________________
#include <iostream>
#include "ThreadPoolManager.hpp"
/**
 * Sample work item: stores a number and prints it when processed.
 */
class Task
{
public:
    /// A new task reports the number 0 until set() is called.
    Task() : mNumber(0) {}

    virtual ~Task() {}

    /// Stores i as the number this task will print.
    void set(int i)
    {
        mNumber = i;
    }

    /// Writes the fixed message followed by the stored number to std::cout,
    /// then ends the line and flushes the stream.
    virtual void process()
    {
        std::ostream& out = std::cout;
        out << "I am processed by thread number : ";
        out << mNumber;
        out << std::endl;
    }

private:
    int mNumber;  // value supplied through set()
};
int main()
{
ThreadPoolManager<Task> tpm(1, 100, false);
Task t[25];
for (int i=0; i<5; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
}
tpm.runThreads();
for (int i=0; i<2500; i++) {
t[i].set(i);
tpm.AddTask(&t[i]);
Sleep(1);
}
return 0;
}
No comments:
Post a Comment