通过给集合(set)中的每个元素设定一个分数(score),这样集合中的成员可以根据分数进行从大到小的排序,这样的集合叫做有序集合(zset)。Redis中有序集合有两种实现方式:跳表和压缩列表。这篇文章将介绍Redis中跳表的实现原理。
一. 跳表
如果希望一个集合中的元素是有序的,很自然的,我们会想到用有序链表来实现:
这样,对这个链表进行插入和查找操作的时间复杂度是O(N),那么有没有更加高效的方法呢?
我们将有序链表中的部分节点进行分层,每一层都是一个有序链表,如图:
如果要查找30, 先从第三层的第一个节点开始,发现是1,小于30,查找下一个节点;下一个节点是9,小于30,继续查找下一个节点;发现是null,那么下降一层,来到第二层,仍然是9,查找下一个节点,发现等于30,查找结束。
如果采用有序链表,需要进行5次查找,而采用跳表只需要3次查找,在节点数很大的时候,性能的提升会更加明显。如果跳表的节点高度设置的合理,时间复杂度可以达到O(lgN),跳表是一个典型的用空间换时间的优化案例。
二. 跳表结构
Redis中跳表的结构如下图所示,结构体zskiplist表示整个跳表,结构体zskiplistNode表示跳表的一个节点,涉及的属性的含义如下:
- ele:该节点所存储的字符串
- score:该节点排序的分值
- backward:当前节点最底层的前一个节点,头节点和第一个节点的backward指向NULL
- level:每个zskiplistNode节点都有多层,一层为一个zskiplistLevel,一个zskiplistNode的所有 zskiplistLevel用一个level数组存储
zskiplistLevel包含以下两个属性:
- forward:指向同一层的下一个节点,为节点的forward指向NULL
- span:forward指向的节点与本节点之间的节点的个数,span越大说明跳过的节点的个数越多
跳跃表zskiplist包含以下属性:
- header:指向跳表的头节点,头节点是跳表的一个标记节点,他不存储任何元素信息(ele永 远为NULL,score永远为0),他的level数组长度为64,头节点不计入跳表总长 度,头节点在初始化时,64个元素的forward都指向NULL,span值都为0
- tail:指向跳表的尾节点
- length:跳表的节点的个数(不包含头节点)
- level:跳表的节点的最大高度(不包括头节点)
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
二. 创建跳表
创建跳表主要涉及到以下操作
1. 确定节点层高
跳表节点层高最小为1,最大为ZSKIPLIST_MAXLEVEL = 64。zslRandomLevel函数随机生成一个1~64的整数,作为节点的高度,高度越大,出现的概率越低。
#define ZSKIPLIST_P 0.25
#define ZSKIPLIST_MAXLEVEL 64
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
上述代码中,通过while循环每次随机生成一个随机值,取这个随机值的低16位 (random()&0xFFFF)作为x,当x小于0.25*0xFFFF时,level+=1,否则退出while循环,最终返回level和64两者中的较小值。这样,很容易计算每个层高出现的概率。设p=ZSKIPLIST_P=0.25:
- 节点层高为1的概率为 (1-p)
- 节点层高为2的概率为 (1-p)*p
- 节点层高为3的概率为 (1-p)*p^2
- ...
- 节点层高为n的概率为 (1-p)*p^2
当然,当节点层高大于64时,最终也会取64,所以实际上节点层高为64的概率为:
1 - (节点层高小于64的概率值和)
2. 创建跳表节点
zslCreateNode函数创建指定层高的节点,改函数为节点分配内存空间,然后对节点属性值进行初始化就可以了,注意方法最后会返回指向ele的指针。
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
3. 创建跳表
zslCreate函数创建一个空的跳表,跳表中仅有一个头节点。
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
三. 插入节点
跳表最复杂的操作就是插入节点了,调用zslInsert插入一个节点:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele)
插入节点主要有以下几个步骤
1. 查找要插入的位置
仍然以上面的例子为例,假如现在要插入一个score为12,层高为4的节点,我们把将要插入的节点记作Node M,显然他将被插入到Node 4和Node 5之间,如图所示
插入节点的过程中,需要维护两个长度为64的数组来辅助插入操作:
- update[]:用来保存插入节点每一层的前一个节点。update[i]表示Node M插入后的第i层的前一个节点
- rank[]:用来保存头节点到update[i]节点的距离。rank[i]表示header节点到update[i]节点的距离
我们通过下面这段代码,详细分析如何更新update和rank这两个数组的值。在一个for循环中,从数组下标为level-1开始一直更新到下标为0,在这个例子中,level=3,因此从数组下标2开始更新,先计算update[2]和rank[2],一直到update[0]和rank[0]。
- 第一次进入循环,rank[2]的初始值为0,然后通过一个while循环,从头节点的第2层开始一直往后查找,找到最后一个小于Node M的节点,这个节点就是Node M插入后的第2层的前一个节点,显然这个节点是Node 3,在查找的过程中,会将经过的节点的第2层的span值累加到rank[2]上,因此rank[2]=0(初始值)+1+2=3,(查找路径为从header -> Node 1 -> Node 3)。最后得到update[2]=Node 3,即Node M第2层的前一个节点为Node 3,rank[2]=3,即从header节点到Node 3的距离为3
- 第二次进入循环,rank[1]的初始值为rank[2]=3,从Node 3的第1层开始往后查找最后一个小于Node M的节点,这个节点仍然是Node 3,因此update[1]=Node 3,即Node M第1层的前一个节点也是Node 3,rank[1]=3(查找路径停留在Node 3,因此rank[1]最终还是为初始值)。
- 第三次进入循环,rank[0]的初始值为rank[1]=3,从Node 3的第0层开始往后查找最后一个小于Node M的节点,这个节点是Node 4,因此update[0]=Node 4,即Node M第0层的前一个节点是Node 4,rank[0]=3+1=4,(查找路径为Node 3 -> Node 4),即header节点到Node 4的距离为4
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
至此,update和rank数组的下标0~2的值都计算出来了。
2. 调整跳表高度
之前说过,插入节点的高度是随机的,这里会调用zslRandomLevel函数为节点随机生成高度,我们假设节点的高度为4,大于跳表的高度3,因此需要对跳表的高度进行更新。在上一步骤中,我们已经计算完了update和rank数组下标从0 ~ zsl->level-1的值,这里则会在for循环中计算下标zsl->level~node->level-1的值。显然Node M从zsl->level到level-1这一段层高的前一节点都是header,因此update[i]=header,rank[i]=0,以上述例子为例,zsl->level=3,level=4,for循环只运行一次,设置update[3]=header,rank[3]=0,update[3]->level[3].span先暂时设定为zsl->length=5,之后会对该值进行重新计算赋值。
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
3. 插入节点
当update和rank数组都赋值完后,就可以插入节点了,代码如下。插入节点需要做这几件事
- 设置Node M的forward属性
- 将update[i]的forward修改为Node M
- 设置Node M每一层的span值
- 修改update[i]的span值
先对0~level-1层做以上操作(第一个for循环),再对level~zsl->level-1层做以上操作(第二个for循环),如果新插入节点的高度大于跳表原来的高度,则第二个for循环不会执行,但如果新插入的节点的高度小于跳表的原高度,则从level~zsl->level-1层的update[i]节点的forward不会指向新插入的节点,所以不用更新update[i]的forward指针,只需要将span值加1即可。
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
//如果Node M的高度小于跳表的原来的高度,这个for循环就会被执行
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
4. 调整backward
因为新插入节点Node M的前一个节点一定是update[0],因此将Node M的backward设置为update[0],Node M的下一节点的backward设置为Node M,然后跳表的长度+1。至此,整个节点的插入操作就完成了。
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
四. 删除节点
删除节点可以分为两步:
- 查找需要更新的节点,给update数组赋值。这一步骤和插入节点是一样的,不再赘述
- 更新span和forward
- 更新backward
- 更新跳表的level和length
相关的代码如下。zslDelete函数接收四个参数:zsl,score,ele,node。该函数将分数为score,内容为ele的节点从跳表zsl中删除,如果node为null,则将该节点的内从空间释放掉,如果不为null,则将节点从跳表中删除,但不释放节点的内存空间,而是将删除节点的指针赋值给node,这样函数调用者可以获取到这个被删除的节点。
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
如果找到了符合条件的节点,则调用zslDeleteNode函数,该函数会将该节点从跳表中移除,并相应地更新span和forward。
将被删除的节点记为x:
- 如果update[i]的第i层的forward为x,记x的第i层的span值为a,update[i]的第i层的span值为b,由于删除了一个节点,所以update[i]的第i层的span值应该更新为a+b-1,update[i]的第i层的新的forward应该为x的第i层的forward
- 如果update[i]的第i层的forward不为x,说明update[i]的层高大于x,i>x的level-1,即update[i]的第i层的forward指向了x后面的节点或者null,因此不需要更新forward,只要将span减1就行
然后是更新backward,如果x不为最后一个节点,直接将x的第0层的forward节点的backward设置为x的backward即可;如果x为最后一个节点,将跳表的尾指针指向x的backward即可。
最后更新跳表的level和length,这个逻辑比较简单,不再赘述了
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
//更新span和forward
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
//更新backward
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
//更新跳表的level
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
//更新跳表的length
zsl->length--;
}
(完)