前面讲了B树的实现方法,今天一起看一下B+树,这是什么东东?
它是一种类似B树的东西,它融合了多路查找树的快速查找和线性表的顺序查找两种优点,所以特别适合做线性表的索引。比如你现在有一个线性表,其中的数据有1000万条,我们要从中查出连续的10条数据,如果从头到尾一个一个地查找下去,效率会很低,速度很慢,你的老板也会让你拜拜。那么我们该如何设计一个算法,既能实现快速定位,又能连续取出某段范围的数据呢?这就需要用到B+树。因为B+树的叶子结点就是有序的线性表,分支结点就是索引项,所以从B+树很容易快速定位到要找的数据,然后连续取出满足条件的数据即可。
至于B+树的具体定义,网上很多,大家自己看。本篇我们重点关注的是如何实现,所以略过概念性的东西,直接转入算法的讨论。在这里我们主要讨论两个操作的算法,一个是插入,一个是删除。好,我们先从插入算法开始。
B+树的插入算法如下:
1)从根结点开始,沿纵向搜索路径找到x所在的叶结点d,如果d未满,把x插入到叶结点d的有序位置上,插入完毕。否则进入下一步。
2)叶d已满,叶长达到K+1,将d一分为二,具体做法如下:
1 产生新叶e,将d的下部K/2个元素从d移入e,另一半留在d中。
2 设d的父亲是f,给f加一个儿子e,e排在d之右侧,将e的最小元素的值加到f中作为新索引项。如果f不超长,则插入结束;否则,进入下一步。
3 将f分裂成f和g,并递归地将g插入上一层中。
4 若插入波及到根,把根一分为二,并产生新根。
具体代码如下:
bool BPlusTree::Insert(int nKey, string strName)
{
int i = 0;
int m = 0;
LeafNode * pLeafNode = new LeafNode;
if (!pLeafNode)
{
return false;
}
pLeafNode->m_nKey = nKey;
pLeafNode->m_strName = strName;
if (!m_pRoot)
{
//树空
Node * pNew = new Node;
if (!pNew)
{
if (pLeafNode)
delete pLeafNode;
return false;
}
pNew->m_Keys[0] = nKey;
pNew->m_bLeaf = true;
pNew->m_Pointers[0] = pLeafNode;
pNew->m_nKeyNum = 1;
pNew->m_pParent = NULL;
m_pRoot = pNew;
m_pFirst = pNew;
m_pLast = pNew;
return true;
}
//找到插入的叶节点
Node * pTmp = m_pRoot;
while (pTmp->m_bLeaf == false)
{
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (nKey == pTmp->m_Keys[i])
{
if (pLeafNode)
delete pLeafNode;
return false;
}
if (nKey < pTmp->m_Keys[i])
break;
}
if (i == 0)
pTmp = (Node *)pTmp->m_Pointers[0];
else if (i == pTmp->m_nKeyNum)
pTmp = (Node *)pTmp->m_Pointers[i];
else
pTmp = (Node *)pTmp->m_Pointers[i];
}
//判断叶节点中是否存在
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (nKey == pTmp->m_Keys[i])
{
if (pLeafNode)
delete pLeafNode;
return false;
}
}
//判断叶子节点的数量
if (pTmp->m_nKeyNum < LEAF_ORDER)
{
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (nKey < pTmp->m_Keys[i])
{
break;
}
}
//移动
for (m = pTmp->m_nKeyNum - 1; m >= i; m--)
{
pTmp->m_Keys[m + 1] = pTmp->m_Keys[m];
pTmp->m_Pointers[m + 1] = pTmp->m_Pointers[m];
}
pTmp->m_Keys[i] = nKey;
pTmp->m_Pointers[i] = pLeafNode;
pTmp->m_nKeyNum++;
return true;
}
//叶子节点已经满了
int nMid = (LEAF_ORDER + 1) / 2;
int * pTmpKeys = new int[LEAF_ORDER + 1];
void ** pTmpPointers = new void * [LEAF_ORDER + 1];
if (!pTmpKeys || !pTmpPointers)
{
if (pTmpKeys)
delete[] pTmpKeys;
if (pTmpPointers)
delete[] pTmpPointers;
if (pLeafNode)
delete pLeafNode;
return false;
}
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (pTmp->m_Keys[i] > nKey)
break;
}
for (m = pTmp->m_nKeyNum - 1; m >= i; m--)
{
pTmpKeys[m + 1] = pTmp->m_Keys[m];
pTmpPointers[m + 1] = pTmp->m_Pointers[m];
}
for (m = 0; m < i; m++)
{
pTmpKeys[m] = pTmp->m_Keys[m];
pTmpPointers[m] = pTmp->m_Pointers[m];
}
pTmpKeys[i] = nKey;
pTmpPointers[i] = pLeafNode;
Node * pNew = new Node;
if (!pNew)
{
if (pLeafNode)
delete pLeafNode;
if (pTmpKeys)
delete[] pTmpKeys;
if (pTmpPointers)
delete[] pTmpPointers;
return false;
}
for (m = 0, i = nMid; i < LEAF_ORDER + 1; i++,m++)
{
pNew->m_Keys[m] = pTmpKeys[i];
pNew->m_Pointers[m] = pTmpPointers[i];
pNew->m_nKeyNum++;
}
pNew->m_pParent = pTmp->m_pParent;
pNew->m_bLeaf = pTmp->m_bLeaf;
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
pTmp->m_Keys[i] = 0;
pTmp->m_Pointers[i] = NULL;
}
pTmp->m_nKeyNum = 0;
for (i = 0; i < nMid; i++)
{
pTmp->m_Keys[i] = pTmpKeys[i];
pTmp->m_Pointers[i] = pTmpPointers[i];
pTmp->m_nKeyNum++;
}
pNew->m_pPrev = pTmp;
pNew->m_pNext = pTmp->m_pNext;
if (pTmp->m_pNext)
pTmp->m_pNext->m_pPrev = pNew;
pTmp->m_pNext = pNew;
if (pNew->m_pNext == NULL)
{
m_pLast = pNew;
}
if (InsertKeyAndPointer(pTmp->m_pParent, pTmp, pTmpKeys[nMid], pNew) == false)
{
if (pLeafNode)
delete pLeafNode;
if (pTmpKeys)
delete[] pTmpKeys;
if (pTmpPointers)
delete[] pTmpPointers;
if (pNew)
delete pNew;
return false;
}
if (pTmpKeys)
delete[] pTmpKeys;
if (pTmpPointers)
delete[] pTmpPointers;
return true;
}
删除具体算法如下:
1)沿根纵向找到关键字x所在的叶结点d,如果d中不存在关键字x,则删除失败,否则,进入下一步。
2)删除d中的x,如果删除x后,叶d不下溢,则删除结束;否则,进入下一步。
3)找叶d的一个临近兄弟e,如果e处于半满状态,进入步骤4,否则进入下面的处理。
从e中移一个元素给叶d,如果e在d的左侧,则移的是e中最大元素;如果e在d的右侧,则移的是e中最小元素,移走元素的同时,要相应地修改上层结点中的索引信息,删除结束。
4)将d合并到e,删除d,并相应地修改上层结点中的索引信息。当然,也可将e合并到d。
5)如果删除d不引起其父f的下溢,则删除结束;否则,将递归地波及到更上层结点,上层结点的操作和B树是一样的。
具体代码如下:
// Removes the entry with key nKey from the B+ tree.
//
// Parameters:
//   nKey - key of the record to delete.
//
// Returns false when the tree is empty or nKey is not present in the leaf
// reached by the descent; returns true otherwise.
//
// Outline:
//   1. Descend from the root to a leaf, delete the record object and close
//      the gap in the leaf.
//   2. If the leaf still holds at least (LEAF_ORDER + 1) / 2 keys, stop.
//      Ancestor keys are not modified on this path.
//   3. Otherwise borrow one entry from an adjacent sibling under the same
//      parent, or merge with it and drop one key/pointer from the parent.
//   4. After a merge, walk upward through the internal nodes, borrowing or
//      merging while a node holds fewer than (ORDER + 1) / 2 keys; a root
//      left without keys is replaced by its only child.
bool BPlusTree::Remove(int nKey)
{
if (!m_pRoot)
return false;
int i = 0;
int m = 0;
// Descend to a leaf: follow the pointer in front of the first key greater
// than nKey, or the last pointer when no key is greater.
Node * pTmp = m_pRoot;
while (pTmp->m_bLeaf == false)
{
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (nKey < pTmp->m_Keys[i])
{
break;
}
}
pTmp = (Node *)pTmp->m_Pointers[i];
}
// Locate nKey inside the leaf; i == m_nKeyNum means it is not there.
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
if (nKey == pTmp->m_Keys[i])
break;
}
if (i == pTmp->m_nKeyNum)
return false;
// Free the record the slot points at, then shift the following entries left.
LeafNode * pCur = (LeafNode *)pTmp->m_Pointers[i];
if (pCur)
delete pCur;
for (m = i + 1; m < pTmp->m_nKeyNum; m++)
{
pTmp->m_Keys[m-1] = pTmp->m_Keys[m];
pTmp->m_Pointers[m-1] = pTmp->m_Pointers[m];
}
pTmp->m_nKeyNum--;
// Minimum number of keys a non-root leaf may hold.
int nLowNum = (LEAF_ORDER + 1) / 2;
if (pTmp->m_nKeyNum >= nLowNum)
{
return true;
}
// Underflow
Node * pParent = pTmp->m_pParent;
if (!pParent)
{
// The leaf is the root: it may hold any number of keys; once it is empty
// the whole tree becomes empty.
if (pTmp->m_nKeyNum < 1)
{
m_pRoot = NULL;
delete pTmp;
m_pFirst = m_pLast = NULL;
}
return true;
}
// Find the index of this leaf among the parent's child pointers.
for (i = 0; i <= pParent->m_nKeyNum; i++)
{
if (pTmp == pParent->m_Pointers[i])
{
break;
}
}
// Choose the sibling to work with: the right one for the leftmost child,
// otherwise the left one. nIndex / nNeighbor are child positions in pParent.
Node * pNeighbor = NULL;
int nNeighbor = -1;
int nIndex = -1;
if (i == 0)
{
pNeighbor = (Node *)pParent->m_Pointers[1];
nNeighbor = 1;
nIndex = 0;
}
else
{
pNeighbor = (Node *)pParent->m_Pointers[i - 1];
nNeighbor = i - 1;
nIndex = i;
}
if (pNeighbor->m_nKeyNum > nLowNum)
{
// Borrow one element
if (nNeighbor < nIndex)
{
// Left sibling: move its largest entry to the front of this leaf; that key
// also becomes the separator stored in the parent.
pParent->m_Keys[nNeighbor] = pNeighbor->m_Keys[pNeighbor->m_nKeyNum - 1];
for (i = pTmp->m_nKeyNum - 1; i >= 0; i--)
{
pTmp->m_Keys[i + 1] = pTmp->m_Keys[i];
pTmp->m_Pointers[i + 1] = pTmp->m_Pointers[i];
}
pTmp->m_Keys[0] = pNeighbor->m_Keys[pNeighbor->m_nKeyNum - 1];
pTmp->m_Pointers[0] = pNeighbor->m_Pointers[pNeighbor->m_nKeyNum - 1];
pTmp->m_nKeyNum++;
pNeighbor->m_nKeyNum--;
}
else
{
// Right sibling: append its smallest entry to this leaf; the sibling's
// second key (its new smallest) becomes the separator in the parent.
pParent->m_Keys[nIndex] = pNeighbor->m_Keys[1];
pTmp->m_Keys[pTmp->m_nKeyNum] = pNeighbor->m_Keys[0];
pTmp->m_Pointers[pTmp->m_nKeyNum] = pNeighbor->m_Pointers[0];
pTmp->m_nKeyNum++;
for (i = 1; i <= pNeighbor->m_nKeyNum - 1; i++)
{
pNeighbor->m_Keys[i - 1] = pNeighbor->m_Keys[i];
pNeighbor->m_Pointers[i - 1] = pNeighbor->m_Pointers[i];
}
pNeighbor->m_nKeyNum--;
}
return true;
}
else
{
// Merge the neighbor and this leaf
if (nNeighbor < nIndex)
{
// Append this leaf's entries to the left sibling.
for (i = 0; i < pTmp->m_nKeyNum; i++)
{
pNeighbor->m_Keys[pNeighbor->m_nKeyNum] = pTmp->m_Keys[i];
pNeighbor->m_Pointers[pNeighbor->m_nKeyNum] = pTmp->m_Pointers[i];
pNeighbor->m_nKeyNum++;
}
// Drop separator key [nIndex - 1] and child pointer [nIndex] from the parent.
for (i = nIndex; i < pParent->m_nKeyNum; i++)
{
pParent->m_Keys[i - 1] = pParent->m_Keys[i];
}
for (i = nIndex + 1; i <= pParent->m_nKeyNum; i++)
{
pParent->m_Pointers[i - 1] = pParent->m_Pointers[i];
}
pParent->m_nKeyNum--;
// Unlink this leaf from the doubly linked leaf chain, updating
// m_pFirst / m_pLast when it sits at either end.
if (pTmp->m_pPrev)
pTmp->m_pPrev->m_pNext = pTmp->m_pNext;
else
{
pTmp->m_pNext->m_pPrev = NULL;
m_pFirst = pTmp->m_pNext;
}
if (pTmp->m_pNext)
pTmp->m_pNext->m_pPrev = pTmp->m_pPrev;
else
{
pTmp->m_pPrev->m_pNext = NULL;
m_pLast = pTmp->m_pPrev;
}
delete pTmp;
}
else
{
// Append the right sibling's entries to this leaf.
for (i = 0; i < pNeighbor->m_nKeyNum; i++)
{
pTmp->m_Keys[pTmp->m_nKeyNum] = pNeighbor->m_Keys[i];
pTmp->m_Pointers[pTmp->m_nKeyNum] = pNeighbor->m_Pointers[i];
pTmp->m_nKeyNum++;
}
// Drop separator key [nNeighbor - 1] and child pointer [nNeighbor] from the
// parent.
for (i = nNeighbor; i < pParent->m_nKeyNum; i++)
{
pParent->m_Keys[i - 1] = pParent->m_Keys[i];
}
for (i = nNeighbor + 1; i <= pParent->m_nKeyNum; i++)
{
pParent->m_Pointers[i - 1] = pParent->m_Pointers[i];
}
pParent->m_nKeyNum--;
// Unlink the right sibling from the leaf chain, updating m_pFirst / m_pLast
// when it sits at either end.
if (pNeighbor->m_pPrev)
pNeighbor->m_pPrev->m_pNext = pNeighbor->m_pNext;
else
{
pNeighbor->m_pNext->m_pPrev = NULL;
m_pFirst = pNeighbor->m_pNext;
}
if (pNeighbor->m_pNext)
pNeighbor->m_pNext->m_pPrev = pNeighbor->m_pPrev;
else
{
pNeighbor->m_pPrev->m_pNext = NULL;
m_pLast = pNeighbor->m_pPrev;
}
delete pNeighbor;
}
// The parent lost one key; rebalance upward starting from it.
Node * pCurTmp = pParent;
// Minimum number of keys a non-root internal node may hold.
int nInternalLowNum = (ORDER + 1) / 2;
// Internal (branch) nodes
while (pCurTmp)
{
if (pCurTmp->m_nKeyNum >= nInternalLowNum)
{
break;
}
// Borrow or merge
Node * pCurParent = pCurTmp->m_pParent;
Node * pCurNeighbor = NULL;
int nCurIndex = 0;
int nNeighborIndex = 0;
int nTmp = 0;
if (!pCurParent)
{
// Root node: it only needs fixing when it has no keys left, in which case
// its single remaining child becomes the new root.
if (pCurTmp->m_nKeyNum < 1)
{
((Node *)pCurTmp->m_Pointers[0])->m_pParent = NULL;
m_pRoot = (Node *)pCurTmp->m_Pointers[0];
delete pCurTmp;
}
break;
}
else
{
// Non-root node: find its position in the parent and pick a sibling
// (right one for the leftmost child, otherwise the left one).
for (i = 0; i <= pCurParent->m_nKeyNum; i++)
{
if (pCurTmp == pCurParent->m_Pointers[i])
{
break;
}
}
if (i == 0)
{
pCurNeighbor = (Node *)pCurParent->m_Pointers[1];
nCurIndex = 0;
nNeighborIndex = 1;
}
else
{
pCurNeighbor = (Node *)pCurParent->m_Pointers[i-1];
nCurIndex = i;
nNeighborIndex = i-1;
}
if (pCurNeighbor->m_nKeyNum > nInternalLowNum)
{
// Borrow
if (nNeighborIndex < nCurIndex)
{
// Borrow from the left: the parent's separator moves down to the front of
// this node, the sibling's last key moves up into the parent, and the
// sibling's last child is re-parented to this node.
nTmp = pCurParent->m_Keys[nNeighborIndex];
pCurParent->m_Keys[nNeighborIndex] = pCurNeighbor->m_Keys[pCurNeighbor->m_nKeyNum - 1];
for (i = pCurTmp->m_nKeyNum - 1; i >= 0; i--)
{
pCurTmp->m_Keys[i + 1] = pCurTmp->m_Keys[i];
}
for (i = pCurTmp->m_nKeyNum; i >= 0; i--)
{
pCurTmp->m_Pointers[i + 1] = pCurTmp->m_Pointers[i];
}
pCurTmp->m_Keys[0] = nTmp;
if (pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum])
{
static_cast<Node *>(pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum])->m_pParent = pCurTmp;
}
pCurTmp->m_Pointers[0] = pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum];
pCurTmp->m_nKeyNum++;
pCurNeighbor->m_nKeyNum--;
}
else
{
// Borrow from the right: the parent's separator moves down to the end of
// this node, the sibling's first key moves up into the parent, and the
// sibling's first child is re-parented to this node.
nTmp = pCurParent->m_Keys[nCurIndex];
pCurParent->m_Keys[nCurIndex] = pCurNeighbor->m_Keys[0];
pCurTmp->m_Keys[pCurTmp->m_nKeyNum] = nTmp;
if (pCurNeighbor->m_Pointers[0])
{
static_cast<Node *>(pCurNeighbor->m_Pointers[0])->m_pParent = pCurTmp;
}
pCurTmp->m_Pointers[pCurTmp->m_nKeyNum + 1] = pCurNeighbor->m_Pointers[0];
pCurTmp->m_nKeyNum++;
for (i = 1; i < pCurNeighbor->m_nKeyNum; i++)
{
pCurNeighbor->m_Keys[i - 1] = pCurNeighbor->m_Keys[i];
}
for (i = 1; i <= pCurNeighbor->m_nKeyNum; i++)
{
pCurNeighbor->m_Pointers[i-1] = pCurNeighbor->m_Pointers[i];
}
pCurNeighbor->m_nKeyNum--;
}
// Borrowing does not change the parent's key count, so the cascade ends.
break;
}
else
{
// NOTE(review): a merged node receives this node's keys, the sibling's keys
// and one separator, i.e. up to 2 * nInternalLowNum keys - confirm that
// m_Keys / m_Pointers are sized for that.
if (nNeighborIndex < nCurIndex)
{
// Merge into the left sibling: pull the separator down, then append this
// node's keys and children, re-parenting each child to the sibling.
pCurNeighbor->m_Keys[pCurNeighbor->m_nKeyNum] = pCurParent->m_Keys[nNeighborIndex];
pCurNeighbor->m_nKeyNum++;
for (i = 0; i < pCurTmp->m_nKeyNum; i++)
{
pCurNeighbor->m_Keys[pCurNeighbor->m_nKeyNum] = pCurTmp->m_Keys[i];
pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum] = pCurTmp->m_Pointers[i];
Node * pChild = (Node *)pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum];
if (pChild)
pChild->m_pParent = pCurNeighbor;
pCurNeighbor->m_nKeyNum++;
}
// i now equals pCurTmp->m_nKeyNum: copy the trailing child pointer as well.
pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum] = pCurTmp->m_Pointers[i];
if (pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum])
{
static_cast<Node *>(pCurNeighbor->m_Pointers[pCurNeighbor->m_nKeyNum])->m_pParent = pCurNeighbor;
}
// Drop separator key [nNeighborIndex] and child pointer [nCurIndex] from the
// parent.
for (i = nNeighborIndex + 1; i < pCurParent->m_nKeyNum; i++)
{
pCurParent->m_Keys[i - 1] = pCurParent->m_Keys[i];
}
for (i = nCurIndex + 1; i <= pCurParent->m_nKeyNum; i++)
{
pCurParent->m_Pointers[i - 1] = pCurParent->m_Pointers[i];
}
pCurParent->m_nKeyNum--;
delete pCurTmp;
}
else
{
// Merge the right sibling into this node: pull the separator down, then
// append the sibling's keys and children, re-parenting each child.
pCurTmp->m_Keys[pCurTmp->m_nKeyNum] = pCurParent->m_Keys[nCurIndex];
pCurTmp->m_nKeyNum++;
for (i = 0; i < pCurNeighbor->m_nKeyNum; i++)
{
pCurTmp->m_Keys[pCurTmp->m_nKeyNum] = pCurNeighbor->m_Keys[i];
pCurTmp->m_Pointers[pCurTmp->m_nKeyNum] = pCurNeighbor->m_Pointers[i];
Node * pChild = (Node *)(pCurTmp->m_Pointers[pCurTmp->m_nKeyNum]);
if (pChild)
pChild->m_pParent = pCurTmp;
pCurTmp->m_nKeyNum++;
}
// i now equals pCurNeighbor->m_nKeyNum: copy the trailing child pointer too.
pCurTmp->m_Pointers[pCurTmp->m_nKeyNum] = pCurNeighbor->m_Pointers[i];
if (pCurTmp->m_Pointers[pCurTmp->m_nKeyNum])
{
((Node *)(pCurTmp->m_Pointers[pCurTmp->m_nKeyNum]))->m_pParent = pCurTmp;
}
// Drop separator key [nCurIndex] and child pointer [nNeighborIndex] from the
// parent.
for (i = nCurIndex + 1; i < pCurParent->m_nKeyNum; i++)
{
pCurParent->m_Keys[i - 1] = pCurParent->m_Keys[i];
}
for (i = nNeighborIndex + 1; i <= pCurParent->m_nKeyNum; i++)
{
pCurParent->m_Pointers[i - 1] = pCurParent->m_Pointers[i];
}
pCurParent->m_nKeyNum--;
delete pCurNeighbor;
}
// The parent lost a key in the merge; check it on the next iteration.
pCurTmp = pCurParent;
}
}
}
return true;
}
}
有需要代码的小伙伴,可以从下面的地址下载:
https://download.csdn.net/download/u011711997/10432745