概述
本文分析了beanstalkd的bury命令和kick命令的实现原理。
bury和kick指令原理
bury和kick是一组相反操作的指令。当我们暂时不想再关注一个job时,可以把它bury掉。这样任何人就无法订阅(reserve)到该job了。直到有人发送了kick指令,此时该job又回到了READY状态,可以被订阅。
bury和kick指令的格式
bury指令
说明 将一个job的状态迁移为buried,处于该状态的job是不能被订阅(reserved)的。bury指令的格式为:
bury <id> <pri>\r\n
参数和响应说明:
- id:为job id
- pri:为优先级
- 响应:BURIED\r\n 表明成功 NOT_FOUND\r\n 如果job不存在或者client没有预订此job
kick指令
通过bury指令把job状态迁移为buried状态,此时的job不能被任何消费者订阅(reserved),直到对该job执行kick指令。kick指令的格式为:
kick <bound>\r\n
参数说明:
- bound: 整型值,唤醒的job上限
注意,该指令只在被use的tube中生效,在使用该指令前,应该先使用use指令来指定一个tube。
bury指令的实现分析
bury指令的实现原理如下:
- 获取job id和job pri参数
- 从保存job的全局hash表:all_jobs中,通过id找到job的指针
- 若该job的状态是Reserved,且reserver是当前连接,则删除该Job
- 把该job放入到当前连接的tube的bury堆中
case OP_BURY:
errno = 0;
//获取参数:bury的job的id
id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
//获取参数:bury的pri值(优先级)
r = read_pri(&pri, pri_buf, NULL);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
//从all_jobs全局job hash表中删除当前连接订阅(reserve)的,状态是Reserved的job
j = remove_reserved_job(c, job_find(id));
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
j->r.pri = pri;
//把job插入到对应tube的buried链表中
r = bury_job(c->srv, j, 1);
if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
break;
另外,这里要注意,若设置了srv.wal.use参数,且在需要时还需要把buried的job保存到文件中,该机制在后面持久化时进行讲解。
kick指令的实现分析
kick指令的实现原理如下:
1. 获取kick的参数bound的值
2. 若当前连接使用的tube的buried job链表不为空,遍历当前连接的buried链表,执行以下操作:
* 从对应tube的buried链表中删除该job
* 把该job插入到对应tube的ready对中
* 若放入ready堆没有成功,可能是ready堆满了,再次bury该job
3. 若当前连接的tube的buried job链表为空,则遍历delay job堆中的job:
* 把该job从对应的tube的delay堆中删除
* 把该job添加到对应tube的ready堆中
* 若ready堆满了,还需要把该job放回到buried中
小结:可以看出kick其实就是把job从对应tube的buried和delay队列(若buried堆为空),中删除,然后把该job添加到ready队列中。若有必要还需要把job写入文件中,进行持久化。
case OP_KICK:
errno = 0;
count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10); //获取参数,kick的数量
if (end_buf == c->cmd + CMD_KICK_LEN) {
return reply_msg(c, MSG_BAD_FORMAT);
}
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
i = kick_jobs(c->srv, c->use, count); //执行kick的操作
return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
总结
bury是beanstalkd的中间状态,被buried的job,无法被消费,直到该job被kick,job的状态回到ready状态。