概述
本文讲述了beanstalkd中use命令命令的实现原理。分析了保存tube的存储结构,和查询tube时的具体操作。并分析了这样实现的优缺点。
引子
前面已经讲解过命令的处理流程,本章直接进入use命令的实现部分。
就像数据库,在使用时需要先选择一个数据库的名字。在beanstalkd中也需要指定一个tube的名字。具体的指定操作是通过use命令来实现的。要注意的是:
- 若不指定tube名称,默认使用的是default这个tube
- 若指定的tube不存在,beanstalkd会自动创建该tube
另外,还需要注意以下两点:
- use和using命令,影响的是put操作。而且只能有一个tube被use。
- watch和watching,影响的是reserve操作,watch的tube可以是多个,只要其中有一个有数据,reserve都会返回该数据。
客户端使用use命令
通过beanstalkc客户端,我们可以先查看一下use命令的使用:
>>> import beanstalkc
>>> beanstalk = beanstalkc.Connection(host='localhost', port=11300)
>>> beanstalk.use('foo') # 指定一个tube
'foo'
>>> beanstalk.using() # 查看现在使用的tube的名字
'foo'
>>> beanstalk.tubes() # 查看目前可用的tube名列表
['default']
use命令的实现
实现过程概述
前面已经说过,当use的tube不存在时,beanstalkd会自动创建该tube。下面我们来看一下该命令是如何实现的,数据结构是如何安排的?
use命令的实现过程如下:
1. 检查tube的名字的长度和格式是否合理,若不合理返回错误
2. 从全局变量tubes中,查找对应名称的tube是否存在,若存在直接返回其指针,若不存在创建一个tube并把该tube添加到全局变量tubes中,并返回该tube的指针。
在dispatch_cmd(Conn *c)函数中,use命令的处理的代码如下:
case OP_USE:
name = c->cmd + CMD_USE_LEN;
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
TUBE_ASSIGN(t, tube_find_or_make(name));
if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
c->use->using_ct--;
TUBE_ASSIGN(c->use, t);
TUBE_ASSIGN(t, NULL);
c->use->using_ct++;
reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
break;
从实现角度来看,这段代码的核心是tube_find_or_make函数,该函数从全局变量tubes中查找对应名字的tube是否存在。若存在直接返回该tube的指针,若不存在创建一个tube并把该tube添加到全局变量中。
tubes全局变量
所有的tube的指针都被保存在一个叫tubes的全局变量中,该变量的结构如下:
struct ms {
size_t used, cap, last;
void **items;
ms_event_fn oninsert, onremove;
};
struct ms tubes;
从以上代码实现上来看,其实是一个tube指针的数组,形式如下:
&tube2{}
/|\
|
[0][1][2][3][4][5]
|
\|/
&tube1{}
注意:这是一种简单的实现,要是有大量的tube的话,查找的性能可能会受到影响。但在实际使用过程中,tube的数量应该不会太多。
tube_find_or_make
查找tube
查找tube时,其实就是在tubes这个全局变量的中遍历items这个数组,并通过tube的名字进行字符串匹配。创建一个tube
若在tubes中找不到匹配的名称,需要创建一个tube。创建的操作通过make_tube函数来完成。该函数的代码如下:
tube
make_tube(const char *name)
{
tube t;
// 分配内存
t = new(struct tube);
if (!t) return NULL;
// 设置tube名称
t->name[MAX_TUBE_NAME_LEN - 1] = '\0';
strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);
if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");
// 设置回调函数
// 入堆时需要用到less函数,来和堆中的父节点进行值的比较
t->ready.less = job_pri_less;
t->delay.less = job_delay_less;
// 设置堆数组中的index的值
t->ready.rec = job_setheappos;
t->delay.rec = job_setheappos;
// 初始化buried链表
t->buried = (struct job) { };
t->buried.prev = t->buried.next = &t->buried;
// 初始化waiting数组(保存等待的链接conn对象)
ms_init(&t->waiting, NULL, NULL);
return t;
}
在创建tube时完成tube结构的初始化,主要有:
- 设置保存job的最小堆的比较回调函数
- 设置记录堆的index的回调函数
- 初始化job的buried链表
- 初始化tube的等待链表,该队列主要用来保存等待的连接实体conn的指针
另外,在添加tube时,若tubes的len(长度)大于其cap(最大容量)就需要扩容。扩容的操作,是新申请一块原来2倍大小的内存,然后把老的内容复制到新的内存块中。
设置连接(conn)正在使用的tube指针
tube创建完成后,需要把该tube设置为目前的conn使用的tube,若连接原来设置过tube还需要对老的tube进行检查,若老的tube没有使用(该tube的引用数为0),则需要释放内存。
TUBE_ASSIGN(c->use, t);
TUBE_ASSIGN(t, NULL);
以上代码中的宏定义如下:
#define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
最后把临时变量t的值设置为NULL,以便后来再使用。
总结
本文讲述了创建tube的实现原理,并分析了tube的数据存储结构。