blockingqueue.hpp
#pragma once
#include<iostream>
#include<semaphore.h>
#include<vector>
using namespace std;
template<class T>
class BlockingQueue
{
public:
BlockingQueue(int maxsize=100)
:_queue(maxsize)
,_head(0)
,_tail(0)
,_size(0)
,_maxsize(maxsize)
{
sem_init(&_lock,0,1);
sem_init(&_used,0,0);
sem_init(&_rem,0,maxsize);
}
~BlockingQueue()
{
sem_destroy(&_lock);
sem_destroy(&_used);
sem_destroy(&_rem);
}
void Push(const T& data)
{
sem_wait(&_rem);
sem_wait(&_lock);
_queue[_tail]=data;
_tail=(_tail+1)%_maxsize;
++_size;
sem_post(&_lock);
sem_post(&_used);
}
void Pop(T& data)
{
sem_wait(&_used);
sem_wait(&_lock);
data=_queue[_head];
_head=(_head+1)%_maxsize;
--_size;
sem_post(&_lock);
sem_post(&_rem);
}
private:
int _head;
int _tail;
int _size;
int _maxsize;
vector<T> _queue;
sem_t _lock;
sem_t _used;
sem_t _rem;
};
threadpool.hpp
#pragma once
#include"blockingqueue.hpp"
class Stack
{
public:
virtual void Run()
{
cout<<"Stack-----\n";
}
virtual ~Stack()
{}
};
class ThreadPool
{
public:
ThreadPool(int thread_size,int queue_size=100)
:_queue(queue_size)
,_size(thread_size)
{
for(int i=0;i<_size;++i)
{
pthread_t tid;
pthread_create(&tid,NULL,PthreadEntry,this);
tid_arr.push_back(tid);
}
}
~ThreadPool()
{
for(int i=0;i<_size;++i)
{
pthread_cancel(tid_arr[i]);
}
for(int i=0;i<_size;++i)
{
pthread_join(tid_arr[i],NULL);
}
}
void AddStack(Stack* ptr)
{
_queue.Push(ptr);
}
private:
BlockingQueue<Stack*> _queue;
int _size;
vector<pthread_t> tid_arr;
static void* PthreadEntry(void* arg)
{
ThreadPool* ptr=(ThreadPool*)arg;
while(1)
{
Stack* pst=NULL;
ptr->_queue.Pop(pst);
pst->Run();
delete pst;
}
}
};
main.cc
#include"threadpool.hpp"
#include<sys/syscall.h>
#include<unistd.h>
#include<stdio.h>
class MyStack:public Stack{
public:
MyStack(int n)
:_id(n)
{}
void Run()override
{
printf("MyStack---tid=%d,id=%d\n",syscall(SYS_gettid),_id);
}
~MyStack()override
{}
private:
int _id;
};
int main()
{
ThreadPool pool(10);
for(int i=0;i<20;++i)
{
pool.AddStack(new MyStack(i));
}
while(1)
sleep(1);
return 0;
}