打造先进的内存KV数据库-2 B树索引的建立(2)
2015-12-19 13:51
323 查看
索引的插入
接上篇文章,我们实现了B树的查找log2n的算法,然而在后来的单元测试中,我发现了bug,在此进行修正,修正后的查找函数://查找指定索引 如果找到返回找到的node和position,失败的话返回最近的node和position, //并返回返回值相对于查询值是大了还是小了 true->小 false->大 优先返回大 供插入时插在前面 func (bt *B_Tree) Select(index []byte) (*node,int,uint64,error) { if bt.root.hash_key_num == 0 { return &bt.root,0,0,SelectError{false} } hash_key := hash(index) node_ := &bt.root pos_r := 0 //返回位置 var err SelectError //返回错误 var node_r *node for node_ != nil && node_.hash_key_num > 0{ if node_.hash_key_num > 0 { //超出节点边界 if node_.hash_key[0] > hash_key { node_r = node_ //返回node,防止变成nil pos_r = 0 err.less = false //返回大于 node_ = node_.child[0] continue } if node_.hash_key[node_.hash_key_num - 1] < hash_key { node_r = node_ //返回node,防止变成nil pos_r = node_.hash_key_num err.less = true //返回小于 node_ = node_.child[node_.hash_key_num] continue } //进行二分查找 pos := node_.hash_key_num / 2 c := node_.hash_key_num / 2 for node_.hash_key_num > 0{ if node_.hash_key[pos] == hash_key { return node_,pos,node_.primary_key[pos],nil //找到 } else if node_.hash_key[pos] < hash_key { if node_.hash_key[pos + 1] > hash_key { node_r = node_ //返回node,防止变成nil pos_r = pos + 1 err.less = false //返回大于 node_ = node_.child[pos + 1] break } c /= 2 if c == 0 { c=1 } pos += c } else if node_.hash_key[pos] > hash_key { if node_.hash_key[pos - 1] < hash_key { node_r = node_ //返回node,防止变成nil pos_r = pos err.less = true //返回小于 node_ = node_.child[pos] break } c /= 2 if c == 0 { c=1 } pos -= c } } } } return node_r,pos_r,0,err }
插入索引时,我们使用如下算法:
1.先通过select寻找应该插入的位置
2.如果被插入的节点中关键字树小于节点最多关键字数 - 1,则直接插入完成,否则到3
3.如果被插入节点关键字数等于节点最多关键字数 - 1,先插入该节点,再将该节点分裂成2个节点,将中间关键字插入他的父节点,如果此时父节点也满,则继续分裂父节点,直到根节点满,此索引树不允许其他任何插入;如果被插入的关键字数等于节点最多关键字数,则说明此树已满,不接受任何插入
实现如下:
// 设置值 func (bt *B_Tree) Set(text []byte,primary_key uint64) (error) { node_,pos,_,err := bt.Select(text) //查询 if err == nil { //找到了 //TODO 和储存引擎协调进行 } else { //TODO 储存引擎加入记录 if node_.hash_key_num < node_data_num - 1 { //有空间可以插入 if err.(SelectError).less { //插在最后一个:只有查询到最后一个还小的时候才会返回less node_.hash_key[node_.hash_key_num] = hash(text) node_.primary_key[node_.hash_key_num] = primary_key node_.hash_key_num++ }else { //插在前面 for i := node_.hash_key_num;i > pos;i-- { node_.hash_key[i] = node_.hash_key[i - 1] node_.primary_key[i] = node_.primary_key[i - 1] } node_.hash_key[pos] = hash(text) node_.primary_key[pos] = primary_key node_.hash_key_num++ } return nil }else { //没有空间插入,进行分裂 //一定是最右儿子节点不存在的情况 if node_.child[node_data_num] != nil { //TODO 严重逻辑错误 索引结构可能已损坏 panic(errors.New("严重逻辑错误,索引结构可能已损坏!")) } else { //放入节点 if err.(SelectError).less { //插在最后一个:只有查询到最后一个还小的时候才会返回less node_.hash_key[node_.hash_key_num] = hash(text) node_.primary_key[node_.hash_key_num] = primary_key node_.hash_key_num++ }else { //插在前面 for i := node_.hash_key_num;i > pos;i-- { node_.hash_key[i] = node_.hash_key[i - 1] node_.primary_key[i] = node_.primary_key[i - 1] } node_.hash_key[pos] = hash(text) node_.primary_key[pos] = primary_key node_.hash_key_num++ } split: //寻找父节点 node_temp_num := node_.hash_key_num node_.hash_key_num = 0 //伪造空节点,让Select返回父节点 node_parent,pos_parent,_,err_parent := bt.Select(text) //分裂出一个新节点 node_new := new(node) mid := node_temp_num / 2 node_.hash_key_num = mid j := 0 for i := mid + 1;i < node_temp_num;i++ { node_new.hash_key[j] = node_.hash_key[i] node_new.primary_key[j] = node_.primary_key[i] j++ } node_new.hash_key_num = j //如果node_是根节点,分裂出的作为儿子节点 if node_ == &bt.root { node_.hash_key_num++ //mid回归root if node_.child[mid + 1] != nil { panic(errors.New("Memory is full!")) } node_.child[mid + 1] = node_new return nil } else { //不是根节点,则将mid插入父节点 if node_parent.hash_key_num < node_data_num - 1 { if err_parent.(SelectError).less { //一定是最后一个元素都小于hash(text) node_parent.hash_key[node_parent.hash_key_num] = node_.hash_key[mid] node_parent.primary_key[node_parent.hash_key_num] = node_.primary_key[mid] node_parent.hash_key_num++ if node_parent.child[node_parent.hash_key_num] != nil { panic(errors.New("Memory is full!")) } node_parent.child[node_parent.hash_key_num] = node_new //TODO 父节点满了怎么办 if node_parent.hash_key_num > node_data_num - 1 { node_ = node_parent goto split } }else { for i := node_parent.hash_key_num;i > pos_parent;i-- { node_parent.hash_key[i] = node_parent.hash_key[i - 1] node_parent.primary_key[i] = node_parent.primary_key[i - 1] node_parent.child[i] = node_parent.child[i - 1] } node_parent.hash_key[pos_parent] = node_.hash_key[mid] node_parent.primary_key[pos_parent] = node_.primary_key[mid] if node_parent.hash_key[pos_parent + 1] > node_parent.hash_key[pos_parent + 2] { panic(errors.New("Memory is full!")) } if node_parent.child[pos_parent] != node_{ panic(errors.New("Memory is full!")) } node_parent.child[pos_parent + 1] = node_new node_parent.hash_key_num++ //TODO 父节点满了怎么办 if node_parent.hash_key_num > node_data_num - 1 { node_ = node_parent goto split } } return nil } else { //父节点已满 if node_.child[mid] != nil { panic(errors.New("Memory is full!")) } node_.child[mid] = node_new return nil } } } } } return nil }
单元测试如下,测试了大量插入和查询的数据:
package index import ( "fmt" "testing" "os" "time" ) var num int func expected(t *testing.T,expecting uint64,real uint64) { if expecting != real { t.Error("-Expected-") t.Error(expecting) t.Error("-Real-") t.Error(real) os.Exit(1) num ++ } } func Test_hash(t *testing.T) { expected(t,15108241,hash([]byte("我"))) expected(t,14990752,hash([]byte("你"))) expected(t,14990230,hash([]byte("他"))) expected(t,0,hash([]byte{})) fmt.Println(hash([]byte("吴"))) } func Test_Select(t *testing.T) { bt := new(B_Tree) bt.Init() bt.root.hash_key[2] = 15108241 bt.root.hash_key[1] = 14990752 bt.root.hash_key[0] = 14990230 bt.root.hash_key_num = 3 bt.root.primary_key[2] = 2 bt.root.primary_key[1] = 1 bt.root.primary_key[0] = 0 bt.root.child[0] = new(node) bt.root.child[0].Init() bt.root.is_leaf = false bt.root.child[0].hash_key_num++ bt.root.child[0].hash_key[0] = 65 bt.root.child[0].primary_key[0] = 65 bt.root.child[2] = new(node) bt.root.child[2].Init() bt.root.child[2].hash_key_num++ bt.root.child[2].hash_key[0] = 15044788 bt.root.child[2].primary_key[0] = 15044788 _,_,i,_ := bt.Select([]byte("我")) expected(t,2,i) _,_,i,_ = bt.Select([]byte("你")) expected(t,1,i) _,_,i,_ = bt.Select([]byte("他")) expected(t,0,i) _,_,i,_ = bt.Select([]byte("呵呵")) expected(t,0,i) _,_,i,_ = bt.Select([]byte("呼呼")) expected(t,0,i) _,_,i,_ = bt.Select([]byte("A")) expected(t,65,i) _,_,i,_ = bt.Select([]byte("吴")) expected(t,15044788,i) } func Test_Set(t *testing.T) { bt := new(B_Tree) bt.Init() var i uint64 bs := make([]byte,3) start := time.Now().UnixNano() for temp := 10000;temp < 15555000;temp++ { bs[0] = byte(temp / 256 / 256) bs[1] = byte((temp % (256 * 256)) / 256) bs[2] = byte(temp % 256) bt.Set(bs,uint64(temp)) //fmt.Println(i) //expected(t,uint64(temp),i) } fmt.Println(time.Now().UnixNano() - start) start = time.Now().UnixNano()/* for temp := 10000;temp < 1555000;temp++ { bs[0] = byte(temp / 256 / 256) bs[1] = byte((temp % (256 * 256)) / 256) bs[2] = byte(temp % 256) //bt.Set(bs,uint64(temp)) _,_,i,_ = bt.Select(bs) //if i == 0 {fmt.Println(temp,i)} expected(t,uint64(temp),i) }*/ for temp := 15500000-1;temp > 10000;temp-- { bs[0] = byte(temp / 256 / 256) bs[1] = byte((temp % (256 * 256)) / 256) bs[2] = byte(temp % 256) //bt.Set(bs,uint64(temp)) _,_,i,_ = bt.Select(bs) //if i == 0 {fmt.Println(temp,i)} //expected(t,uint64(temp),i) } fmt.Println(time.Now().UnixNano() - start) fmt.Println(i) }
经测试,我的弱渣笔记本低电压8G内存,i7-4712mq单核插入15M数据,每条数据时间大约为120ns,每条查询时间大约为100ns;15M以上数据可能导致树满,该索引树设计为16G数据,每条1k,约16M条数据设计。暂时测试得,节点大小为L2缓存的3/8时能获得较高性能,不过我是go test测试,可能会有所影响,完整编译后将再次优化参数。目前实现了10M/s单核的吞吐量,8核吞吐量大约估计在50~60M/s之间。Redis在15M环境下的吞吐量在3k/s左右,不过他还有储存引擎,日志记录,持久化等等的消耗,待完全完成此数据库后,将会与Redis PK一番。
相关文章推荐
- 关于NoSQL与SQL的区别
- SQL2008R2的 遍历所有表更新统计信息和索引重建
- Redis深入学习(1)前言&Redis简介
- MyBatis使用动态SQL批量插入数据
- MySql安装
- mysql一个数据库允许存放多少个表?数据库里表多少个才合理?
- 查看mysql当前连接数
- SQL Server 游标运用:游标模板
- SQL Server 游标运用:批量创建、删除链接服务器
- 链接mysql数据库,输出数据
- service mysql start出错,mysql启动不了,解决mysql: unrecognized service错误
- 查询 + 添加 + 修改 一条sql 搞定~
- 数据库访问
- linux下redis的安装及启动
- 带有EXISTS谓词的子查询
- MySQL的timeout超时你遇到过几种情况
- 使用 Node.js、Express、AngularJS 和 MongoDB 构建一个Web程序
- AIX5.3中将Oracle SGA PIN在内存中的步骤
- mysql5.7修改root密码
- 学生-课程数据库