您的位置:首页 > 数据库

打造先进的内存KV数据库-2 B树索引的建立(2)

2015-12-19 13:51 323 查看

索引的插入

接上篇文章,我们实现了B树的查找log2n的算法,然而在后来的单元测试中,我发现了bug,在此进行修正,修正后的查找函数:

//查找指定索引 如果找到返回找到的node和position,失败的话返回最近的node和position,
//并返回返回值相对于查询值是大了还是小了 true->小 false->大 优先返回大 供插入时插在前面
func (bt *B_Tree) Select(index []byte) (*node,int,uint64,error) {
if bt.root.hash_key_num == 0 {
return &bt.root,0,0,SelectError{false}
}
hash_key := hash(index)
node_ := &bt.root
pos_r := 0                                  //返回位置
var err SelectError                             //返回错误
var node_r *node
for node_ != nil && node_.hash_key_num > 0{

if node_.hash_key_num > 0 {
//超出节点边界
if node_.hash_key[0] > hash_key {
node_r = node_              //返回node,防止变成nil
pos_r = 0
err.less = false                    //返回大于
node_ = node_.child[0]
continue
}
if node_.hash_key[node_.hash_key_num - 1] < hash_key {
node_r = node_              //返回node,防止变成nil
pos_r = node_.hash_key_num
err.less = true                 //返回小于
node_ = node_.child[node_.hash_key_num]
continue
}
//进行二分查找
pos := node_.hash_key_num / 2
c := node_.hash_key_num / 2
for node_.hash_key_num > 0{
if node_.hash_key[pos] == hash_key {
return node_,pos,node_.primary_key[pos],nil //找到
} else if node_.hash_key[pos] < hash_key {
if node_.hash_key[pos + 1] > hash_key {
node_r = node_              //返回node,防止变成nil
pos_r = pos + 1
err.less = false                    //返回大于
node_ = node_.child[pos + 1]
break
}
c /= 2
if c == 0 { c=1 }
pos += c
} else if node_.hash_key[pos] > hash_key {

if node_.hash_key[pos - 1] < hash_key {
node_r = node_              //返回node,防止变成nil
pos_r = pos
err.less = true                 //返回小于
node_ = node_.child[pos]
break
}
c /= 2
if c == 0 { c=1 }
pos -= c
}
}
}
}
return node_r,pos_r,0,err
}


插入索引时,我们使用如下算法:

1.先通过select寻找应该插入的位置

2.如果被插入的节点中关键字树小于节点最多关键字数 - 1,则直接插入完成,否则到3

3.如果被插入节点关键字数等于节点最多关键字数 - 1,先插入该节点,再将该节点分裂成2个节点,将中间关键字插入他的父节点,如果此时父节点也满,则继续分裂父节点,直到根节点满,此索引树不允许其他任何插入;如果被插入的关键字数等于节点最多关键字数,则说明此树已满,不接受任何插入

实现如下:

// 设置值
func (bt *B_Tree) Set(text []byte,primary_key uint64) (error) {
node_,pos,_,err := bt.Select(text)  //查询
if err == nil {                         //找到了
//TODO 和储存引擎协调进行
} else {
//TODO 储存引擎加入记录
if node_.hash_key_num < node_data_num - 1 { //有空间可以插入
if err.(SelectError).less {         //插在最后一个:只有查询到最后一个还小的时候才会返回less
node_.hash_key[node_.hash_key_num] = hash(text)
node_.primary_key[node_.hash_key_num] = primary_key
node_.hash_key_num++
}else {                             //插在前面
for i := node_.hash_key_num;i > pos;i-- {
node_.hash_key[i] = node_.hash_key[i - 1]
node_.primary_key[i] = node_.primary_key[i - 1]
}
node_.hash_key[pos] = hash(text)
node_.primary_key[pos] = primary_key
node_.hash_key_num++
}
return nil
}else { //没有空间插入,进行分裂
//一定是最右儿子节点不存在的情况
if node_.child[node_data_num] != nil {
//TODO 严重逻辑错误 索引结构可能已损坏
panic(errors.New("严重逻辑错误,索引结构可能已损坏!"))
} else {
//放入节点
if err.(SelectError).less {         //插在最后一个:只有查询到最后一个还小的时候才会返回less
node_.hash_key[node_.hash_key_num] = hash(text)
node_.primary_key[node_.hash_key_num] = primary_key
node_.hash_key_num++
}else {                             //插在前面
for i := node_.hash_key_num;i > pos;i-- {
node_.hash_key[i] = node_.hash_key[i - 1]
node_.primary_key[i] = node_.primary_key[i - 1]
}
node_.hash_key[pos] = hash(text)
node_.primary_key[pos] = primary_key

node_.hash_key_num++
}
split:
//寻找父节点
node_temp_num := node_.hash_key_num
node_.hash_key_num = 0  //伪造空节点,让Select返回父节点
node_parent,pos_parent,_,err_parent := bt.Select(text)

//分裂出一个新节点
node_new := new(node)
mid := node_temp_num / 2
node_.hash_key_num = mid
j := 0
for i := mid + 1;i < node_temp_num;i++ {
node_new.hash_key[j] = node_.hash_key[i]
node_new.primary_key[j] = node_.primary_key[i]
j++
}
node_new.hash_key_num = j

//如果node_是根节点,分裂出的作为儿子节点
if node_ == &bt.root {

node_.hash_key_num++    //mid回归root
if node_.child[mid + 1] != nil {
panic(errors.New("Memory is full!"))
}
node_.child[mid + 1] = node_new
return nil
} else {        //不是根节点,则将mid插入父节点

if node_parent.hash_key_num < node_data_num - 1 {

if err_parent.(SelectError).less {
//一定是最后一个元素都小于hash(text)
node_parent.hash_key[node_parent.hash_key_num] = node_.hash_key[mid]
node_parent.primary_key[node_parent.hash_key_num] = node_.primary_key[mid]
node_parent.hash_key_num++

if node_parent.child[node_parent.hash_key_num] != nil {
panic(errors.New("Memory is full!"))
}
node_parent.child[node_parent.hash_key_num] = node_new

//TODO 父节点满了怎么办
if node_parent.hash_key_num > node_data_num - 1 {

node_ = node_parent
goto split
}

}else {

for i := node_parent.hash_key_num;i > pos_parent;i-- {
node_parent.hash_key[i] = node_parent.hash_key[i - 1]
node_parent.primary_key[i] = node_parent.primary_key[i - 1]
node_parent.child[i] = node_parent.child[i - 1]
}
node_parent.hash_key[pos_parent] = node_.hash_key[mid]
node_parent.primary_key[pos_parent] = node_.primary_key[mid]
if node_parent.hash_key[pos_parent + 1] > node_parent.hash_key[pos_parent + 2] {
panic(errors.New("Memory is full!"))
}
if node_parent.child[pos_parent] != node_{
panic(errors.New("Memory is full!"))
}
node_parent.child[pos_parent + 1] = node_new
node_parent.hash_key_num++
//TODO 父节点满了怎么办
if node_parent.hash_key_num > node_data_num - 1 {
node_ = node_parent
goto split
}
}
return nil
} else {
//父节点已满
if node_.child[mid] != nil {
panic(errors.New("Memory is full!"))
}
node_.child[mid] = node_new
return nil
}
}
}
}
}
return nil
}


单元测试如下,测试了大量插入和查询的数据:

package index
import (
"fmt"
"testing"
"os"
"time"
)

var num int

func expected(t *testing.T,expecting uint64,real uint64) {
if expecting != real {
t.Error("-Expected-")
t.Error(expecting)
t.Error("-Real-")
t.Error(real)
os.Exit(1)
num ++
}
}

func Test_hash(t *testing.T) {
expected(t,15108241,hash([]byte("我")))
expected(t,14990752,hash([]byte("你")))
expected(t,14990230,hash([]byte("他")))
expected(t,0,hash([]byte{}))
fmt.Println(hash([]byte("吴")))
}

func Test_Select(t *testing.T) {
bt := new(B_Tree)
bt.Init()
bt.root.hash_key[2] = 15108241
bt.root.hash_key[1] = 14990752
bt.root.hash_key[0] = 14990230
bt.root.hash_key_num = 3
bt.root.primary_key[2] = 2
bt.root.primary_key[1] = 1
bt.root.primary_key[0] = 0
bt.root.child[0] = new(node)
bt.root.child[0].Init()
bt.root.is_leaf = false
bt.root.child[0].hash_key_num++
bt.root.child[0].hash_key[0] = 65
bt.root.child[0].primary_key[0] = 65
bt.root.child[2] = new(node)
bt.root.child[2].Init()
bt.root.child[2].hash_key_num++
bt.root.child[2].hash_key[0] = 15044788
bt.root.child[2].primary_key[0] = 15044788
_,_,i,_ := bt.Select([]byte("我"))
expected(t,2,i)
_,_,i,_ = bt.Select([]byte("你"))
expected(t,1,i)
_,_,i,_ = bt.Select([]byte("他"))
expected(t,0,i)
_,_,i,_ = bt.Select([]byte("呵呵"))
expected(t,0,i)
_,_,i,_ = bt.Select([]byte("呼呼"))
expected(t,0,i)
_,_,i,_ = bt.Select([]byte("A"))
expected(t,65,i)
_,_,i,_ = bt.Select([]byte("吴"))
expected(t,15044788,i)
}

func Test_Set(t *testing.T) {
bt := new(B_Tree)
bt.Init()
var i uint64

bs := make([]byte,3)
start := time.Now().UnixNano()
for temp := 10000;temp < 15555000;temp++ {

bs[0] = byte(temp / 256 / 256)
bs[1] = byte((temp % (256 * 256)) / 256)
bs[2] = byte(temp % 256)
bt.Set(bs,uint64(temp))
//fmt.Println(i)
//expected(t,uint64(temp),i)
}
fmt.Println(time.Now().UnixNano() - start)
start = time.Now().UnixNano()/*
for temp := 10000;temp < 1555000;temp++ {

bs[0] = byte(temp / 256 / 256)
bs[1] = byte((temp % (256 * 256)) / 256)
bs[2] = byte(temp % 256)
//bt.Set(bs,uint64(temp))
_,_,i,_ = bt.Select(bs)
//if i == 0 {fmt.Println(temp,i)}
expected(t,uint64(temp),i)
}*/
for temp := 15500000-1;temp > 10000;temp-- {

bs[0] = byte(temp / 256 / 256)
bs[1] = byte((temp % (256 * 256)) / 256)
bs[2] = byte(temp % 256)
//bt.Set(bs,uint64(temp))
_,_,i,_ = bt.Select(bs)
//if i == 0 {fmt.Println(temp,i)}
//expected(t,uint64(temp),i)
}
fmt.Println(time.Now().UnixNano() - start)
fmt.Println(i)

}


经测试,我的弱渣笔记本低电压8G内存,i7-4712mq单核插入15M数据,每条数据时间大约为120ns,每条查询时间大约为100ns;15M以上数据可能导致树满,该索引树设计为16G数据,每条1k,约16M条数据设计。暂时测试得,节点大小为L2缓存的3/8时能获得较高性能,不过我是go test测试,可能会有所影响,完整编译后将再次优化参数。目前实现了10M/s单核的吞吐量,8核吞吐量大约估计在50~60M/s之间。Redis在15M环境下的吞吐量在3k/s左右,不过他还有储存引擎,日志记录,持久化等等的消耗,待完全完成此数据库后,将会与Redis PK一番。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: