一个生产者对应多个消费者,通常情况下是多个工作线程,来操作缓存队列,由于涉及到数据同步的问题,难免要用到线程锁机制,但是频繁的加锁解锁会有很大的性能开销,本文是设计针对多个工作线程,来同时创建相应的读写队列,使得线程间实现数据的隔离,对于缓存队列的操作,主要是m_rindex和m_windex索引位置的移动(环形队列)
数据传输结构:
struct Node_t //传输数据节点
{
int dataLen;
char data[BUFFERLEN];
};
struct dataBuffer //队列包含NODECOUNT个节点
{
volatile int m_rindex; //标记读的位置
volatile int m_windex; //标记写的位置
struct Node_t node[NODECOUNT];
};
m_rindex和m_windex索引位置操作:
dBuf[index]->m_rindex = (dBuf[index]->m_rindex + 1) % NODECOUNT;
dBuf[index]->m_windex = (dBuf[index]->m_windex + 1) % NODECOUNT;//修改对应数据块的写索引
判断队列满的情况:
if ((dBuf[index]->m_windex + 1) % NODECOUNT == dBuf[index]->m_rindex)//队尾指针在队首指针后面
{
printf("dBuf[%d] is full\n", dBuf[index]);
sleep(1);
continue;
}
判断队列空的情况:
if (dBuf[index]->m_windex == dBuf[index]->m_rindex)//数据列可能为空的时候
{
//printf("No data\n");
sleep(1);
continue;
}
测试源代码:
//#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define BUFFERLEN 4096
#define NODECOUNT 10
#define THREADCOUNT 5
struct Node_t //传输数据节点
{
int dataLen;
char data[BUFFERLEN];
};
struct dataBuffer //队列包含NODECOUNT个节点
{
volatile int m_rindex; //标记读的位置
volatile int m_windex; //标记写的位置
struct Node_t node[NODECOUNT];
};
void outPut(struct Node_t *nodes);
void (*callback)(struct Node_t *nodes);
struct dataBuffer *dBuf[THREADCOUNT];//一个线程对应一个dataBuffer块
void *func(void *arg)
{
pthread_detach(pthread_self());
int *thread_id = (int *)arg;
int index = *thread_id;
printf("index = %d\n", index);
while(true)
{
if (dBuf[index] == NULL)//线程启动后,可能未被初始化
continue;
if (dBuf[index]->m_windex == dBuf[index]->m_rindex){//数据列可能为空的时候
//printf("No data\n");
sleep(1);
continue;
}
struct Node_t *tmpNode = &dBuf[index]->node[dBuf[index]->m_rindex];//按照m_rindex位置读取相应节点
if (index == 2)
callback(tmpNode);
printf("channel %d:dBuf[%d]->m_rindex=%d m_windex = %d get Data:%s\n", index,index, dBuf[index]->m_rindex, dBuf[index]->m_windex, tmpNode->data);
memset(tmpNode, sizeof(struct Node_t), 0);
dBuf[index]->m_rindex = (dBuf[index]->m_rindex + 1) % NODECOUNT;
}
}
void outPut(struct Node_t *nodes)
{
printf("CallBack get Data:%s\n", nodes->data);
}
int main(int argc, char const *argv[])
{
pthread_t pth[THREADCOUNT];
for (int i = 0; i < THREADCOUNT; i++)
{
pthread_create(&pth[i], NULL, func, (void *)&i);
usleep(200 * 1000);
}
for (int i = 0; i < THREADCOUNT;i++)//内存初始化
{
dBuf[i] = (struct dataBuffer *)malloc(sizeof(struct dataBuffer));
memset(dBuf[i], sizeof(dataBuffer), 0);
}
callback=outPut;//回调注册
for (int i = 0; i < 100; i++)
{
int index = i % THREADCOUNT;
if ((dBuf[index]->m_windex + 1) % NODECOUNT == dBuf[index]->m_rindex)//队尾指针在队首指针后面
{
printf("dBuf[%d] is full\n", dBuf[index]);
sleep(1);
continue;
}
struct Node_t *Nt = (struct Node_t *)malloc(sizeof(struct Node_t));//测试数据构造
char tmp[1024];
sprintf(tmp, "test thread pool %d\0",i);
Nt->dataLen = strlen(tmp) + 1;
memcpy(Nt->data, tmp, Nt->dataLen);
memcpy(&dBuf[index]->node[dBuf[index]->m_windex], Nt, sizeof(struct Node_t));//往指定结构块你存放数据
dBuf[index]->m_windex = (dBuf[index]->m_windex + 1) % NODECOUNT;//修改对应数据块的写索引
sleep(1);
}
for(int i = 0; i < THREADCOUNT; i++)
{
free(dBuf[i]);
}
return 0;
}
编译:g++ test.cpp -lpthread