版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zg_hover/article/details/82179283
概述
本文分析了beanstalkd的release命令的实现原理。
release命令的说明
功能说明
release命令将一个reserved的job放回ready堆中。它通常在job执行失败时使用。
命令行说明
release <id> <pri> <delay>\r\n
- id:为job id
- pri:为job的优先级
- delay:为延迟ready的秒数
- 响应
- RELEASED\r\n 表明成功
- BURIED\r\n 操作失败,可能是由于内存分配失败等原因,此时要重新放回buried堆中
- NOT_FOUND\r\n 如果job不存在或者client没有预订此job
release命令的实现原理
release命令的实现流程如下:
1. 获取命令行参数,包括:job的id,job的优先级,job的delay时间
2. 从全局工作哈希表:all_jobs(保存所有job的全局hash表)中查找该job的指针
3. 若该job的状态是Reserved,且该job的reserver是当前连接(当前连接在reserved),则把该job从all_jobs哈希表中删除
4. 若设置了wal参数,还需要对job进行持久化(一般没有设置该选项,这里暂时不讲解)
5. 根据参数重新设置该job的优先级,delay时间等参数
6. 根据delay的判断完成以下流程中的一个:
a. 若delay大于0:
* 把job插入到job所属tube的delay堆中
* 修改job的状态为Delayed
* 重新设置该job的被处理的等待时间:j->deadline_at
b. 若delay小于等于0
* 把job插入到该job所属tube的ready堆中
* 修改job的状态为Ready
c. 处理所有job中已经过期的job,把这些job放回到对应连接的reserved_jobs队列中
7. 若第6步失败,则会把该job重新放回buried堆中,并把状态设置为:Buried
release命令的源码分析
static void
dispatch_cmd(Conn *c)
{
...
case OP_RELEASE:
errno = 0;
//获取参数:job id
id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
//获取参数:job优先级
r = read_pri(&pri, pri_buf, &delay_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
//获取参数:delay参数
r = read_delay(&delay, delay_buf, NULL);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
//从all_jobs中查找该job的指针,若job被当前连接reserved,且状态是Reserved,则删除之
j = remove_reserved_job(c, job_find(id));
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
/* We want to update the delay deadline on disk, so reserve space for
* that. */
if (delay) {
z = walresvupdate(&c->srv->wal, j);
if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
j->walresv += z;
}
j->r.pri = pri; //把优先级设置成参数的值
j->r.delay = delay; //把delay的值设置成参数的值
j->r.release_ct++; //释放统计数量+1
//若delay大于0,则放入tube的delay堆,否则放入tube的ready堆中
// 在这一步还要处理已经过期的job,把这些job放回到对应连接的reserved_jobs队列中
r = enqueue_job(c->srv, j, delay, !!delay);
if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
if (r == 1) {
return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
}
/* out of memory trying to grow the queue, so it gets buried */
// 以上操作不成功,重新放入buried堆
bury_job(c->srv, j, 0);
reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
break;
...
}
总结
本文分析了release的实现原理,并对其源码进行了分析。可以看出,release命令主要是把job返回到ready或delay堆(若设置了delay参数)中,并重新设置该job的处理时间,让该job的处理时间得到延长。