Beanstalkd源码分析—release命令的实现

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zg_hover/article/details/82179283

概述

本文分析了beanstalkd的release命令的实现原理。

release命令的说明

功能说明

release命令将一个reserved的job放回ready堆中。它通常在job执行失败时使用。

命令行说明

release <id> <pri> <delay>\r\n
  • id:为job id
  • pri:为job的优先级
  • delay:为延迟ready的秒数
  • 响应
    • RELEASED\r\n 表明成功
    • BURIED\r\n 操作失败,可能是由于内存分配失败等原因,此时要重新放回buried堆中
    • NOT_FOUND\r\n 如果job不存在或者client没有预订此job

release命令的实现原理

release命令的实现流程如下:
1. 获取命令行参数,包括:job的id,job的优先级,job的delay时间
2. 从全局工作哈希表:all_jobs(保存所有job的全局hash表)中查找该job的指针
3. 若该job的状态是Reserved,且该job的reserver是当前连接(当前连接在reserved),则把该job从all_jobs哈希表中删除
4. 若设置了wal参数,还需要对job进行持久化(一般没有设置该选项,这里暂时不讲解)
5. 根据参数重新设置该job的优先级,delay时间等参数
6. 根据delay的判断完成以下流程中的一个:

a. 若delay大于0:
    * 把job插入到job所属tube的delay堆中
    * 修改job的状态为Delayed
    * 重新设置该job的被处理的等待时间:j->deadline_at
b. 若delay小于等于0
    * 把job插入到该job所属tube的ready堆中
    * 修改job的状态为Ready
c. 处理所有job中已经过期的job,把这些job放回到对应连接的reserved_jobs队列中

7. 若第6步失败,则会把该job重新放回buried堆中,并把状态设置为:Buried

release命令的源码分析

static void
dispatch_cmd(Conn *c)
{
    ...
    case OP_RELEASE:
        errno = 0; 
        //获取参数:job id
        id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10); 
        if (errno) return reply_msg(c, MSG_BAD_FORMAT);

         //获取参数:job优先级
        r = read_pri(&pri, pri_buf, &delay_buf);
        if (r) return reply_msg(c, MSG_BAD_FORMAT);

         //获取参数:delay参数
        r = read_delay(&delay, delay_buf, NULL);
        if (r) return reply_msg(c, MSG_BAD_FORMAT);
        op_ct[type]++;

        //从all_jobs中查找该job的指针,若job被当前连接reserved,且状态是Reserved,则删除之
        j = remove_reserved_job(c, job_find(id)); 
        if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);

        /* We want to update the delay deadline on disk, so reserve space for
         * that. */
        if (delay) {
            z = walresvupdate(&c->srv->wal, j);
            if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
            j->walresv += z;
        }    

        j->r.pri = pri; //把优先级设置成参数的值
        j->r.delay = delay; //把delay的值设置成参数的值
        j->r.release_ct++; //释放统计数量+1

        //若delay大于0,则放入tube的delay堆,否则放入tube的ready堆中
        // 在这一步还要处理已经过期的job,把这些job放回到对应连接的reserved_jobs队列中
        r = enqueue_job(c->srv, j, delay, !!delay); 
        if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
        if (r == 1) { 
            return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
        }

        /* out of memory trying to grow the queue, so it gets buried */  
        // 以上操作不成功,重新放入buried堆
        bury_job(c->srv, j, 0);
        reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
        break;
    ...
}

总结

本文分析了release的实现原理,并对其源码进行了分析。可以看出,release命令主要是把job返回到ready或delay堆(若设置了delay参数)中,并重新设置该job的处理时间,让该job的处理时间得到延长。

猜你喜欢

转载自blog.csdn.net/zg_hover/article/details/82179283