Beanstalkd源码分析—reserve命令实现

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zg_hover/article/details/81986689

概述

本文分析了beanstalkd的reserve命令的实现原理。
reserve命令是由消费者发起的命令,消费者通过该命令来获取tube中的job。消费者也可以为该命令设置超时时间。

实现原理分析

reserve命令是由消费者端发起的,该指令会从使用的tube的ready堆中取出一个job,返回给消费者。若没有准备好的job,消费者会阻塞,直到有准备好的job为止。
reserve命令也可以指定超时时间:timeout,若在指定的时间内,对应的tube上没有准备好的job,直接返回空,这样可以提供一个非阻塞的消费模式,方便消费者在代码中使用。

reserve指令的实现逻辑如下:

(1) 首先把conn的类型设置为CONN_TYPE_WORKER。因为,执行reserve命令的客户端的状态此时被认为是worker,是自然的。

(2) 若满足一下两个条件,来执行后面对应的操作

a.从参数c(连接)的reserved_jobs链表中找到一个ttr最小的job(若没有设置ttr选择就是最先插入的job),并判断该Job是否至少还有1秒过期。
b.检查当前conn watch的tube的ready job堆是否有准备好的job

若存在满足条件a的reserved的job,并且watch的tube没有准备好的job,则直接返回MSG_DEADLINE_SOON消息,并把连接设置为等待’w’(写入)的状态(等待服务向连接写数据),否则继续执行下面的步骤。

(3) 在连接(conn)watch的tube的waiting数组中,添加conn的指针。把当前的conn的监控事件改成’h’挂起,然后从新设置超时时间。

(4) 遍历所有tube,找出tube中已经过期且pri最小的job,把该job从对应的tube的ready对中删除,然后把该job添加到当前连接的reserved_jobs链表中,把对应连接从对应的tube的waiting队列中删除,并把找到的job返回给对应连接的客户端。

代码实现分析

reserve和reserve_timeout命令的实现代码如下。
从代码中可以看出,其reserve和reserve timeout的执行的代码是同一段,不过参数不同。

static void
dispatch_cmd(Conn *c) {
    ...
    case OP_RESERVE_TIMEOUT: //设置超时时间的reserve命令
        errno = 0; 
        timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10); 
        if (errno) return reply_msg(c, MSG_BAD_FORMAT);
    case OP_RESERVE: /* FALLTHROUGH */
        // 检查命令格式和reserve命令的长度
        /* don't allow trailing garbage */
        if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) { // 检查命令格式:reserve命令的长度
            return reply_msg(c, MSG_BAD_FORMAT);
        }    

        op_ct[type]++; //统计执行命令的连接数
        connsetworker(c); //设置连接类型为CONN_TYPE_WORKER

        // 若conn的reserved_job链表中存在一个ttr最小且距目前时间(now)大于1秒的job 并且
        // 连接(conn)watch的tube的ready堆中没有准备好的job
        if (conndeadlinesoon(c) && !conn_ready(c)) { // 取出job后,判断是否有watch的tube
            return reply_msg(c, MSG_DEADLINE_SOON);
        }    

        /* try to get a new job for this guy */
        // 把连接状态设置成wait 且 在conn watch的tube的waiting数组中,添加当前的指针
        // 并重新设置等待时间
        wait_for_job(c, timeout);

        // 遍历所有的tube,选出过期的tube中pri最低的job
        // 且 把找到的job从tube的ready堆中删除
        // job添加到conn结构的reserved_jobs链表中,把job返回给连接
        // 若job的ttr比目前的soonest小,把该job赋予soonest变量
        process_queue();
        break;
    ...
}

实现函数分析

conndeadlinesoon

该函数的功能是:
从参数c的reserved_jobs链表中找到一个ttr最小的job,并判断该Job是否至少还有1秒过期,若是返回true。否则,返回false。

/* return the reserved job with the earliest deadline,
 * or NULL if there's no reserved job */
job
connsoonestjob(Conn *c)  //从reserved_jobs链表中找到一个ttr最小的job
{
    job j = NULL;
    job soonest = c->soonest_job;

    // 若soonest为NULL
    if (soonest == NULL) { 
        //遍历当前conn的reserved_jobs链表
        for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) { 
            //找出该链表中ttr最小的job
            if (j->r.deadline_at <= (soonest ? : j)->r.deadline_at) soonest = j; 
        }   
    }   
    //把该job赋给soonest,并返回
    c->soonest_job = soonest; 
    return soonest;
}


/* return true if c has a reserved job with less than one second until its
 * deadline */
// 若当前连接c的reserved_jobs链表中存在一个还有1秒到期的job,则返回true 
int
conndeadlinesoon(Conn *c) 
{
    int64 t = nanoseconds(); //获取目前的时间戳
    job j = connsoonestjob(c); //从conn的reserved_jobs链表中选择一个过期时间最小的job,若没有则返回null

    return j && t >= j->r.deadline_at - SAFETY_MARGIN; //判断该job的超时时间是否小于1秒,若是返回该job
}
  • conn_ready函数

该函数的功能是:检查当前连接c的watch数组(tube数组)的ready job堆是否有准备好的job。

int
conn_ready(Conn *c)
{
    size_t i;

    for (i = 0; i < c->watch.used; i++) { //遍历该conn的watch的tube数组
        if (((tube) c->watch.items[i])->ready.len) return 1; //若有一个watch的tube的ready job堆长度大于0,则返回1
    }        
    return 0;
}
  • wait_for_job函数

该函数的功能是:修改当前连接的状态为STATE_WAIT,并把该连接添加到,watch的tube的waiting数组中。然后把当前连接添加到dirty链表中。

static void 
wait_for_job(Conn *c, int timeout)
{
    c->state = STATE_WAIT; //连接状态设置成wait
    enqueue_waiting_conn(c); //在conn watch的tube的waiting数组中,添加conn的指针

    /* Set the pending timeout to the requested timeout amount */
    c->pending_timeout = timeout;

    connwant(c, 'h'); // only care if they hang up //为连接c重新设置超时时间
    c->next = dirty; //把conn加入dirty全局队列中
    dirty = c; 
}
  • process_queue函数

该函数遍历所有tube,找出tube中已经过期且pri最小的job,把该job从对应的tube的ready对中删除,然后把该job添加到当前连接的reserved_jobs链表中,并把找到的job返回给连接的客户端。

static void
process_queue()
{
    job j;
    int64 now = nanoseconds();

    while ((j = next_eligible_job(now))) { //遍历所有的tube,选出过期的tube中pri最低的job
        heapremove(&j->tube->ready, j->heap_index); //把找到的job从tube的ready堆中删除
        ready_ct--; //ready堆中的job减少1
        if (j->r.pri < URGENT_THRESHOLD) { //若选出的job的优先级小于URGENT_THRESHOLD,需要把相应的统计值减少1
            global_stat.urgent_ct--;  //全局统计值
            j->tube->stat.urgent_ct--; //针对该job的统计值
        }
        reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j); //把job添加到conn结构的reserved_jobs链表中,把job返回给连接
    }
}

总结

本文分析了beanstalkd的reserve命令的实现原理。并对其函数细节进行了分析。

猜你喜欢

转载自blog.csdn.net/zg_hover/article/details/81986689