您的位置:首页 > 数据库

Python 访问 HBase 数据库的代码(随手乱写)

2015-08-12 18:17 232 查看
#! /usr/bin/env python
#coding=utf-8
import signal
import threading  
import sys
import time
import random
import string
import hashlib
#Hbase.thrift生成的py文件放在这里
#sys.path.append('/usr/local/lib/python2.7/dist-packages/hbase')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
#如ColumnDescriptor 等在hbase.ttypes中定义
from hbase.ttypes import *

class hbase_client(object):
    """Small convenience wrapper around the HBase Thrift ``Hbase.Client``.

    One instance owns one Thrift connection, opened in ``__init__`` and
    released with ``close()``.  The remaining methods are helpers used by
    this benchmark script: dropping/creating tables, bulk-loading sample
    rows, and issuing random reads.
    """

    # Numeric ids are left-padded with zeros to this width before being
    # hashed into a row key.
    _UUID_WIDTH = 11

    def __init__(self, ip, port):
        """Open a buffered, binary-protocol Thrift connection to ``ip:port``.

        Args:
            ip: Host name or address of the HBase Thrift server.
            port: TCP port of the HBase Thrift server.
        """
        # Buffering is critical: raw sockets are very slow.
        # TFramedTransport could be used here instead; it is an efficient
        # transport as well.
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(ip, port))
        # The wire protocol is independent of the transport, so other
        # protocols could be plugged in here.
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def delete_table(self):
        """Delete EVERY table reported by the server, disabling it first if needed.

        Each table name is printed before it is removed.
        """
        for tb in self.client.getTableNames():
            print(tb)
            if self.client.isTableEnabled(tb):
                self.client.disableTable(tb)
            self.client.deleteTable(tb)

    def create_table(self, table_name, column_families):
        """Create ``table_name`` with one column family per entry, then list all tables.

        Args:
            table_name: Name of the table to create.
            column_families: Iterable of column-family names (without the
                trailing ``:``); each is created with ``maxVersions=1``.
        """
        # Build one descriptor per family.  (Pre-sizing a one-element list
        # and assigning by index failed for more than one family.)
        contents = [ColumnDescriptor(name=cf + ':', maxVersions=1)
                    for cf in column_families]
        self.client.createTable(table_name, contents)
        # Show which tables exist now.
        tb_users = self.client.getTableNames()
        print("")
        print("HBase 中存在的表有:")
        print(tb_users)
        print("")

    def generate_rowkey(self, uuid):
        """Return ``md5(padded) + '_' + padded`` where ``padded`` is ``uuid`` zero-filled to 11 chars.

        Args:
            uuid: Identifier as a string or integer.

        Returns:
            The row key string: 32 hex digits, an underscore, then the
            zero-padded identifier.
        """
        padded = str(uuid).zfill(self._UUID_WIDTH)
        # hashlib needs bytes; on Python 2 ``str`` already is bytes.
        raw = padded if isinstance(padded, bytes) else padded.encode('utf-8')
        return hashlib.md5(raw).hexdigest() + '_' + padded

    def _fill_named_rows(self, table_name, count, name_prefix):
        """Write rows 1..count with ``info:uuid`` / ``info:name`` and print the elapsed time."""
        print("开始填充 " + str(count) + " 条数据")
        time_begin = time.time()
        for i in range(1, count + 1):
            uuid = str(i)
            mutations = [
                Mutation(column="info:uuid", value=uuid),
                Mutation(column="info:name", value=name_prefix + "(" + uuid + ")"),
            ]
            self.client.mutateRow(table_name, self.generate_rowkey(uuid), mutations, None)
        print("填充 " + str(count) + " 条数据完成, 耗时(s)" + str(time.time() - time_begin))
        print("")

    def test_add_user_data(self, table_name, user_num):
        """Insert ``user_num`` sample user rows (ids 1..user_num) into ``table_name``."""
        self._fill_named_rows(table_name, user_num, "user_ada")

    def test_add_grp_data(self, table_name, num):
        """Insert ``num`` sample group rows (ids 1..num) into ``table_name``."""
        # The completion message reports ``num``; it previously referenced
        # a ``user_num`` name that is not a parameter of this method.
        self._fill_named_rows(table_name, num, "grp_name")

    def test_add_grp_members2(self, table_name, group_num, members_num):
        """Insert ``members_num`` random member rows for each of ``group_num`` groups.

        Each member row is keyed ``<group row key>_<zero-padded member id>``
        and carries that member's ``info:uuid`` and ``info:name``.

        Args:
            table_name: Destination table.
            group_num: Number of groups (ids 1..group_num).
            members_num: Number of member rows written per group.
        """
        print("开始填充 " + str(group_num) + "*" + str(members_num) + " 条数据")
        time_begin = time.time()
        for i in range(1, group_num + 1):
            row_key = self.generate_rowkey(str(i))
            for _ in range(members_num):
                str_rand = str(random.randint(1, group_num * members_num)).zfill(self._UUID_WIDTH)
                # A fresh list per row: reusing one growing list re-sent every
                # earlier member's mutations with each new row.
                mutations = [
                    Mutation(column="info:uuid", value=str_rand),
                    Mutation(column="info:name", value="user_ada(" + str_rand + ")"),
                ]
                self.client.mutateRow(table_name, row_key + "_" + str_rand, mutations, None)
        print("填充 " + str(group_num * members_num) + " 条数据完成, 耗时(s)" + str(time.time() - time_begin))
        print("")

    def test_get_a_random_data(self, table_name, user_num):
        """Fetch the row of one random id in ``1..user_num`` and print a ``.`` progress mark.

        Args:
            table_name: Table to read from.
            user_num: Upper bound (inclusive) of the ids to choose from.

        Returns:
            The list returned by ``getRow`` (empty when the row is absent).
        """
        row_key = self.generate_rowkey(str(random.randint(1, user_num)))
        rows = self.client.getRow(table_name, row_key, None)
        print('.')
        return rows

    def test_get_members_of_grp(self, group_num, table_name='tb_group_mem'):
        """Scan up to 1000 member rows of one random group in ``1..group_num``.

        Args:
            group_num: Upper bound (inclusive) of the group ids to choose from.
            table_name: Table holding the member rows.

        Returns:
            The list returned by ``scannerGetList``.
        """
        grp_rowkey_prefix = self.generate_rowkey(random.randint(1, group_num))
        scanner_id = self.client.scannerOpenWithPrefix(
            table_name, grp_rowkey_prefix, ['info:uuid', 'info:name'], None)
        try:
            result = self.client.scannerGetList(scanner_id, 1000)
        finally:
            # Always release the server-side scanner, even if the fetch fails.
            self.client.scannerClose(scanner_id)
        print("")
        return result

    def close(self):
        """Close the underlying Thrift transport."""
        self.transport.close()
class deal_data_thread(threading.Thread): #The deal_data_thread class is derived from the class threading.Thread  
    def __init__(self,name,group_num, user_num):  
        threading.Thread.__init__(self)  
        self.setName(name)
        self.thread_stop = False  
        self.client = hbase_client("127.0.0.1",9090)
        self.group_num = group_num
        self.user_num = user_num
    def run(self): #Overwrite run() method, put what you want the thread do here  
        global query_count
        while not self.thread_stop:  
            #print 'Thread Object(%s), Time:%s\n' %(self.getName(), time.ctime())  
            #self.client.test_get_members_of_grp(random.randint(1, self.group_num))
            self.client.test_get_a_random_data('tb_user', random.randint(1, self.user_num))
            query_count +=1
            time.sleep(0.0000001)  
        self.client.close()
    def stop(self):  
        print self.getName(),'stopped'
        self.thread_stop = True
def sig_handler(sig, thread):
    """Signal handler: clear the module-level ``g_running`` flag.

    Args:
        sig: Signal number passed in by the ``signal`` module.
        thread: Second positional argument passed in by the ``signal``
            module (the interrupted stack frame, despite the parameter
            name); unused.
    """
    global g_running
    g_running = False
    try:
        print("%s %s" % (sig_handler.__name__, g_running))
    except Exception:
        # Could not report (e.g. stdout is gone): exit straight away.
        sys.exit(0)
   
def init(user_num, group_num):
    """Reset HBase and load the sample user rows used by the benchmark.

    WARNING: this calls ``hbase_client.delete_table()``, which removes every
    table on the server at 127.0.0.1:9090 before recreating ``tb_user``,
    ``tb_group`` and ``tb_group_mem`` (each with a single ``info`` family).

    Args:
        user_num: Number of user rows to insert into ``tb_user``.
        group_num: Number of groups; only used by the optional steps that
            are commented out below.
    """
    tb_user = 'tb_user'
    tb_group = 'tb_group'
    tb_group_mem = 'tb_group_mem'
    client = hbase_client("127.0.0.1", 9090)
    try:
        client.delete_table()
        for table in (tb_user, tb_group, tb_group_mem):
            client.create_table(table, ['info'])
        client.test_add_user_data(tb_user, user_num)
        # Optional extra fixtures / smoke checks, disabled by default:
        # client.test_add_grp_data(tb_group, group_num)
        # client.test_add_grp_members2(tb_group_mem, group_num, user_num // group_num)
        # client.test_get_a_random_data(tb_user, user_num)
        # client.test_get_a_random_data(tb_group, group_num)
        # client.test_get_members_of_grp(group_num)
    finally:
        # Release the connection even if a setup step fails.
        client.close()

if __name__ == "__main__":
    # Usage: script [user_num [group_num [any-third-arg-to-reinitialise]]]
    #
    # Shared state is created BEFORE the signal handlers are installed and
    # before any worker starts: the handler clears ``g_running`` and the
    # workers increment ``query_count``.
    g_running = True
    query_count = 0

    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)

    user_num = 100
    group_num = 10
    if len(sys.argv) > 1:
        user_num = int(sys.argv[1])
    if len(sys.argv) > 2:
        group_num = int(sys.argv[2])
    if len(sys.argv) > 3:
        # Any third argument requests a full reset and reload of the data.
        init(user_num, group_num)

    workers = [deal_data_thread('thread%d' % n, group_num, user_num)
               for n in range(1, 6)]

    time_begin = time.time()
    for worker in workers:
        worker.start()

    # Idle until SIGINT/SIGTERM clears the flag.
    while g_running:
        time.sleep(1)

    time_spent = time.time() - time_begin
    # Guard against a zero-length run on coarse clocks.
    qps = query_count / time_spent if time_spent > 0 else 0
    print('qps:%d, 持续时间:%d s' % (qps, time_spent))

    # Signal every worker first, then wait, so they wind down in parallel.
    for worker in workers:
        worker.stop()
    for worker in workers:
        worker.join()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: