平台 | 内核版本 |
---|---|
RK1108 | Linux3.1 |
在状态机这篇文章中,异步消息的收发是通过队列实现,下面我们看下队列的实现:
队列结构体
typedef struct _Queue {
struct _QueuePriv *priv;
void (*post)(struct _Queue *,void *data);
int (*get)(struct _Queue *,void *data);
void (*setDataSize)(struct _Queue *,int size);
void (*destroy)(struct _Queue *This);
}Queue;
其中_QueuePriv
定义如下:
#define MAX_COMMAND_QUEUE_SIZE 32
typedef struct _QueuePriv {
char name[32];
QueueType type;
unsigned int size;
int queue;
void *buf[MAX_COMMAND_QUEUE_SIZE];
sem_t *sem; //信号量
int current_length; // 当前fifo已存储的长度
int index_post; //索引
int index_get; //索引
pthread_mutex_t mutex; //队列控制互斥信号
pthread_mutexattr_t mutexattr2;
}QueuePriv;
其中QueueType
为:非阻塞和阻塞
typedef enum QueueType {
QUEUE_NONBLOCK,
QUEUE_BLOCK,
}QueueType;
操作函数:
/* ----------------------------------------------------------------*/
/**
* @brief queueCreate 创建对列
* * @param Size 参数大小
* * @returns
*/
/* ----------------------------------------------------------------*/
Queue * queueCreate(const char *queue_name,QueueType type,unsigned int Size)
{
Queue * This = (Queue *)calloc(1,sizeof(Queue));
if(This == NULL) {
printf("fifo calloc failed !!!\n");
return NULL;
}
This->priv = (QueuePriv *)calloc(1,sizeof(QueuePriv));
if(This == NULL) {
printf("fifo priv calloc failed !!!\n");
free(This);
return NULL;
}
This->priv->type = type;
if (This->priv->type == QUEUE_NONBLOCK) {
strcpy(This->priv->name,queue_name);
This->priv->queue = msgget((key_t)1234,0666|IPC_CREAT);
if (This->priv->queue < 0)
fprintf(stderr, "mq_create failed: %s\n", strerror(errno));
} else {
//设置互斥锁属性
pthread_mutexattr_init(&This->priv->mutexattr2);
/* Set the mutex as a recursive mutex */
pthread_mutexattr_settype(&This->priv->mutexattr2, PTHREAD_MUTEX_RECURSIVE_NP);
// PTHREAD_MUTEX_RECURSIVE_NP);
/* create the mutex with the attributes set */
pthread_mutex_init(&This->priv->mutex, &This->priv->mutexattr2);
/* destroy the attribute */
pthread_mutexattr_destroy(&This->priv->mutexattr2);
This->priv->sem = (sem_t *) calloc(1,sizeof(sem_t));
//读信号量初始化为0
sem_init (This->priv->sem, 0,0);
This->priv->current_length = 0;
This->priv->index_get = 0;
This->priv->index_post = 0;
}
This->priv->size = Size;
This->post = queuePost;
This->get = queueGet;
This->destroy = destroy;
return This;
}
其中msgget
的详细解释如下:定义
然后看一下其他的功能函数:
- queuePost
- queueGet
- destroy
static void queuePost(Queue * This,void *data)
{
if (This->priv->type == QUEUE_NONBLOCK) {
if (msgsnd(This->priv->queue, (const char*)data, This->priv->size, IPC_NOWAIT))
fprintf(stderr, "mq_send failed: %s\n", strerror(errno));
} else {
if (This->priv->current_length >= MAX_COMMAND_QUEUE_SIZE) {
printf("fifo full!!%s\n",This->priv->name);
return;
}
QUEUE_LOCK();
if (This->priv->index_post >= MAX_COMMAND_QUEUE_SIZE) {
This->priv->index_post = 0;
}
This->priv->buf[This->priv->index_post] = (void *) calloc (1,This->priv->size);
if (This->priv->buf[This->priv->index_post] == NULL) {
printf("[%s] can't calloc memory\n", __func__);
return;
}
if (This->priv->size)
memcpy(This->priv->buf[This->priv->index_post],data,This->priv->size);
This->priv->index_post++;
This->priv->current_length++;
QUEUE_UNLOCK();
semPost(This);
}
}
static int queueGet(Queue *This,void *data)
{
if (This->priv->type == QUEUE_NONBLOCK) {
return msgrcv(This->priv->queue, (void *)data, This->priv->size, 0,IPC_NOWAIT);
// return mq_receive(This->priv->queue, (char*)data, This->priv->size, 0);
}
semWait(This);
if (This->priv->current_length == 0) {
printf("fifo empty!!\n");
return 0;
}
QUEUE_LOCK();
if (This->priv->index_get >= MAX_COMMAND_QUEUE_SIZE) {
This->priv->index_get = 0;
}
if (This->priv->size)
memcpy(data,This->priv->buf[This->priv->index_get],This->priv->size);
free(This->priv->buf[This->priv->index_get]);
This->priv->buf[This->priv->index_get] = NULL;
This->priv->index_get++;
This->priv->current_length--;
QUEUE_UNLOCK();
return 0;
}