Beanstalkd源码分析--use命令的实现

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zg_hover/article/details/81907506

概述

本文讲述了beanstalkd中use命令命令的实现原理。分析了保存tube的存储结构,和查询tube时的具体操作。并分析了这样实现的优缺点。

引子

前面已经讲解过命令的处理流程,本章直接进入use命令的实现部分。
就像数据库,在使用时需要先选择一个数据库的名字。在beanstalkd中也需要指定一个tube的名字。具体的指定操作是通过use命令来实现的。要注意的是:

  • 若不指定tube名称,默认使用的是default这个tube
  • 若指定的tube不存在,beanstalkd会自动创建该tube

另外,还需要注意以下两点:

  • use和using命令,影响的是put操作。而且只能有一个tube被use。
  • watch和watching,影响的是reserve操作,watch的tube可以是多个,只要其中有一个有数据,reserve都会返回该数据。

客户端使用use命令

通过beanstalkc客户端,我们可以先查看一下use命令的使用:

>>> import beanstalkc
>>> beanstalk = beanstalkc.Connection(host='localhost', port=11300)
>>> beanstalk.use('foo')   # 指定一个tube
'foo'
>>> beanstalk.using()   # 查看现在使用的tube的名字 
'foo'
>>> beanstalk.tubes()   # 查看目前可用的tube名列表
['default']

use命令的实现

实现过程概述

前面已经说过,当use的tube不存在时,beanstalkd会自动创建该tube。下面我们来看一下该命令是如何实现的,数据结构是如何安排的?

use命令的实现过程如下:
1. 检查tube的名字的长度和格式是否合理,若不合理返回错误
2. 从全局变量tubes中,查找对应名称的tube是否存在,若存在直接返回其指针,若不存在创建一个tube并把该tube添加到全局变量tubes中,并返回该tube的指针。

在dispatch_cmd(Conn *c)函数中,use命令的处理的代码如下:

    case OP_USE:
        name = c->cmd + CMD_USE_LEN;
        if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
        op_ct[type]++;

        TUBE_ASSIGN(t, tube_find_or_make(name));
        if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);

        c->use->using_ct--;
        TUBE_ASSIGN(c->use, t);
        TUBE_ASSIGN(t, NULL);
        c->use->using_ct++;

        reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
        break;

从实现角度来看,这段代码的核心是tube_find_or_make函数,该函数从全局变量tubes中查找对应名字的tube是否存在。若存在直接返回该tube的指针,若不存在创建一个tube并把该tube添加到全局变量中。

tubes全局变量

所有的tube的指针都被保存在一个叫tubes的全局变量中,该变量的结构如下:

struct ms {
    size_t used, cap, last;
    void **items;
    ms_event_fn oninsert, onremove;
};

struct ms tubes;

从以上代码实现上来看,其实是一个tube指针的数组,形式如下:

 &tube2{}
    /|\
     |
 [0][1][2][3][4][5]
  |
 \|/
&tube1{} 

注意:这是一种简单的实现,要是有大量的tube的话,查找的性能可能会受到影响。但在实际使用过程中,tube的数量应该不会太多。

tube_find_or_make

  • 查找tube
    查找tube时,其实就是在tubes这个全局变量的中遍历items这个数组,并通过tube的名字进行字符串匹配。

  • 创建一个tube
    若在tubes中找不到匹配的名称,需要创建一个tube。创建的操作通过make_tube函数来完成。该函数的代码如下:

tube
make_tube(const char *name)
{
    tube t;

    // 分配内存
    t = new(struct tube);
    if (!t) return NULL;

    // 设置tube名称
    t->name[MAX_TUBE_NAME_LEN - 1] = '\0';
    strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);
    if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");

    // 设置回调函数

    // 入堆时需要用到less函数,来和堆中的父节点进行值的比较
    t->ready.less = job_pri_less;
    t->delay.less = job_delay_less;
    // 设置堆数组中的index的值
    t->ready.rec = job_setheappos;
    t->delay.rec = job_setheappos;
    // 初始化buried链表
    t->buried = (struct job) { };
    t->buried.prev = t->buried.next = &t->buried;
    // 初始化waiting数组(保存等待的链接conn对象)
    ms_init(&t->waiting, NULL, NULL);

    return t;
}

在创建tube时完成tube结构的初始化,主要有:

  • 设置保存job的最小堆的比较回调函数
  • 设置记录堆的index的回调函数
  • 初始化job的buried链表
  • 初始化tube的等待链表,该队列主要用来保存等待的连接实体conn的指针

另外,在添加tube时,若tubes的len(长度)大于其cap(最大容量)就需要扩容。扩容的操作,是新申请一块原来2倍大小的内存,然后把老的内容复制到新的内存块中。

设置连接(conn)正在使用的tube指针

tube创建完成后,需要把该tube设置为目前的conn使用的tube,若连接原来设置过tube还需要对老的tube进行检查,若老的tube没有使用(该tube的引用数为0),则需要释放内存。

TUBE_ASSIGN(c->use, t);
TUBE_ASSIGN(t, NULL);

以上代码中的宏定义如下:

#define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))

最后把临时变量t的值设置为NULL,以便后来再使用。

总结

本文讲述了创建tube的实现原理,并分析了tube的数据存储结构。

猜你喜欢

转载自blog.csdn.net/zg_hover/article/details/81907506