C语言 - 数据队列设计
原理
最基础的使用Demo
// 再次封装一层 queue.
typdef struct {
int size; // 描述 buffer 多大
audio_queue_t queue; // queue 实体
char *base; // buffer 实体
}Queue;
Queue *q = malloc(sizeof(Queue));
// 1. 初始化 queue buffer
q->size = 1024;
q->base= malloc((sizeof(audio_queue_t ) + q->size + 1 * sizeof(char)));
if (!q->base) {
goto err_malloc_queuebuffer;
}
q->queue = queue_init(q->base, q->size + 1);
// 2. 将数据写入 queue
Queue *q = getQueue(xxx);
queue_write(q->queue, data, data_size);
// 3. 读取 queue 里面的数据
Queue *q = getQueue(xxx);
queue_read(q->queue, outbuffer, outsize);
// 4. 释放 queue
Queue *q = getQueue(xxx);
if (NULL != q->queue) {
queue_destroy(q->queue);
q->queue = NULL;
free(q->base);
q->base= NULL;
}
free(q);
代码
- 头文件定义
/*
* AudioQueue.h
*
* Created on: 2015年8月17日
* Author: ican
*/
#ifndef AUDIOQUEUE_H_
#define AUDIOQUEUE_H_
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
/**
* 队列数据结构:|***控制块***|***数据区***|
*/
typedef struct audio_queue_t {
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件锁
int capacity; // 队列容量
int front; // 队头索引
int rear; // 队尾索引
int more; // 写完标记
} audio_queue_t;
#ifdef __cplusplus
extern "C" {
#endif
/**
* 将一块内存初始化为队列数据结构
*
* base 分配给队列的内存块首地址,内存长度为sizeof(audio_queue_t) + capacity
* capacity 队列数据区大小,队列实际容量为capacity - 1
*/
audio_queue_t* queue_init(void* base, int capacity);
char* queue_get_write_addr(audio_queue_t* queue);
void queue_destroy(audio_queue_t* queue);
int queue_real_capacity(audio_queue_t* queue);
int queue_front(audio_queue_t* queue);
int queue_rear(audio_queue_t* queue);
int queue_len(audio_queue_t* queue);
int queue_len_unlock(audio_queue_t* queue);
int queue_left(audio_queue_t* queue);
int queue_left_unlock(audio_queue_t* queue);
int queue_empty(audio_queue_t* queue);
int queue_full(audio_queue_t* queue);
int queue_write(audio_queue_t* queue, char data[], int dataLen);
int queue_read(audio_queue_t* queue, char data[], int readLen);
void queue_set_more(audio_queue_t* queue, int more);
int queue_get_more(audio_queue_t* queue);
#ifdef __cplusplus
}
#endif
#endif /* AUDIOQUEUE_H_ */
逻辑实现
/*
* AudioQueue.cpp
*
* Created on: 2015年8月17日
* Author: ican
*/
#include "AudioQueue.h"
#include <android/log.h>
/**
* base 分配给队列的内存块首地址,内存长度为sizeof(audio_queue_t) + capacity
* capacity 队列数据区大小,队列实际容量为capacity - 1
*/
audio_queue_t* queue_init(void* base, int capacity)
{
if (NULL == base) {
return NULL;
}
audio_queue_t* queue = (audio_queue_t*) base;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_PRIVATE);
pthread_mutex_init(&(queue->mutex), &attr);
pthread_condattr_t condattr;
pthread_condattr_init(&condattr);
pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_PRIVATE);
pthread_cond_init(&(queue->cond), &condattr);
queue->capacity = capacity;
queue->front = 0;
queue->rear = 0;
queue->more = true;
return queue;
}
char* queue_get_write_addr(audio_queue_t* queue)
{
if (NULL != queue) {
char* queueBase = (char*)(queue + 1);
char* begin = &((queueBase)[queue->rear]);
return begin;
}
return NULL;
}
void queue_destroy(audio_queue_t* queue)
{
if (NULL != queue) {
pthread_mutex_destroy(&(queue->mutex));
}
}
int queue_real_capacity(audio_queue_t* queue)
{
return queue->capacity - 1;
}
int queue_front(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int front = (queue->front);
pthread_mutex_unlock(&(queue->mutex));
return front;
}
int queue_rear(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int rear = (queue->rear);
pthread_mutex_unlock(&(queue->mutex));
return rear;
}
int queue_len(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int len = (queue->rear - queue->front + queue->capacity) % queue->capacity;
pthread_mutex_unlock(&(queue->mutex));
return len;
}
int queue_len_unlock(audio_queue_t* queue)
{
return (queue->rear - queue->front + queue->capacity) % queue->capacity;
}
int queue_left(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int left = queue->capacity - 1 - queue_len_unlock(queue);
pthread_mutex_unlock(&(queue->mutex));
return left;
}
int queue_left_unlock(audio_queue_t* queue)
{
return queue->capacity - 1 - queue_len_unlock(queue);
}
int queue_empty(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int empty = queue->rear == queue->front;
pthread_mutex_unlock(&(queue->mutex));
return empty;
}
int queue_full(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int full = ((queue->rear + 1 + queue->capacity) % queue->capacity == queue->front);
pthread_mutex_unlock(&(queue->mutex));
return full;
}
int queue_write(audio_queue_t* queue, char data[], int dataLen)
{
if (queue == NULL || data == NULL || dataLen <= 0) {
return false;
}
pthread_mutex_lock(&(queue->mutex));
int queueLeft = queue_left_unlock(queue);
if (queueLeft < dataLen) {
pthread_mutex_unlock(&(queue->mutex));
__android_log_print(ANDROID_LOG_DEBUG, "audio_queue", "unenough queue space, left=%d", queueLeft);
return false;
}
// 计算数据区起始地址
char* queueBase = (char*)(queue + 1);
char* begin = &((queueBase)[queue->rear]);
if (queue->rear + dataLen <= queue->capacity - 1) {
memcpy(begin, data, dataLen);
} else {
// 分两段写入
int dataLen1 = queue->capacity - 1 - queue->rear + 1;
int dataLen2 = dataLen - dataLen1;
memcpy(begin, data, dataLen1);
memcpy(queueBase, data + dataLen1, dataLen2);
}
queue->rear = (queue->rear + dataLen) % queue->capacity;
pthread_mutex_unlock(&(queue->mutex));
pthread_cond_signal(&(queue->cond));
return true;
}
int queue_read(audio_queue_t* queue, char data[], int readLen)
{
if (queue == NULL || data == NULL || readLen <= 0) {
return 0;
}
pthread_mutex_lock(&(queue->mutex));
// 计算数据区起始地址
char* queueBase = (char*)(queue + 1);
char* begin = &((queueBase)[queue->front]);
int queueLen = queue_len_unlock(queue);
// if (readLen > queueLen) {
// readLen = queueLen;
// }
// 队列中的数据长度不够,则继续等待
while (queueLen < readLen) {
pthread_cond_wait(&(queue->cond), &(queue->mutex));
queueLen = queue_len_unlock(queue);
}
if (queue->front + readLen <= queue->capacity - 1) {
memcpy(data, begin, readLen);
} else {
// 分两段读取
int readLen1 = queue->capacity - 1 - queue->front + 1;
int readLen2 = readLen - readLen1;
memcpy(data, begin, readLen1);
memcpy(&data[readLen1], queueBase, readLen2);
}
queue->front = (queue->front + readLen) % queue->capacity;
pthread_mutex_unlock(&(queue->mutex));
return readLen;
}
void queue_set_more(audio_queue_t* queue, int more)
{
pthread_mutex_lock(&(queue->mutex));
queue->more = more;
pthread_mutex_unlock(&(queue->mutex));
}
int queue_get_more(audio_queue_t* queue)
{
pthread_mutex_lock(&(queue->mutex));
int end = queue->more;
pthread_mutex_unlock(&(queue->mutex));
return end;
}