Python 访问 HBase 数据库代码(随手写的)
2015-08-12 18:17
232 查看
#! /usr/bin/env python
#coding=utf-8
import signal
import threading
import sys
import time
import random
import string
import hashlib

# The .py files generated from Hbase.thrift go here
#sys.path.append('/usr/local/lib/python2.7/dist-packages/hbase')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
# ColumnDescriptor, Mutation, etc. are defined in hbase.ttypes
from hbase.ttypes import *


class hbase_client(object):
    """Wrapper around the client generated from Hbase.thrift, plus helpers
    that create benchmark tables, fill them and read them back.

    The connection is opened in the constructor. Call close() (or use the
    instance in a ``with`` statement) to release it.
    """

    # Width the numeric id is zero-padded to inside a row key.
    ROWKEY_ID_WIDTH = 11

    def __init__(self, ip, port):
        """Open a buffered, binary-protocol Thrift connection to ip:port."""
        raw_socket = TSocket.TSocket(ip, port)
        # Buffering is critical: raw sockets are very slow.
        # TFramedTransport is another efficient transport option.
        self.transport = TTransport.TBufferedTransport(raw_socket)
        # Transport and protocol are decoupled, so other protocols can be used.
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # The client object represents one user session.
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def delete_table(self, table_names=None):
        """Disable and delete tables, printing each name as it is dropped.

        WARNING: with the default ``table_names=None`` this drops EVERY table
        reported by getTableNames(), not only the ones this script creates.

        :param table_names: optional iterable of names to drop. Names that do
            not exist on the server are skipped.
        """
        existing = self.client.getTableNames()
        if table_names is None:
            targets = existing
        else:
            wanted = set(table_names)
            targets = [tb for tb in existing if tb in wanted]
        for tb in targets:
            print(tb)
            # HBase only deletes disabled tables.
            if self.client.isTableEnabled(tb):
                self.client.disableTable(tb)
            self.client.deleteTable(tb)

    def create_table(self, table_name, column_families):
        """Create table_name with one ColumnDescriptor per family name, then
        print the names of all tables on the server.

        Each family is created with maxVersions=1.
        """
        # Build the descriptor list directly. Pre-sizing it and assigning by
        # index broke for more than one family.
        contents = [ColumnDescriptor(name=cf + ':', maxVersions=1)
                    for cf in column_families]
        self.client.createTable(table_name, contents)
        tb_users = self.client.getTableNames()
        print("")
        print("HBase 中存在的表有:")
        print(tb_users)
        print("")

    def generate_rowkey(self, uuid):
        """Return '<md5 hex of padded id>_<padded id>' for a numeric id.

        uuid may be an int or a digit string. It is zero-padded to
        ROWKEY_ID_WIDTH characters. The hash prefix presumably spreads
        sequential ids across regions (TODO confirm intent). The readable
        suffix keeps every key unique.
        """
        myuuid = str(uuid).zfill(self.ROWKEY_ID_WIDTH)
        digest = hashlib.md5(myuuid.encode('utf-8')).hexdigest()
        return digest + '_' + myuuid

    def _fill_rows(self, table_name, count, name_prefix):
        """Write rows for ids 1..count with info:uuid and info:name cells,
        printing start/finish lines with elapsed seconds."""
        print("开始填充 " + str(count) + " 条数据")
        time_begin = time.time()
        for i in range(1, count + 1):
            uid = str(i)
            mutations = [Mutation(column="info:uuid", value=uid),
                         Mutation(column="info:name", value=name_prefix + "(" + uid + ")")]
            self.client.mutateRow(table_name, self.generate_rowkey(uid), mutations, None)
        print("填充 " + str(count) + " 条数据完成, 耗时(s)" + str(time.time() - time_begin))
        print("")

    def test_add_user_data(self, table_name, user_num):
        """Write user rows for ids 1..user_num, named 'user_ada(<id>)'."""
        self._fill_rows(table_name, user_num, "user_ada")

    def test_add_grp_data(self, table_name, num):
        """Write group rows for ids 1..num, named 'grp_name(<id>)'."""
        self._fill_rows(table_name, num, "grp_name")

    def test_add_grp_members2(self, table_name, group_num, members_num):
        """Write members_num random member rows for each group 1..group_num.

        Row key: '<group rowkey>_<member id padded to ROWKEY_ID_WIDTH>'.
        All rows of a group therefore share the group's row key as a prefix.
        Member ids are drawn at random from 1..group_num*members_num, so a
        repeated id within a group rewrites the same row.
        """
        total = group_num * members_num
        print("开始填充 " + str(group_num) + "*" + str(members_num) + " 条数据")
        time_begin = time.time()
        for i in range(1, group_num + 1):
            grp_rowkey = self.generate_rowkey(str(i))
            for _ in range(members_num):
                str_rand = str(random.randint(1, total)).zfill(self.ROWKEY_ID_WIDTH)
                # A fresh mutation list per row: each member row carries only
                # its own cells.
                mutations = [Mutation(column="info:uuid", value=str_rand),
                             Mutation(column="info:name", value="user_ada(" + str_rand + ")")]
                self.client.mutateRow(table_name, grp_rowkey + "_" + str_rand, mutations, None)
        print("填充 " + str(total) + " 条数据完成, 耗时(s)" + str(time.time() - time_begin))
        print("")

    def test_get_a_random_data(self, table_name, user_num):
        """Fetch the row of one random id in 1..user_num from table_name.

        Prints '.' as a progress tick. Returns the first element of getRow's
        result (it has .row and .columns), or None when the row is absent.
        """
        row_key = self.generate_rowkey(str(random.randint(1, user_num)))
        a_row_data = self.client.getRow(table_name, row_key, None)
        print('.')
        return a_row_data[0] if a_row_data else None

    def test_get_members_of_grp(self, group_num, table_name='tb_group_mem', max_rows=1000):
        """Scan the member rows of one random group id in 1..group_num.

        :param table_name: table holding member rows (see test_add_grp_members2).
        :param max_rows: maximum number of rows fetched from the scanner.
        :returns: the list returned by scannerGetList.
        """
        # The trailing '_' restricts the prefix to this exact group's key.
        grp_rowkey_prefix = self.generate_rowkey(random.randint(1, group_num)) + "_"
        scanner_id = self.client.scannerOpenWithPrefix(
            table_name, grp_rowkey_prefix, ['info:uuid', 'info:name'], None)
        try:
            result = self.client.scannerGetList(scanner_id, max_rows)
        finally:
            # Release the scanner even if the fetch fails.
            self.client.scannerClose(scanner_id)
        print("")
        return result

    def close(self):
        """Close the underlying Thrift transport."""
        self.transport.close()
# Total queries issued by all deal_data_thread workers. The main loop reads
# it to compute QPS. It is defined here so it exists before any worker
# increments it. Updates are serialized by _query_count_lock because "+=" on
# a global is a read-modify-write that can lose increments across threads.
query_count = 0
_query_count_lock = threading.Lock()


class deal_data_thread(threading.Thread):
    """Worker thread that repeatedly reads random rows from 'tb_user'.

    Each worker opens its own hbase_client connection to 127.0.0.1:9090 in
    the constructor. It bumps the global query_count after every query until
    stop() is called, and closes its connection when run() ends.
    """

    def __init__(self, name, group_num, user_num):
        threading.Thread.__init__(self, name=name)
        # Polled by run(); set by stop().
        self.thread_stop = False
        self.client = hbase_client("127.0.0.1", 9090)
        # Upper bounds for the random ids queried.
        self.group_num = group_num
        self.user_num = user_num

    def run(self):
        """Query until stopped, counting each query in query_count."""
        global query_count
        try:
            while not self.thread_stop:
                # Pass the full id range: the client method draws the random
                # id itself. Passing a random bound here biases toward low ids.
                #self.client.test_get_members_of_grp(self.group_num)
                self.client.test_get_a_random_data('tb_user', self.user_num)
                with _query_count_lock:
                    query_count += 1
                # Near-zero sleep, presumably just to yield to other threads.
                time.sleep(0.0000001)
        finally:
            # Release the connection even if a query raises.
            self.client.close()

    def stop(self):
        """Ask the worker to exit after its current query."""
        print(self.getName() + ' stopped')
        self.thread_stop = True
def sig_handler(sig, thread):
    """SIGINT/SIGTERM handler: clear g_running so the main loop winds down.

    The second argument is the interrupted stack frame passed by the signal
    module (the name is kept for compatibility).
    """
    global g_running
    # Set the flag before anything that could fail.
    g_running = False
    try:
        print("%s %s" % (sig_handler.__name__, g_running))
    except Exception:
        sys.exit(0)


def init(user_num, group_num):
    """Recreate the benchmark tables and load user_num user rows.

    WARNING: hbase_client.delete_table() with no arguments drops every
    table on the server before the three benchmark tables are recreated.
    """
    tb_user = 'tb_user'
    tb_group = 'tb_group'
    tb_group_mem = 'tb_group_mem'
    client = hbase_client("127.0.0.1", 9090)
    try:
        client.delete_table()
        for tb in (tb_user, tb_group, tb_group_mem):
            client.create_table(tb, ['info'])
        client.test_add_user_data(tb_user, user_num)
        # Optional extra fixtures / smoke checks:
        # client.test_add_grp_data(tb_group, group_num)
        # client.test_add_grp_members2(tb_group_mem, group_num, user_num // group_num)
        # client.test_get_a_random_data(tb_user, user_num)
        # client.test_get_a_random_data(tb_group, group_num)
        # client.test_get_members_of_grp(group_num)
    finally:
        client.close()


if __name__ == "__main__":
    # Set the run flag BEFORE installing handlers, so an early Ctrl-C is not
    # overwritten afterwards.
    g_running = True
    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)

    # Usage: script [user_num [group_num [any 3rd arg -> recreate tables]]]
    user_num = 100
    group_num = 10
    if len(sys.argv) > 1:
        user_num = int(sys.argv[1])
    if len(sys.argv) > 2:
        group_num = int(sys.argv[2])
    if len(sys.argv) > 3:
        init(user_num, group_num)

    # The counter must exist before any worker increments it.
    query_count = 0
    threads = [deal_data_thread('thread%d' % k, group_num, user_num)
               for k in range(1, 6)]
    time_begin = time.time()
    for t in threads:
        t.start()
    try:
        while g_running:
            time.sleep(1)
            time_spent = time.time() - time_begin
            print('qps:%d, 持续时间:%d s' % (query_count / time_spent, time_spent))
    finally:
        # Always stop the (non-daemon) workers, or the process never exits.
        # Signal all of them first so they wind down in parallel, then join.
        for t in threads:
            t.stop()
        for t in threads:
            t.join()
相关文章推荐
- 创建mysql用户以及权限设置
- 修改 mysql root 用户的密码
- ubuntu10.04 安装oracle server 版 笔记
- JDBC连接SQL Server数据库
- WebX5 sqllite 增删改查
- Lock wait timeout exceeded数据库死锁问题
- 【工具设置】无人值守安装SQL Server 2008 R2
- mysql 分组总和求最大值 的两种方式
- MySQL____存储过程学习笔记
- 细小知识点之MySQL(二)
- mysql 查询使用*和具体字段的问题
- SQL SERVER简单的增,删,查,改 语句 多表关联查询 (去掉关键列查询)
- Redis集群的安装测试(伪分布模式 - 主从复制)
- 安装MySQL
- mysql 数据查询出排序中的最大值和最小值
- sqlserver 存储过程 try catch TRANSACTION (转)
- Linux中使用xmanager安装oracle数据库
- MSSQL表分区的创建, 横向 纵向 多维度,多指标 分表
- oracle官网下载老版本jdk + 如何命令行下wget下载jdk
- 简单地把Excel导入mysql