简单数据库实现——Part12 - 扫描多级B树
我们现在支持构造一个多级B树,但是这破坏了select语句。下面是一个测试用例,他插入15行,然后尝试打印它们。
+ it 'prints all rows in a multi-level tree' do
+ script = []
+ (1..15).each do |i|
+ script << "insert #{i} user#{i} person#{i}@example.com"
+ end
+ script << "select"
+ script << ".exit"
+ result = run_script(script)
+
+ expect(result[15...result.length]).to match_array([
+ "db > (1, user1, [email protected])",
+ "(2, user2, [email protected])",
+ "(3, user3, [email protected])",
+ "(4, user4, [email protected])",
+ "(5, user5, [email protected])",
+ "(6, user6, [email protected])",
+ "(7, user7, [email protected])",
+ "(8, user8, [email protected])",
+ "(9, user9, [email protected])",
+ "(10, user10, [email protected])",
+ "(11, user11, [email protected])",
+ "(12, user12, [email protected])",
+ "(13, user13, [email protected])",
+ "(14, user14, [email protected])",
+ "(15, user15, [email protected])",
+ "Executed.", "db > ",
+ ])
+ end
但是当我们运行时,实际上会发生:
db > select
(2, user1, [email protected])
Executed.
很奇怪只打印了一行,而且这一行也损坏了。这是因为execute_select()
从表的开头开始,而我们现在的table_start()
返回的是根节点的0号元素。但是根节点现在是一个内部节点,它不包含任何行数据。我们打印的数据是当根节点还是一个叶节点的时候遗留的数据。execute_select()
实际上应该返回最左侧的叶节点的0号元素。
所以让我们先删除旧的部分:
-Cursor* table_start(Table* table) {
- Cursor* cursor = malloc(sizeof(Cursor));
- cursor->table = table;
- cursor->page_num = table->root_page_num;
- cursor->cell_num = 0;
-
- void* root_node = get_page(table->pager, table->root_page_num);
- uint32_t num_cells = *leaf_node_num_cells(root_node);
- cursor->end_of_table = (num_cells == 0);
-
- return cursor;
-}
然后添加一个新的搜索,实现查找最小key。如果key0不存在,则返回最小的id所在的位置。
+Cursor* table_start(Table* table) {
+ Cursor* cursor = table_find(table, 0);
+
+ void* node = get_page(table->pager, cursor->page_num);
+ uint32_t num_cells = *leaf_node_num_cells(node);
+ cursor->end_of_table = (num_cells == 0);
+
+ return cursor;
+}
通过以上更改,它依然只打印一个节点里的行数据。
db > select
(1, user1, person1@example.com)
(2, user2, person2@example.com)
(3, user3, person3@example.com)
(4, user4, person4@example.com)
(5, user5, person5@example.com)
(6, user6, person6@example.com)
(7, user7, person7@example.com)
Executed.
db >
有15个元素,我们的B树由一个内部节点和两个叶节点组成,像这样:
为了扫描所有表,我们需要在到达第一个叶节点最后的时候跳转到第二个叶节点。为此,我们将在叶节点的头部保存一个叫next_leaf
的新字段,它将在右侧保存叶节点的兄弟节点的页码。最右边的叶节点的next_leaf
值为0,表示没有兄弟节点。
更新头部:
const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);
const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;
-const uint32_t LEAF_NODE_HEADER_SIZE =
- COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;
+const uint32_t LEAF_NODE_NEXT_LEAF_SIZE = sizeof(uint32_t);
+const uint32_t LEAF_NODE_NEXT_LEAF_OFFSET =
+ LEAF_NODE_NUM_CELLS_OFFSET + LEAF_NODE_NUM_CELLS_SIZE;
+const uint32_t LEAF_NODE_HEADER_SIZE = COMMON_NODE_HEADER_SIZE +
+ LEAF_NODE_NUM_CELLS_SIZE +
+ LEAF_NODE_NEXT_LEAF_SIZE;
添加访问新字段的方法:
+uint32_t* leaf_node_next_leaf(void* node) {
+ return node + LEAF_NODE_NEXT_LEAF_OFFSET;
+}
初始化新叶节点时,默认将next_leaf
设置为0。
@@ -322,6 +330,7 @@ void initialize_leaf_node(void* node) {
set_node_type(node, NODE_LEAF);
set_node_root(node, false);
*leaf_node_num_cells(node) = 0;
+ *leaf_node_next_leaf(node) = 0; // 0 represents no sibling
}
当我们拆分一个叶节点时,要更新同级指针。旧叶节点的兄弟变成了新叶节点,而新叶节点的兄弟变成了旧叶节点的兄弟。
@@ -659,6 +671,8 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
void* new_node = get_page(cursor->table->pager, new_page_num);
initialize_leaf_node(new_node);
+ *leaf_node_next_leaf(new_node) = *leaf_node_next_leaf(old_node);
+ *leaf_node_next_leaf(old_node) = new_page_num;
还需要更新几个常量:
it 'prints constants' do
script = [
".constants",
@@ -199,9 +228,9 @@ describe 'database' do
"db > Constants:",
"ROW_SIZE: 293",
"COMMON_NODE_HEADER_SIZE: 6",
- "LEAF_NODE_HEADER_SIZE: 10",
+ "LEAF_NODE_HEADER_SIZE: 14",
"LEAF_NODE_CELL_SIZE: 297",
- "LEAF_NODE_SPACE_FOR_CELLS: 4086",
+ "LEAF_NODE_SPACE_FOR_CELLS: 4082",
"LEAF_NODE_MAX_CELLS: 13",
"db > ",
])
现在,当我们希望将游标移动到叶节点的末尾,我们可以不断查询叶节点是否有兄弟节点,如果有就不停往后跳,如果没有,则说明已经在末尾了。
@@ -428,7 +432,15 @@ void cursor_advance(Cursor* cursor) {
cursor->cell_num += 1;
if (cursor->cell_num >= (*leaf_node_num_cells(node))) {
- cursor->end_of_table = true;
+ /* Advance to next leaf node */
+ uint32_t next_page_num = *leaf_node_next_leaf(node);
+ if (next_page_num == 0) {
+ /* This was rightmost leaf */
+ cursor->end_of_table = true;
+ } else {
+ cursor->page_num = next_page_num;
+ cursor->cell_num = 0;
+ }
}
}
通过以上更改,我们可以打印15行了。
db > select
(1, user1, [email protected])
(2, user2, [email protected])
(3, user3, [email protected])
(4, user4, [email protected])
(5, user5, [email protected])
(6, user6, [email protected])
(7, user7, [email protected])
(8, user8, [email protected])
(9, user9, [email protected])
(10, user10, [email protected])
(11, user11, [email protected])
(12, user12, [email protected])
(13, user13, [email protected])
(1919251317, 14, [email protected])
(15, user15, [email protected])
Executed.
db >
但是有一个看起来不正确。
(1919251317, 14, [email protected])
这是因为在分裂叶节点时有一个错误:
@@ -676,7 +690,9 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
void* destination = leaf_node_cell(destination_node, index_within_node);
if (i == cursor->cell_num) {
- serialize_row(value, destination);
+ serialize_row(value,
+ leaf_node_value(destination_node, index_within_node));
+ *leaf_node_key(destination_node, index_within_node) = key;
} else if (i > cursor->cell_num) {
memcpy(destination, leaf_node_cell(old_node, i - 1), LEAF_NODE_CELL_SIZE);
} else {
记住叶节点中的每个单元首先包含一个键,然后才是一个值。
现在,我们的输出终于和预期一样了。
db > select
(1, user1, [email protected])
(2, user2, [email protected])
(3, user3, [email protected])
(4, user4, [email protected])
(5, user5, [email protected])
(6, user6, [email protected])
(7, user7, [email protected])
(8, user8, [email protected])
(9, user9, [email protected])
(10, user10, [email protected])
(11, user11, [email protected])
(12, user12, [email protected])
(13, user13, [email protected])
(14, user14, [email protected])
(15, user15, [email protected])
Executed.
db >