简单数据库实现——Part8 - B树的叶节点结构
我们正在将表的结构更改为B树。这是一个很大的变化,需要很多篇文章。本文中,我们将定义叶节点的布局,并支持将键值对插入到单节点树中。首先,我们来回顾一下切换到树形结构的原因。
选择表结构
使用当前的结构,每个页面只存储行,因此空间利用率很高,我们只在末尾追加,所以插入也很快。但是只能通过扫描整个表来查找特定的行。如果我们想删除一行,必须移动后面所有的行。
如果我们将表存储为数组,按ID排序,我们可以用二分来查找特定的ID。但是插入会特别慢,因为我们必须移动很多行来腾出空间。
所以我们要使用树结构。树中的每个节点可以包含可变数量的行,因此我们必须在每个节点中存储一些信息来表示包含的行数。另外,所有不存储任何行的内部节点都有存储开销。我们用更大的数据库文件来换取更快的插入、删除和查找。
未排序数组 | 排序数组 | 树结构 | |
---|---|---|---|
页包含 | 仅数据 | 仅数据 | 元数据、key和数据 |
行每页 | 多 | 多 | 少 |
插入 | O(1) | O(n) | O(log(n)) |
删除 | O(n) | O(n) | O(log(n)) |
查询 | O(n) | O(log(n)) | O(log(n)) |
节点头部结构
叶节点和内部节点有不同的布局。我们先定义一个enum来区别这两种类型。
+typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;
每个节点对应一个页面。内部节点将通过存储页号来指向它的子节点。B树向寻呼机(pager)询问特定的页码,并获取指向页面缓存的指针。页面按照页码顺序依次存储在数据库文件中。
节点需要在页面开头的头部(header)存储一些元数据(metadata)。这个节点将存储它是什么类型的节点,是否是根节点,以及其父节点的指针。为每个头部(header)的大小和偏移量定义常量:
+/*
+ * Common Node Header Layout
+ */
+const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);
+const uint32_t NODE_TYPE_OFFSET = 0;
+const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);
+const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;
+const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);
+const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;
+const uint8_t COMMON_NODE_HEADER_SIZE =
+ NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;
叶节点结构
除了最基础的头部信息,叶节点还需要存储它们包含的键值对个数。
+/*
+ * Leaf Node Header Layout
+ */
+const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);
+const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;
+const uint32_t LEAF_NODE_HEADER_SIZE =
+ COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;
叶节点的主体是一个键值对数组,每个键值对都是一个键,后面跟一个值(序列化的行)。
+/*
+ * Leaf Node Body Layout
+ */
+const uint32_t LEAF_NODE_KEY_SIZE = sizeof(uint32_t);
+const uint32_t LEAF_NODE_KEY_OFFSET = 0;
+const uint32_t LEAF_NODE_VALUE_SIZE = ROW_SIZE;
+const uint32_t LEAF_NODE_VALUE_OFFSET =
+ LEAF_NODE_KEY_OFFSET + LEAF_NODE_KEY_SIZE;
+const uint32_t LEAF_NODE_CELL_SIZE = LEAF_NODE_KEY_SIZE + LEAF_NODE_VALUE_SIZE;
+const uint32_t LEAF_NODE_SPACE_FOR_CELLS = PAGE_SIZE - LEAF_NODE_HEADER_SIZE;
+const uint32_t LEAF_NODE_MAX_CELLS =
+ LEAF_NODE_SPACE_FOR_CELLS / LEAF_NODE_CELL_SIZE;
基于这些常量,下面是目前叶节点的布局:
在头部为每个布尔值使用整个字节可能会占用一些空间,但这使编写代码来访问这些值更容易。
还要注意的是最后还有一些被浪费的空间,我们在头部之后尽可能多地存储键值对,但是剩余的空间不能容纳整个单元格,我们让它为空,避免节点之间分割数据。
访问叶节点字段
访问键、值和元数据的代码都涉及到刚才定义的常量进行指针运算。
+uint32_t* leaf_node_num_cells(void* node) {
+ return node + LEAF_NODE_NUM_CELLS_OFFSET;
+}
+
+void* leaf_node_cell(void* node, uint32_t cell_num) {
+ return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;
+}
+
+uint32_t* leaf_node_key(void* node, uint32_t cell_num) {
+ return leaf_node_cell(node, cell_num);
+}
+
+void* leaf_node_value(void* node, uint32_t cell_num) {
+ return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;
+}
+
+void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }
+
这些方法返回一个指向相关值的指针,因此他们可以同时用来获取和设置数据。
修改分页器和表
即使每个节点不满,每个节点也会完全占据一页,这意味着我们的分页器(pager)不再需要支持读取/写入部分页面。
-void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
+void pager_flush(Pager* pager, uint32_t page_num) {
if (pager->pages[page_num] == NULL) {
printf("Tried to flush null page\n");
exit(EXIT_FAILURE);
@@ -242,7 +337,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
}
ssize_t bytes_written =
- write(pager->file_descriptor, pager->pages[page_num], size);
+ write(pager->file_descriptor, pager->pages[page_num], PAGE_SIZE);
if (bytes_written == -1) {
printf("Error writing: %d\n", errno);
void db_close(Table* table) {
Pager* pager = table->pager;
- uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;
- for (uint32_t i = 0; i < num_full_pages; i++) {
+ for (uint32_t i = 0; i < pager->num_pages; i++) {
if (pager->pages[i] == NULL) {
continue;
}
- pager_flush(pager, i, PAGE_SIZE);
+ pager_flush(pager, i);
free(pager->pages[i]);
pager->pages[i] = NULL;
}
- // There may be a partial page to write to the end of the file
- // This should not be needed after we switch to a B-tree
- uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;
- if (num_additional_rows > 0) {
- uint32_t page_num = num_full_pages;
- if (pager->pages[page_num] != NULL) {
- pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);
- free(pager->pages[page_num]);
- pager->pages[page_num] = NULL;
- }
- }
-
int result = close(pager->file_descriptor);
if (result == -1) {
printf("Error closing db file.\n");
现在,在数据库中存储页面(page)数比存储行(row)数更有意义。页的数量应该与分页器相关联,而不是与表相关联,因为它是数据库使用的页的数量,而不是某个表。B树由它的根节点页号标识,所以表需要跟踪它。
const uint32_t PAGE_SIZE = 4096;
const uint32_t TABLE_MAX_PAGES = 100;
-const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;
-const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;
typedef struct {
int file_descriptor;
uint32_t file_length;
+ uint32_t num_pages;
void* pages[TABLE_MAX_PAGES];
} Pager;
typedef struct {
Pager* pager;
- uint32_t num_rows;
+ uint32_t root_page_num;
} Table;
@@ -127,6 +200,10 @@ void* get_page(Pager* pager, uint32_t page_num) {
}
pager->pages[page_num] = page;
+
+ if (page_num >= pager->num_pages) {
+ pager->num_pages = page_num + 1;
+ }
}
return pager->pages[page_num];
@@ -184,6 +269,12 @@ Pager* pager_open(const char* filename) {
Pager* pager = malloc(sizeof(Pager));
pager->file_descriptor = fd;
pager->file_length = file_length;
+ pager->num_pages = (file_length / PAGE_SIZE);
+
+ if (file_length % PAGE_SIZE != 0) {
+ printf("Db file is not a whole number of pages. Corrupt file.\n");
+ exit(EXIT_FAILURE);
+ }
for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
pager->pages[i] = NULL;
修改游标
游标表示的是表中的一个位置,当我们的表是一个简单的行数组时,我们可以访问一个给定的行号。现在它是一个树,我们通过节点的页号(page_num
)和该节点中的号码(cell_num
)来标识位置。
typedef struct {
Table* table;
- uint32_t row_num;
+ uint32_t page_num;
+ uint32_t cell_num;
bool end_of_table; // Indicates a position one past the last element
} Cursor;
Cursor* table_start(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = 0;
- cursor->end_of_table = (table->num_rows == 0);
+ cursor->page_num = table->root_page_num;
+ cursor->cell_num = 0;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->end_of_table = (num_cells == 0);
return cursor;
}
Cursor* table_end(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = table->num_rows;
+ cursor->page_num = table->root_page_num;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->cell_num = num_cells;
cursor->end_of_table = true;
return cursor;
}
void* cursor_value(Cursor* cursor) {
- uint32_t row_num = cursor->row_num;
- uint32_t page_num = row_num / ROWS_PER_PAGE;
+ uint32_t page_num = cursor->page_num;
void* page = get_page(cursor->table->pager, page_num);
- uint32_t row_offset = row_num % ROWS_PER_PAGE;
- uint32_t byte_offset = row_offset * ROW_SIZE;
- return page + byte_offset;
+ return leaf_node_value(page, cursor->cell_num);
}
void cursor_advance(Cursor* cursor) {
- cursor->row_num += 1;
- if (cursor->row_num >= cursor->table->num_rows) {
+ uint32_t page_num = cursor->page_num;
+ void* node = get_page(cursor->table->pager, page_num);
+
+ cursor->cell_num += 1;
+ if (cursor->cell_num >= (*leaf_node_num_cells(node))) {
cursor->end_of_table = true;
}
}
插入到叶节点中
我们现在只实现单节点树(single-node tree),回顾上一部分,树一开始是一个空叶节点。
可以添加键值对,直到叶节点存满:
当我们第一次打开数据库时,数据库文件将是空的,因此我们将第0页初始化为空叶节点(根节点)。
Table* db_open(const char* filename) {
Pager* pager = pager_open(filename);
- uint32_t num_rows = pager->file_length / ROW_SIZE;
Table* table = malloc(sizeof(Table));
table->pager = pager;
- table->num_rows = num_rows;
+ table->root_page_num = 0;
+
+ if (pager->num_pages == 0) {
+ // New database file. Initialize page 0 as leaf node.
+ void* root_node = get_page(pager, 0);
+ initialize_leaf_node(root_node);
+ }
return table;
}
接下来,我们将创建一个将键值对插入叶节点的函数,它将以游标作为参数,来表示应该插入的位置。
+void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {
+ void* node = get_page(cursor->table->pager, cursor->page_num);
+
+ uint32_t num_cells = *leaf_node_num_cells(node);
+ if (num_cells >= LEAF_NODE_MAX_CELLS) {
+ // Node full
+ printf("Need to implement splitting a leaf node.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (cursor->cell_num < num_cells) {
+ // Make room for new cell
+ for (uint32_t i = num_cells; i > cursor->cell_num; i--) {
+ memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),
+ LEAF_NODE_CELL_SIZE);
+ }
+ }
+
+ *(leaf_node_num_cells(node)) += 1;
+ *(leaf_node_key(node, cursor->cell_num)) = key;
+ serialize_row(value, leaf_node_value(node, cursor->cell_num));
+}
+
我们现在还没实现分裂(split),因此如果节点已满,就会出错,接下来,我们将键值对向右移动一个空间,为新键值对腾出空间。然后我们将新的键值对写入空白处。
因为我们假设树只有一个节点,因此execute_insert()
函数只需要调用此辅助方法。
ExecuteResult execute_insert(Statement* statement, Table* table) {
- if (table->num_rows >= TABLE_MAX_ROWS) {
+ void* node = get_page(table->pager, table->root_page_num);
+ if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {
return EXECUTE_TABLE_FULL;
}
Row* row_to_insert = &(statement->row_to_insert);
Cursor* cursor = table_end(table);
- serialize_row(row_to_insert, cursor_value(cursor));
- table->num_rows += 1;
+ leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
free(cursor);
有了这些更改,我们的数据库应该可以像以前一样工作了。除了现在它返回表已满的错误会更快,因为我们现在还不能分裂节点(只有一个节点)。
一个叶节点可以容纳多少行?
打印常量的命令
我添加了一个新的元命令来打印出几个我们感兴趣的常量。
+void print_constants() {
+ printf("ROW_SIZE: %d\n", ROW_SIZE);
+ printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);
+ printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);
+ printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);
+ printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);
+ printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);
+}
+
@@ -294,6 +376,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
db_close(table);
exit(EXIT_SUCCESS);
+ } else if (strcmp(input_buffer->buffer, ".constants") == 0) {
+ printf("Constants:\n");
+ print_constants();
+ return META_COMMAND_SUCCESS;
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
然后还有一些测试,最后我们的表现在可以容纳13行。
树的可视化
为了帮助调试和可视化,我还添加了一个命令来打印B树的结构。
+void print_leaf_node(void* node) {
+ uint32_t num_cells = *leaf_node_num_cells(node);
+ printf("leaf (size %d)\n", num_cells);
+ for (uint32_t i = 0; i < num_cells; i++) {
+ uint32_t key = *leaf_node_key(node, i);
+ printf(" - %d : %d\n", i, key);
+ }
+}
+
@@ -294,6 +376,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
db_close(table);
exit(EXIT_SUCCESS);
+ } else if (strcmp(input_buffer->buffer, ".btree") == 0) {
+ printf("Tree:\n");
+ print_leaf_node(get_page(table->pager, 0));
+ return META_COMMAND_SUCCESS;
} else if (strcmp(input_buffer->buffer, ".constants") == 0) {
printf("Constants:\n");
print_constants();
return META_COMMAND_SUCCESS;
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
我们目前还没有按照排序的顺序存储。你会发现execute_insert()
插入叶节点在table_end()
返回的位置。所以行和以前一样按照插入的顺序存储。
下一部分
这看起来是一种退步,因为我们的数据库现在存储的行比以前少了,而且我们仍然以未排序的顺序存储行。但这是一个很大的变化。
下一部分,我们将实现通过主键查找记录,并开始按排序顺序存储。
修改的部分
@@ -62,29 +62,101 @@ const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE;
const uint32_t PAGE_SIZE = 4096;
#define TABLE_MAX_PAGES 100
-const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;
-const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;
typedef struct {
int file_descriptor;
uint32_t file_length;
+ uint32_t num_pages;
void* pages[TABLE_MAX_PAGES];
} Pager;
typedef struct {
Pager* pager;
- uint32_t num_rows;
+ uint32_t root_page_num;
} Table;
typedef struct {
Table* table;
- uint32_t row_num;
+ uint32_t page_num;
+ uint32_t cell_num;
bool end_of_table; // Indicates a position one past the last element
} Cursor;
+typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;
+
+/*
+ * Common Node Header Layout
+ */
+const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);
+const uint32_t NODE_TYPE_OFFSET = 0;
+const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);
+const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;
+const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);
+const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;
+const uint8_t COMMON_NODE_HEADER_SIZE =
+ NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;
+
+/*
+ * Leaf Node Header Layout
+ */
+const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);
+const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;
+const uint32_t LEAF_NODE_HEADER_SIZE =
+ COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;
+
+/*
+ * Leaf Node Body Layout
+ */
+const uint32_t LEAF_NODE_KEY_SIZE = sizeof(uint32_t);
+const uint32_t LEAF_NODE_KEY_OFFSET = 0;
+const uint32_t LEAF_NODE_VALUE_SIZE = ROW_SIZE;
+const uint32_t LEAF_NODE_VALUE_OFFSET =
+ LEAF_NODE_KEY_OFFSET + LEAF_NODE_KEY_SIZE;
+const uint32_t LEAF_NODE_CELL_SIZE = LEAF_NODE_KEY_SIZE + LEAF_NODE_VALUE_SIZE;
+const uint32_t LEAF_NODE_SPACE_FOR_CELLS = PAGE_SIZE - LEAF_NODE_HEADER_SIZE;
+const uint32_t LEAF_NODE_MAX_CELLS =
+ LEAF_NODE_SPACE_FOR_CELLS / LEAF_NODE_CELL_SIZE;
+
+uint32_t* leaf_node_num_cells(void* node) {
+ return node + LEAF_NODE_NUM_CELLS_OFFSET;
+}
+
+void* leaf_node_cell(void* node, uint32_t cell_num) {
+ return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;
+}
+
+uint32_t* leaf_node_key(void* node, uint32_t cell_num) {
+ return leaf_node_cell(node, cell_num);
+}
+
+void* leaf_node_value(void* node, uint32_t cell_num) {
+ return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;
+}
+
+void print_constants() {
+ printf("ROW_SIZE: %d\n", ROW_SIZE);
+ printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);
+ printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);
+ printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);
+ printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);
+ printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);
+}
+
+void print_leaf_node(void* node) {
+ uint32_t num_cells = *leaf_node_num_cells(node);
+ printf("leaf (size %d)\n", num_cells);
+ for (uint32_t i = 0; i < num_cells; i++) {
+ uint32_t key = *leaf_node_key(node, i);
+ printf(" - %d : %d\n", i, key);
+ }
+}
+
void print_row(Row* row) {
printf("(%d, %s, %s)\n", row->id, row->username, row->email);
}
@@ -101,6 +173,8 @@ void deserialize_row(void *source, Row* destination) {
memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE);
}
+void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }
+
void* get_page(Pager* pager, uint32_t page_num) {
if (page_num > TABLE_MAX_PAGES) {
printf("Tried to fetch page number out of bounds. %d > %d\n", page_num,
@@ -128,6 +202,10 @@ void* get_page(Pager* pager, uint32_t page_num) {
}
pager->pages[page_num] = page;
+
+ if (page_num >= pager->num_pages) {
+ pager->num_pages = page_num + 1;
+ }
}
return pager->pages[page_num];
@@ -136,8 +214,12 @@ void* get_page(Pager* pager, uint32_t page_num) {
Cursor* table_start(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = 0;
- cursor->end_of_table = (table->num_rows == 0);
+ cursor->page_num = table->root_page_num;
+ cursor->cell_num = 0;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->end_of_table = (num_cells == 0);
return cursor;
}
@@ -145,24 +227,28 @@ Cursor* table_start(Table* table) {
Cursor* table_end(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = table->num_rows;
+ cursor->page_num = table->root_page_num;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->cell_num = num_cells;
cursor->end_of_table = true;
return cursor;
}
void* cursor_value(Cursor* cursor) {
- uint32_t row_num = cursor->row_num;
- uint32_t page_num = row_num / ROWS_PER_PAGE;
+ uint32_t page_num = cursor->page_num;
void* page = get_page(cursor->table->pager, page_num);
- uint32_t row_offset = row_num % ROWS_PER_PAGE;
- uint32_t byte_offset = row_offset * ROW_SIZE;
- return page + byte_offset;
+ return leaf_node_value(page, cursor->cell_num);
}
void cursor_advance(Cursor* cursor) {
- cursor->row_num += 1;
- if (cursor->row_num >= cursor->table->num_rows) {
+ uint32_t page_num = cursor->page_num;
+ void* node = get_page(cursor->table->pager, page_num);
+
+ cursor->cell_num += 1;
+ if (cursor->cell_num >= (*leaf_node_num_cells(node))) {
cursor->end_of_table = true;
}
}
@@ -185,6 +271,12 @@ Pager* pager_open(const char* filename) {
Pager* pager = malloc(sizeof(Pager));
pager->file_descriptor = fd;
pager->file_length = file_length;
+ pager->num_pages = (file_length / PAGE_SIZE);
+
+ if (file_length % PAGE_SIZE != 0) {
+ printf("Db file is not a whole number of pages. Corrupt file.\n");
+ exit(EXIT_FAILURE);
+ }
for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
pager->pages[i] = NULL;
@@ -194,11 +285,15 @@ Pager* pager_open(const char* filename) {
@@ -195,11 +287,16 @@ Pager* pager_open(const char* filename) {
Table* db_open(const char* filename) {
Pager* pager = pager_open(filename);
- uint32_t num_rows = pager->file_length / ROW_SIZE;
Table* table = malloc(sizeof(Table));
table->pager = pager;
- table->num_rows = num_rows;
+ table->root_page_num = 0;
+
+ if (pager->num_pages == 0) {
+ // New database file. Initialize page 0 as leaf node.
+ void* root_node = get_page(pager, 0);
+ initialize_leaf_node(root_node);
+ }
return table;
}
@@ -234,7 +331,7 @@ void close_input_buffer(InputBuffer* input_buffer) {
free(input_buffer);
}
-void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
+void pager_flush(Pager* pager, uint32_t page_num) {
if (pager->pages[page_num] == NULL) {
printf("Tried to flush null page\n");
exit(EXIT_FAILURE);
@@ -242,7 +337,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
@@ -249,7 +346,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
}
ssize_t bytes_written =
- write(pager->file_descriptor, pager->pages[page_num], size);
+ write(pager->file_descriptor, pager->pages[page_num], PAGE_SIZE);
if (bytes_written == -1) {
printf("Error writing: %d\n", errno);
@@ -252,29 +347,16 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
@@ -260,29 +357,16 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {
void db_close(Table* table) {
Pager* pager = table->pager;
- uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;
- for (uint32_t i = 0; i < num_full_pages; i++) {
+ for (uint32_t i = 0; i < pager->num_pages; i++) {
if (pager->pages[i] == NULL) {
continue;
}
- pager_flush(pager, i, PAGE_SIZE);
+ pager_flush(pager, i);
free(pager->pages[i]);
pager->pages[i] = NULL;
}
- // There may be a partial page to write to the end of the file
- // This should not be needed after we switch to a B-tree
- uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;
- if (num_additional_rows > 0) {
- uint32_t page_num = num_full_pages;
- if (pager->pages[page_num] != NULL) {
- pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);
- free(pager->pages[page_num]);
- pager->pages[page_num] = NULL;
- }
- }
-
int result = close(pager->file_descriptor);
if (result == -1) {
printf("Error closing db file.\n");
@@ -305,6 +389,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table *table) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
db_close(table);
exit(EXIT_SUCCESS);
+ } else if (strcmp(input_buffer->buffer, ".btree") == 0) {
+ printf("Tree:\n");
+ print_leaf_node(get_page(table->pager, 0));
+ return META_COMMAND_SUCCESS;
+ } else if (strcmp(input_buffer->buffer, ".constants") == 0) {
+ printf("Constants:\n");
+ print_constants();
+ return META_COMMAND_SUCCESS;
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
@@ -354,16 +446,39 @@ PrepareResult prepare_statement(InputBuffer* input_buffer,
return PREPARE_UNRECOGNIZED_STATEMENT;
}
+void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {
+ void* node = get_page(cursor->table->pager, cursor->page_num);
+
+ uint32_t num_cells = *leaf_node_num_cells(node);
+ if (num_cells >= LEAF_NODE_MAX_CELLS) {
+ // Node full
+ printf("Need to implement splitting a leaf node.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (cursor->cell_num < num_cells) {
+ // Make room for new cell
+ for (uint32_t i = num_cells; i > cursor->cell_num; i--) {
+ memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),
+ LEAF_NODE_CELL_SIZE);
+ }
+ }
+
+ *(leaf_node_num_cells(node)) += 1;
+ *(leaf_node_key(node, cursor->cell_num)) = key;
+ serialize_row(value, leaf_node_value(node, cursor->cell_num));
+}
+
ExecuteResult execute_insert(Statement* statement, Table* table) {
- if (table->num_rows >= TABLE_MAX_ROWS) {
+ void* node = get_page(table->pager, table->root_page_num);
+ if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {
return EXECUTE_TABLE_FULL;
}
Row* row_to_insert = &(statement->row_to_insert);
Cursor* cursor = table_end(table);
- serialize_row(row_to_insert, cursor_value(cursor));
- table->num_rows += 1;
+ leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
free(cursor);