游戏服务器之角色数据管理工具(python版)
2014-02-07 20:18
387 查看
之所以要做个工具,是方便非程序人员也可以容易操作和管理数据库,进而可以管理游戏程序。
使用python作为工具是python容易写图形工具,而原来的项目也用到python作为逻辑脚本。
1、python 的一个第三方库wx图形支持
2、mysql角色数据管理
3、redis角色数据管理
1、python 的一个第三方库wx图形支持
python 的一个第三方库wx可以提供一些图形支持,支持事件循环和响应。
if __name__ == "__main__":
import sys
reload(sys)
sys.setdefaultencoding('utf8')#设置编码
#app = wx.PySimpleApp(True, 'log.txt')
app = wx.PySimpleApp()#创建一个应用程序,wx.PySimpleApp
frame = MainFrame(None)#MainFrame(wx.Frame)
gSetMainframe(frame)
frame.Show()
app.MainLoop()#开始图形事件循环
管理工具以一个标签页来作为工具的一部分而存在。
检查页的ui是一个wx.Panel : class VerifyDataUI(wx.Panel)
功能是查询、删除redis和mysql的角色数据。
2、mysql角色数据管理
(1)mysql数据访问
首先需要说明的是mysql的查询用到orm的技术(对象关系映射,使用到的第三方库是sqlalchemy),而角色表的orm内容如下:
含有角色id、角色背包(二进制)、角色商城背包(二进制)、角色仓库(二进制)、角色技能(二进制)。
#导入sqlalchemy 的类和类型
from sqlalchemy import Table, Column
from sqlalchemy.dialects.mysql import TINYINT, SMALLINT, INTEGER, BIGINT, \
TEXT, CHAR, VARCHAR, FLOAT, DATETIME, MEDIUMTEXT, BLOB
from sqlalchemy.orm import Mapper
from core.db.connect import metadata, TableObject
#类Tjuese_shuju映射的是mysql里的表t_juese_shuju。
#映射的方式是通过sqlalchemy.orm 的类 Mapper
tab_t_juese_wupin = Table("t_juese_shuju", metadata,
Column("jue_se", INTEGER, primary_key = True), # 角色id
Column("b_beibao", BLOB), # 角色背包
Column("b_cangku", BLOB), # 角色商城背包
Column("b_shangcheng_beibao", BLOB), # 角色仓库
Column("b_jineng", BLOB) # 角色技能
)
class Tjuese_shuju(TableObject):
COL_NAMES = ['jue_se', 'b_beibao', 'b_cangku', 'b_shangcheng_beibao',
'b_jineng']
TABLE_NAME = 't_juese_shuju'
ROW_INDEX = ['jue_se']
def __init__(self):
TableObject.__init__(self)
def get_name(self):
return Tjuese_shuju.TABLE_NAME
def get_col_names(self):
return Tjuese_shuju.COL_NAMES
def get_key_names(self):
return Tjuese_shuju.ROW_INDEX
def session_obj_copy(self,obj):#拷贝orm对象
self.__dict__ = dict(obj.__dict__,**self.__dict__)#保留自身的状态成员_sa_instance_state
mapper_t_juese_wupin = Mapper(Tjuese_shuju, tab_t_juese_wupin)
mysql的连接会话。会话的创建是使用sqlalchemy的接口。
@staticmethod
def make_session_class(ip,user,pw,port,database):
try:
print "ip,port,database",ip,port,database
if False == isinstance(port,int):
print "port must be int type"
return None
_connect_str = "mysql://%s:%s@%s:%d/%s?charset=utf8" % (user,pw,ip,port,database)
_engine = create_engine(_connect_str, echo=False)
print _engine.connect()
Session = sessionmaker(bind=_engine, autoflush=False, autocommit=True)
print "Session",Session
return Session
except:
traceback.print_exc()
return None
(2)mysql角色数据管理工具
获取mysql角色数据事件响应函数:
def OnVerifyMysqlOneJuesePackage(self,evt):
juese_id_str = self.ctl_mysql_juese_id.GetValue()
juese_id_str = juese_id_str.replace(" ", "")
juese_name_str = ""
if "" == juese_id_str:
juese_name_str = self.ctl_mysql_juese_name.GetValue()
juese_name_str = juese_name_str.encode("utf8")
juese_name_str = juese_name_str.replace(" ","")
if "" == juese_id_str and "" == juese_name_str:
title = u"请输入关键字(角色id或角色名)"
self.show_warnning(title)
return
juese_id = -1
if "" != juese_id_str:
try:
juese_id = int(juese_id_str)
if 0 >= juese_id:
title = u"角色id需要是正数"
self.show_warnning(title)
return
except:
title = u"角色id需要是数字"
self.show_warnning(title)
return
try:
ret = Consistency.check_one_juese_beibao(juese_id,juese_name_str)
if -1 == ret:
title = u"该角色数据为空"
self.show_warnning(title)
elif -2 == ret:
title = u"校验出错"
self.show_warnning(title)
elif -3 == ret:
title = u"数据库为连接或连接出错"
self.show_warnning(title)
else:
title = u"校验成功,请查看juese_beibao_check文件"
self.show_warnning(title)
except:
traceback.print_exc()
title = u"检查角色背包出错"
self.show_warnning(title)
通过会话把检查的角色数据写到文件里
检查所有角色数据,获取所有角色的背包数据,并以json的格式写入到文件中。
@staticmethod
def check_juese_beibao():
global ConsistencySession
if None == ConsistencySession:
return -3
session = ConsistencySession()
try:
juese_shuju_list = session.query(Tjuese_shuju).all()
if 0 == len(juese_shuju_list):
print "juese_shuju_list empty"
return -1
print "juese_shuju_list",juese_shuju_list
for one_juese_shuju in juese_shuju_list:
#检验一个角色背包
b_one_juese_beibao = one_juese_shuju.b_beibao
juese_id = one_juese_shuju.jue_se
one_juese_beibao_list = xjson.loads(b_one_juese_beibao)
one_juese_beibao = JueseBeibao(one_juese_beibao_list)
search_result = True
for one_juese_beibao_item in one_juese_beibao.juese_beibao:
if 0 < one_juese_beibao_item.wupin_guid and 0 < one_juese_beibao_item.flag:
one_t_wupin = session.query(Twupin).filter(Twupin.guid \
== one_juese_beibao_item.wupin_guid).first()
print "one_juese_beibao_item.wupin_guid",one_juese_beibao_item.wupin_guid
print "one_t_wupin",one_t_wupin
# time.sleep(0.01)
if None == one_t_wupin:
print "juese(%d) wupin guid (%d) don't exist" % \
(juese_id,one_juese_beibao_item.wupin_guid)
File.write_juese_beibao(one_juese_beibao_item,one_juese_shuju.jue_se)
search_result = False
if True == search_result:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency successfully" % (juese_id))
else:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency failed" % (juese_id))
except:
traceback.print_exc()
return -2
finally:
session.close()
return 0
根据指定的角色的id或名字来查询该角色的背包数据,并写入文件:
@staticmethod
def check_one_juese_beibao(juese_id,juese_name):
global ConsistencySession
if None == ConsistencySession:
return -3
session = ConsistencySession()
try:
one_juese_shuju = None
if 0 < juese_id:
one_juese_shuju = session.query(Tjuese_shuju).filter(
Tjuese_shuju.jue_se == juese_id).first()
if None == one_juese_shuju:
return -1
else:
one_juese = session.query(Tjuese).filter(Tjuese.str_name == juese_name).first()
if None == one_juese:
return -1
juese_id = one_juese.guid
one_juese_shuju = session.query(Tjuese_shuju).filter(
Tjuese_shuju.jue_se == one_juese.guid).first()
if None == one_juese_shuju:
return -1
#检验一个角色背包
b_one_juese_beibao = one_juese_shuju.b_beibao
one_juese_beibao_list = xjson.loads(b_one_juese_beibao)
one_juese_beibao = JueseBeibao(one_juese_beibao_list)
search_result = True
for one_juese_beibao_item in one_juese_beibao.juese_beibao:
if 0 < one_juese_beibao_item.wupin_guid and 0 < one_juese_beibao_item.flag:
one_t_wupin = session.query(Twupin).filter(Twupin.guid \
== one_juese_beibao_item.wupin_guid).first()
print "one_juese_beibao_item.wupin_guid",one_juese_beibao_item.wupin_guid
print "one_t_wupin",one_t_wupin
if None == one_t_wupin:
print "juese(%d) wupin guid (%d) don't exist" % \
(juese_id,one_juese_beibao_item.wupin_guid)
File.write_juese_beibao(one_juese_beibao_item,one_juese_shuju.jue_se)
search_result = False
if True == search_result:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency successfully" % (juese_id))
else:
File.write_juese_beibao_result("juese(%d) check juese_beibao juese(%d) consistency failed" % (juese_id))
except:
traceback.print_exc()
return -2
finally:
session.close()
return 0
检查redis角色数据事件响应函数:
def OnCheckRedisJuese(self,evt):
value = self.redis_juese_id_ctl.GetValue()
value = value.replace(" ", "")
try:
ret = RedisCommandExecute.redis_juese_check(value)
if -1 == ret:
title = u"没有该键,查询失败"
self.show_warnning(title)
elif -2 == ret:
title = u"查询出错"
self.show_warnning(title)
elif -3 == ret:
title = u"服务器未启动"
self.show_warnning(title)
else:
title = u"查询成功,请查看redis文件"
self.show_warnning(title)
except:
traceback.print_exc()
title = u"查询结果有误"
self.show_warnning(title)
3、redis角色数据管理
redis的角色数据是以哈希表的格式来存储的(t_juese:1:该角色所有数据)
该角色所有数据的格式是json格式。python对象支持把对象数据序列化成json格式。
获取的某角色的格式如hgetall(“t_juese:1”)
获取redis角色数据并以json格式写入到文件:
@staticmethod
def redis_juese_check(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -2
rc = RedisCommand()
if False == rc.ping():#检查redis服务器是否响应
return -3
command_list = command.split(",")#一系列角色id是以,逗号分隔的字符串,一次查询多个角色数据
ret = None
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.get_by_juese_id(command_list[0])#执行redis里的角色的查询(若角色id是1:最终的查询是 client.hgetall(“t_juese:1”),类型是hash )
elif 1 < len(command_list):
ret = rc.get_by_juese_id_list(command_list)#使用管道(pipeline),多个命令一次执行
print "result",result
if None == ret:
result = -1
return result
elif isinstance(ret,list) and 0 == len(ret):
result = -1
return result
print "ret\n",ret
print type(ret)
import json
# redis_data = json.loads(ret)
# print redis_data
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
连接redis的实例
self.client = redis.Redis(host=RedisConfig.host, port=RedisConfig.port,
db=RedisConfig.db)
配置如下:
[node]
host = 127.0.0.1#redis的地址
db = 0#redis的使用的库
port = 6379#redis的端口
redis的操作命令封装如下:
#coding=utf-8
import ConfigParser
import redis
import traceback
from core.file import File
完整的redis命令执行文件如下:
#配置文件
_CONFIG_FILE = "config\cache.ini"
class RedisConfig(object):
cf = ConfigParser.ConfigParser()
cf.read(_CONFIG_FILE)
host = cf.get("node", "host")
port = cf.getint("node", "port")
db = cf.getint("node", "db")
@staticmethod
def set_config(host,port,db):
if False == isinstance(port,int):
print "port type wrong ",port
return false
if False == isinstance(db,int):
print "db type wrong ",db
return false
RedisConfig.host = host
RedisConfig.port = port
RedisConfig.db = db
cp = ConfigParser.ConfigParser()
cp.read(_CONFIG_FILE)
cp.set("node", "host",host)
cp.set("node", "port",port)
cp.set("node", "db",db)
cp.write(open(_CONFIG_FILE,"w"))
return True
@staticmethod
def ping(address,port):
import socket
try:
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(1)
sk.connect((address,port))
sk.close()
print 'ping ok'
return True
except Exception:
print 'ping error'
return False
class RedisCommandImp(object):
def __init__(self):
self.client = redis.Redis(host=RedisConfig.host, port=RedisConfig.port,
db=RedisConfig.db)
def ping(self): #测试连接,连通返回 True ,不通返回False
try:
return self.client.ping()
except:
return False
def tearDown(self):
return self.client.flushdb()
def execute(self):
print "execute(),to fulfill by subclass"
def clear_value(self,client,key,field = None):
try:
type = self.get_type(key)
if "none" == type:
print "key don't exist"
return False
elif "hash" == type:
if None == field:#域为空,删除键的所有域
resutl_dict = client.hgetall(key)
field_list = resutl_dict.keys()
for one_field in field_list:
if client.hexists(key,one_field):
client.hdel(key, one_field)
return True
if isinstance(field,list) or isinstance(field,set) :#域为列表或集
for one_field in field:
if client.hexists(key,one_field):
client.hdel(key, one_field)
return True
else:
if client.hexists(key,field):
client.hdel(key, field)
return True
else:
return False
elif "string" == type:
print "redis don't has such command"
return False
elif "list" == type:
return client.lpop(key)#或根据策略改成client.rpop(key) 或如client.lrem('a', 0, 'a1')
elif "set" == type:
return client.srem(key, field)
elif "zset" == type:
return client.zrem(key, field)
print "none of this type"
return False
except redis.ResponseError:
traceback.print_exc()
return False
def get_type(self,key):
return self.client.type(key)
def get_value(self,client,key):
try:
type = self.get_type(key)
if "none" == type:
print "key don't exist"
return None
elif "hash" == type:
return client.hgetall(key)
elif "string" == type:
return client.get(key)
elif "list" == type:
return client.lrange(key, 0, -1)
elif "set" == type:
return client.smembers(key)
elif "zset" == type:
return client.zrange(key, 0, -1, withscores=True)
print "none of this type"
return None
except redis.ResponseError:
traceback.print_exc()
return None
def get_keys_match_front(self,client,prefix):
try:
#返回符合前缀的所有键,返回类型 set类型
keys_set = client.keys(pattern=prefix+"*")
return keys_set
except redis.ResponseError:
traceback.print_exc()
return None
class SingleCommand(RedisCommandImp):
def __init__(self,prefix=""):
RedisCommandImp.__init__(self)
self.prefix = prefix
def execute(self,key):
key = self.prefix + str(key)
return self.get_value(self.client, key)
def clear(self,key,field=None):#删除哈希键只能用SingleCommand ,不能使用 Pipeline
key = self.prefix + str(key)
return self.clear_value(self.client, key,field)
def get_keys_by_prefix(self):
return self.get_keys_match_front(self.client,self.prefix)
class PipelineCommand(RedisCommandImp):
def __init__(self,prefix=""):
RedisCommandImp.__init__(self)
self.prefix = prefix
def execute(self,keys):
with self.client.pipeline() as pipe:
for one_key in keys:
if "" != one_key:
one_key = self.prefix + str(one_key)
self.get_value(pipe, one_key)
ret = pipe.execute()
return ret
return None
class RedisCommand(object):
def ping(self):
sc = SingleCommand()
return sc.ping()
def __get_keys(self,prefix=""):
sc = SingleCommand(prefix)
return sc.get_keys_by_prefix()
def __get_by_keys(self,keys,prefix=""):#keys is set or list
pc = PipelineCommand(prefix)
return pc.execute(keys)
def __get_by_key(self,key,prefix=""):
sc = SingleCommand(prefix)
return sc.execute(key)
def __clear_by_key(self,key,prefix=""):#删除键
sc = SingleCommand(prefix)
return sc.clear(key)
def __clear_by_keys(self,keys,prefix=""):#删除多键
if None == keys:
return False
if isinstance(keys,list) or isinstance(keys,set):
ret = False
for key in keys:
sc = SingleCommand()
if sc.clear(key):
ret = True
return ret
return False
def __clear_all_key(self,prefix):
keys_set = self.__get_keys(prefix)
print "__clear_all_key, keys_set :",keys_set
if 0 == len(keys_set):#没有需要删除的数据
return True
if None == keys_set:
return False
return self.__clear_by_keys(keys_set)
#具体的命令
#根据角色id获取角色数据
def get_by_juese_id(self,id):
return self.__get_by_key(id,"t_juese:")
#根据角色id列表获取角色数据
def get_by_juese_id_list(self,id_list):
return self.__get_by_keys(id_list,"t_juese:")
#清除某角色的数据
def clear_one_juese(self,id):#成功返回True,失败返回False
return self.__clear_by_key(id,"t_juese:")
#清除所有角色数据
def clear_all_juese(self):
return self.__clear_all_key(prefix="t_juese:")
#获取所有角色数据
def check_all_juese(self):
keys_set = self.__get_keys(prefix="t_juese:")
if None == keys_set:
return None
return self.__get_by_keys(keys_set)
#清除所有数据
def clear_all_data(self):
sc = SingleCommand()
ret = sc.tearDown()
return ret
#redis命令执行类
class RedisCommandExecute(object):
@staticmethod
#清除角色文件(获取的数据写入文件)
def init():
File.clear_juese_redis_file()
#清除指定的角色
@staticmethod
def redis_juese_clear(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -1
rc = RedisCommand()
if False == rc.ping():#检查redis服务器是否可以连通
return -3
command_list = command.split(",")
ret = False
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.clear_one_juese(command_list[0])
elif 1 < len(command_list):
for one_juese in command_list:
if rc.clear_one_juese(one_juese):
ret = True
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#清除所有的角色数据
@staticmethod
def redis_all_juese_clear():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.clear_all_juese()
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#获取指定的角色列表的数据并写到文件
@staticmethod
def redis_juese_check(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -2
rc = RedisCommand()
if False == rc.ping():
return -3
command_list = command.split(",")
ret = None
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.get_by_juese_id(command_list[0])
elif 1 < len(command_list):
ret = rc.get_by_juese_id_list(command_list)
print "result",result
if None == ret:
result = -1
return result
elif isinstance(ret,list) and 0 == len(ret):
result = -1
return result
print "ret\n",ret
print type(ret)
import json
# redis_data = json.loads(ret)
# print redis_data
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
#获取指定的角色列表的数据并写到文件
@staticmethod
def redis_all_juese_check():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.check_all_juese()
if None == ret:
result = -1
return result
print "ret\n",ret
print type(ret)
import json
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
#测试指令
@staticmethod
def redis_clear_all_data():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.clear_all_data()
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#测试指令
def test_redis_command():
rc = RedisCommand()
command = raw_input("input juese id,separated by ',' please:")
command_list = command.split(",")
if 0 == len(command_list):
print "command empty or command error"
return None
elif 1 == len(command_list):
result = rc.get_by_juese_id(command_list[0])
elif 1 < len(command_list):
result = rc.get_by_juese_id_list(command_list)
print "result",result
File.write_redis_juese_info(result)
#测试指令
def test_command():
while True:
test_redis_command()
#测试指令
if __name__ == "__main__":
RedisCommandExecute.redis_clear_all_data()
使用python作为工具是python容易写图形工具,而原来的项目也用到python作为逻辑脚本。
1、python 的一个第三方库wx图形支持
2、mysql角色数据管理
3、redis角色数据管理
1、python 的一个第三方库wx图形支持
python 的一个第三方库wx可以提供一些图形支持,支持事件循环和响应。
if __name__ == "__main__":
import sys
reload(sys)
sys.setdefaultencoding('utf8')#设置编码
#app = wx.PySimpleApp(True, 'log.txt')
app = wx.PySimpleApp()#创建一个应用程序,wx.PySimpleApp
frame = MainFrame(None)#MainFrame(wx.Frame)
gSetMainframe(frame)
frame.Show()
app.MainLoop()#开始图形事件循环
管理工具以一个标签页来作为工具的一部分而存在。
检查页的ui是一个wx.Panel : class VerifyDataUI(wx.Panel)
功能是查询、删除redis和mysql的角色数据。
2、mysql角色数据管理
(1)mysql数据访问
首先需要说明的是mysql的查询用到orm的技术(对象关系映射,使用到的第三方库是sqlalchemy),而角色表的orm内容如下:
含有角色id、角色背包(二进制)、角色商城背包(二进制)、角色仓库(二进制)、角色技能(二进制)。
#导入sqlalchemy 的类和类型
from sqlalchemy import Table, Column
from sqlalchemy.dialects.mysql import TINYINT, SMALLINT, INTEGER, BIGINT, \
TEXT, CHAR, VARCHAR, FLOAT, DATETIME, MEDIUMTEXT, BLOB
from sqlalchemy.orm import Mapper
from core.db.connect import metadata, TableObject
#类Tjuese_shuju映射的是mysql里的表t_juese_shuju。
#映射的方式是通过sqlalchemy.orm 的类 Mapper
tab_t_juese_wupin = Table("t_juese_shuju", metadata,
Column("jue_se", INTEGER, primary_key = True), # 角色id
Column("b_beibao", BLOB), # 角色背包
Column("b_cangku", BLOB), # 角色商城背包
Column("b_shangcheng_beibao", BLOB), # 角色仓库
Column("b_jineng", BLOB) # 角色技能
)
class Tjuese_shuju(TableObject):
COL_NAMES = ['jue_se', 'b_beibao', 'b_cangku', 'b_shangcheng_beibao',
'b_jineng']
TABLE_NAME = 't_juese_shuju'
ROW_INDEX = ['jue_se']
def __init__(self):
TableObject.__init__(self)
def get_name(self):
return Tjuese_shuju.TABLE_NAME
def get_col_names(self):
return Tjuese_shuju.COL_NAMES
def get_key_names(self):
return Tjuese_shuju.ROW_INDEX
def session_obj_copy(self,obj):#拷贝orm对象
self.__dict__ = dict(obj.__dict__,**self.__dict__)#保留自身的状态成员_sa_instance_state
mapper_t_juese_wupin = Mapper(Tjuese_shuju, tab_t_juese_wupin)
mysql的连接会话。会话的创建是使用sqlalchemy的接口。
@staticmethod
def make_session_class(ip,user,pw,port,database):
try:
print "ip,port,database",ip,port,database
if False == isinstance(port,int):
print "port must be int type"
return None
_connect_str = "mysql://%s:%s@%s:%d/%s?charset=utf8" % (user,pw,ip,port,database)
_engine = create_engine(_connect_str, echo=False)
print _engine.connect()
Session = sessionmaker(bind=_engine, autoflush=False, autocommit=True)
print "Session",Session
return Session
except:
traceback.print_exc()
return None
(2)mysql角色数据管理工具
获取mysql角色数据事件响应函数:
def OnVerifyMysqlOneJuesePackage(self,evt):
juese_id_str = self.ctl_mysql_juese_id.GetValue()
juese_id_str = juese_id_str.replace(" ", "")
juese_name_str = ""
if "" == juese_id_str:
juese_name_str = self.ctl_mysql_juese_name.GetValue()
juese_name_str = juese_name_str.encode("utf8")
juese_name_str = juese_name_str.replace(" ","")
if "" == juese_id_str and "" == juese_name_str:
title = u"请输入关键字(角色id或角色名)"
self.show_warnning(title)
return
juese_id = -1
if "" != juese_id_str:
try:
juese_id = int(juese_id_str)
if 0 >= juese_id:
title = u"角色id需要是正数"
self.show_warnning(title)
return
except:
title = u"角色id需要是数字"
self.show_warnning(title)
return
try:
ret = Consistency.check_one_juese_beibao(juese_id,juese_name_str)
if -1 == ret:
title = u"该角色数据为空"
self.show_warnning(title)
elif -2 == ret:
title = u"校验出错"
self.show_warnning(title)
elif -3 == ret:
title = u"数据库为连接或连接出错"
self.show_warnning(title)
else:
title = u"校验成功,请查看juese_beibao_check文件"
self.show_warnning(title)
except:
traceback.print_exc()
title = u"检查角色背包出错"
self.show_warnning(title)
通过会话把检查的角色数据写到文件里
检查所有角色数据,获取所有角色的背包数据,并以json的格式写入到文件中。
@staticmethod
def check_juese_beibao():
global ConsistencySession
if None == ConsistencySession:
return -3
session = ConsistencySession()
try:
juese_shuju_list = session.query(Tjuese_shuju).all()
if 0 == len(juese_shuju_list):
print "juese_shuju_list empty"
return -1
print "juese_shuju_list",juese_shuju_list
for one_juese_shuju in juese_shuju_list:
#检验一个角色背包
b_one_juese_beibao = one_juese_shuju.b_beibao
juese_id = one_juese_shuju.jue_se
one_juese_beibao_list = xjson.loads(b_one_juese_beibao)
one_juese_beibao = JueseBeibao(one_juese_beibao_list)
search_result = True
for one_juese_beibao_item in one_juese_beibao.juese_beibao:
if 0 < one_juese_beibao_item.wupin_guid and 0 < one_juese_beibao_item.flag:
one_t_wupin = session.query(Twupin).filter(Twupin.guid \
== one_juese_beibao_item.wupin_guid).first()
print "one_juese_beibao_item.wupin_guid",one_juese_beibao_item.wupin_guid
print "one_t_wupin",one_t_wupin
# time.sleep(0.01)
if None == one_t_wupin:
print "juese(%d) wupin guid (%d) don't exist" % \
(juese_id,one_juese_beibao_item.wupin_guid)
File.write_juese_beibao(one_juese_beibao_item,one_juese_shuju.jue_se)
search_result = False
if True == search_result:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency successfully" % (juese_id))
else:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency failed" % (juese_id))
except:
traceback.print_exc()
return -2
finally:
session.close()
return 0
根据指定的角色的id或名字来查询该角色的背包数据,并写入文件:
@staticmethod
def check_one_juese_beibao(juese_id,juese_name):
global ConsistencySession
if None == ConsistencySession:
return -3
session = ConsistencySession()
try:
one_juese_shuju = None
if 0 < juese_id:
one_juese_shuju = session.query(Tjuese_shuju).filter(
Tjuese_shuju.jue_se == juese_id).first()
if None == one_juese_shuju:
return -1
else:
one_juese = session.query(Tjuese).filter(Tjuese.str_name == juese_name).first()
if None == one_juese:
return -1
juese_id = one_juese.guid
one_juese_shuju = session.query(Tjuese_shuju).filter(
Tjuese_shuju.jue_se == one_juese.guid).first()
if None == one_juese_shuju:
return -1
#检验一个角色背包
b_one_juese_beibao = one_juese_shuju.b_beibao
one_juese_beibao_list = xjson.loads(b_one_juese_beibao)
one_juese_beibao = JueseBeibao(one_juese_beibao_list)
search_result = True
for one_juese_beibao_item in one_juese_beibao.juese_beibao:
if 0 < one_juese_beibao_item.wupin_guid and 0 < one_juese_beibao_item.flag:
one_t_wupin = session.query(Twupin).filter(Twupin.guid \
== one_juese_beibao_item.wupin_guid).first()
print "one_juese_beibao_item.wupin_guid",one_juese_beibao_item.wupin_guid
print "one_t_wupin",one_t_wupin
if None == one_t_wupin:
print "juese(%d) wupin guid (%d) don't exist" % \
(juese_id,one_juese_beibao_item.wupin_guid)
File.write_juese_beibao(one_juese_beibao_item,one_juese_shuju.jue_se)
search_result = False
if True == search_result:
File.write_juese_beibao_result("juese(%d) check juese_beibao consistency successfully" % (juese_id))
else:
File.write_juese_beibao_result("juese(%d) check juese_beibao juese(%d) consistency failed" % (juese_id))
except:
traceback.print_exc()
return -2
finally:
session.close()
return 0
检查redis角色数据事件响应函数:
def OnCheckRedisJuese(self,evt):
value = self.redis_juese_id_ctl.GetValue()
value = value.replace(" ", "")
try:
ret = RedisCommandExecute.redis_juese_check(value)
if -1 == ret:
title = u"没有该键,查询失败"
self.show_warnning(title)
elif -2 == ret:
title = u"查询出错"
self.show_warnning(title)
elif -3 == ret:
title = u"服务器未启动"
self.show_warnning(title)
else:
title = u"查询成功,请查看redis文件"
self.show_warnning(title)
except:
traceback.print_exc()
title = u"查询结果有误"
self.show_warnning(title)
3、redis角色数据管理
redis的角色数据是以哈希表的格式来存储的(t_juese:1:该角色所有数据)
该角色所有数据的格式是json格式。python对象支持把对象数据序列化成json格式。
获取的某角色的格式如hgetall(“t_juese:1”)
获取redis角色数据并以json格式写入到文件:
@staticmethod
def redis_juese_check(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -2
rc = RedisCommand()
if False == rc.ping():#检查redis服务器是否响应
return -3
command_list = command.split(",")#一系列角色id是以,逗号分隔的字符串,一次查询多个角色数据
ret = None
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.get_by_juese_id(command_list[0])#执行redis里的角色的查询(若角色id是1:最终的查询是 client.hgetall(“t_juese:1”),类型是hash )
elif 1 < len(command_list):
ret = rc.get_by_juese_id_list(command_list)#使用管道(pipeline),多个命令一次执行
print "result",result
if None == ret:
result = -1
return result
elif isinstance(ret,list) and 0 == len(ret):
result = -1
return result
print "ret\n",ret
print type(ret)
import json
# redis_data = json.loads(ret)
# print redis_data
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
连接redis的实例
self.client = redis.Redis(host=RedisConfig.host, port=RedisConfig.port,
db=RedisConfig.db)
配置如下:
[node]
host = 127.0.0.1#redis的地址
db = 0#redis的使用的库
port = 6379#redis的端口
redis的操作命令封装如下:
#coding=utf-8
import ConfigParser
import redis
import traceback
from core.file import File
完整的redis命令执行文件如下:
#配置文件
_CONFIG_FILE = "config\cache.ini"
class RedisConfig(object):
cf = ConfigParser.ConfigParser()
cf.read(_CONFIG_FILE)
host = cf.get("node", "host")
port = cf.getint("node", "port")
db = cf.getint("node", "db")
@staticmethod
def set_config(host,port,db):
if False == isinstance(port,int):
print "port type wrong ",port
return false
if False == isinstance(db,int):
print "db type wrong ",db
return false
RedisConfig.host = host
RedisConfig.port = port
RedisConfig.db = db
cp = ConfigParser.ConfigParser()
cp.read(_CONFIG_FILE)
cp.set("node", "host",host)
cp.set("node", "port",port)
cp.set("node", "db",db)
cp.write(open(_CONFIG_FILE,"w"))
return True
@staticmethod
def ping(address,port):
import socket
try:
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(1)
sk.connect((address,port))
sk.close()
print 'ping ok'
return True
except Exception:
print 'ping error'
return False
class RedisCommandImp(object):
def __init__(self):
self.client = redis.Redis(host=RedisConfig.host, port=RedisConfig.port,
db=RedisConfig.db)
def ping(self): #测试连接,连通返回 True ,不通返回False
try:
return self.client.ping()
except:
return False
def tearDown(self):
return self.client.flushdb()
def execute(self):
print "execute(),to fulfill by subclass"
def clear_value(self,client,key,field = None):
try:
type = self.get_type(key)
if "none" == type:
print "key don't exist"
return False
elif "hash" == type:
if None == field:#域为空,删除键的所有域
resutl_dict = client.hgetall(key)
field_list = resutl_dict.keys()
for one_field in field_list:
if client.hexists(key,one_field):
client.hdel(key, one_field)
return True
if isinstance(field,list) or isinstance(field,set) :#域为列表或集
for one_field in field:
if client.hexists(key,one_field):
client.hdel(key, one_field)
return True
else:
if client.hexists(key,field):
client.hdel(key, field)
return True
else:
return False
elif "string" == type:
print "redis don't has such command"
return False
elif "list" == type:
return client.lpop(key)#或根据策略改成client.rpop(key) 或如client.lrem('a', 0, 'a1')
elif "set" == type:
return client.srem(key, field)
elif "zset" == type:
return client.zrem(key, field)
print "none of this type"
return False
except redis.ResponseError:
traceback.print_exc()
return False
def get_type(self,key):
return self.client.type(key)
def get_value(self,client,key):
try:
type = self.get_type(key)
if "none" == type:
print "key don't exist"
return None
elif "hash" == type:
return client.hgetall(key)
elif "string" == type:
return client.get(key)
elif "list" == type:
return client.lrange(key, 0, -1)
elif "set" == type:
return client.smembers(key)
elif "zset" == type:
return client.zrange(key, 0, -1, withscores=True)
print "none of this type"
return None
except redis.ResponseError:
traceback.print_exc()
return None
def get_keys_match_front(self,client,prefix):
try:
#返回符合前缀的所有键,返回类型 set类型
keys_set = client.keys(pattern=prefix+"*")
return keys_set
except redis.ResponseError:
traceback.print_exc()
return None
class SingleCommand(RedisCommandImp):
def __init__(self,prefix=""):
RedisCommandImp.__init__(self)
self.prefix = prefix
def execute(self,key):
key = self.prefix + str(key)
return self.get_value(self.client, key)
def clear(self,key,field=None):#删除哈希键只能用SingleCommand ,不能使用 Pipeline
key = self.prefix + str(key)
return self.clear_value(self.client, key,field)
def get_keys_by_prefix(self):
return self.get_keys_match_front(self.client,self.prefix)
class PipelineCommand(RedisCommandImp):
def __init__(self,prefix=""):
RedisCommandImp.__init__(self)
self.prefix = prefix
def execute(self,keys):
with self.client.pipeline() as pipe:
for one_key in keys:
if "" != one_key:
one_key = self.prefix + str(one_key)
self.get_value(pipe, one_key)
ret = pipe.execute()
return ret
return None
class RedisCommand(object):
def ping(self):
sc = SingleCommand()
return sc.ping()
def __get_keys(self,prefix=""):
sc = SingleCommand(prefix)
return sc.get_keys_by_prefix()
def __get_by_keys(self,keys,prefix=""):#keys is set or list
pc = PipelineCommand(prefix)
return pc.execute(keys)
def __get_by_key(self,key,prefix=""):
sc = SingleCommand(prefix)
return sc.execute(key)
def __clear_by_key(self,key,prefix=""):#删除键
sc = SingleCommand(prefix)
return sc.clear(key)
def __clear_by_keys(self,keys,prefix=""):#删除多键
if None == keys:
return False
if isinstance(keys,list) or isinstance(keys,set):
ret = False
for key in keys:
sc = SingleCommand()
if sc.clear(key):
ret = True
return ret
return False
def __clear_all_key(self,prefix):
keys_set = self.__get_keys(prefix)
print "__clear_all_key, keys_set :",keys_set
if 0 == len(keys_set):#没有需要删除的数据
return True
if None == keys_set:
return False
return self.__clear_by_keys(keys_set)
#具体的命令
#根据角色id获取角色数据
def get_by_juese_id(self,id):
return self.__get_by_key(id,"t_juese:")
#根据角色id列表获取角色数据
def get_by_juese_id_list(self,id_list):
return self.__get_by_keys(id_list,"t_juese:")
#清除某角色的数据
def clear_one_juese(self,id):#成功返回True,失败返回False
return self.__clear_by_key(id,"t_juese:")
#清除所有角色数据
def clear_all_juese(self):
return self.__clear_all_key(prefix="t_juese:")
#获取所有角色数据
def check_all_juese(self):
keys_set = self.__get_keys(prefix="t_juese:")
if None == keys_set:
return None
return self.__get_by_keys(keys_set)
#清除所有数据
def clear_all_data(self):
sc = SingleCommand()
ret = sc.tearDown()
return ret
#redis命令执行类
class RedisCommandExecute(object):
@staticmethod
#清除角色文件(获取的数据写入文件)
def init():
File.clear_juese_redis_file()
#清除指定的角色
@staticmethod
def redis_juese_clear(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -1
rc = RedisCommand()
if False == rc.ping():#检查redis服务器是否可以连通
return -3
command_list = command.split(",")
ret = False
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.clear_one_juese(command_list[0])
elif 1 < len(command_list):
for one_juese in command_list:
if rc.clear_one_juese(one_juese):
ret = True
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#清除所有的角色数据
@staticmethod
def redis_all_juese_clear():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.clear_all_juese()
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#获取指定的角色列表的数据并写到文件
@staticmethod
def redis_juese_check(command):
result = 0
try:
command = command.replace(" ","")
if None == command or "" == command:
print "command empty"
return -2
rc = RedisCommand()
if False == rc.ping():
return -3
command_list = command.split(",")
ret = None
if 0 == len(command_list):
print "command empty or command error"
return -2
elif 1 == len(command_list):
ret = rc.get_by_juese_id(command_list[0])
elif 1 < len(command_list):
ret = rc.get_by_juese_id_list(command_list)
print "result",result
if None == ret:
result = -1
return result
elif isinstance(ret,list) and 0 == len(ret):
result = -1
return result
print "ret\n",ret
print type(ret)
import json
# redis_data = json.loads(ret)
# print redis_data
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
#获取指定的角色列表的数据并写到文件
@staticmethod
def redis_all_juese_check():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.check_all_juese()
if None == ret:
result = -1
return result
print "ret\n",ret
print type(ret)
import json
redis_data = json.dumps(ret, sort_keys=True,indent=4)
print redis_data
File.write_redis_juese_info(redis_data)
except:
traceback.print_exc()
result = -2
return result
#测试指令
@staticmethod
def redis_clear_all_data():
result = 0
try:
rc = RedisCommand()
if False == rc.ping():
return -3
ret = rc.clear_all_data()
if False == ret:
result = -1
except:
traceback.print_exc()
result = -2
return result
#测试指令
def test_redis_command():
rc = RedisCommand()
command = raw_input("input juese id,separated by ',' please:")
command_list = command.split(",")
if 0 == len(command_list):
print "command empty or command error"
return None
elif 1 == len(command_list):
result = rc.get_by_juese_id(command_list[0])
elif 1 < len(command_list):
result = rc.get_by_juese_id_list(command_list)
print "result",result
File.write_redis_juese_info(result)
#测试指令
def test_command():
while True:
test_redis_command()
#测试指令
if __name__ == "__main__":
RedisCommandExecute.redis_clear_all_data()
相关文章推荐
- 服务器安装.net 时候提示:必须使用角色管理工具安装或配置microsoft.net framework的解决方法
- Python游戏服务器开发日记(四)scons编译工具、C和C++混合使用
- 选定的数据存储区出现问题,原因可能是服务器名称或凭据无效,或者权限不足。也可能是未启用角色管理器功能造成的。
- 安装 SQL SERVER 2008 必须使用 "角色管理工具" 错误 的 解决方案
- 用Python开发主机批量管理工具
- supervisor - Python进程管理工具
- python+django(admin管理工具)
- Python包管理工具pip安装
- python 包管理工具pip基本用法
- Python 包管理工具解惑
- 打通流程管理与MIS数据的管道是运用工作流程工具搭建业务系统的关键
- window环境下安装 pip 工具 【pip为Python的扩展管理工具】
- Python之包管理工具
- Python 包管理工具解惑
- CentOS 6.4安装pip,CentOS安装python包管理安装工具pip的方法
- windows XP下Python2.7包管理工具安装-setuptool,pip、distribute、nose、virtualenv
- 【编程语言】Python 使用包管理工具pip安装模块numpy、scipy、matplotlib以及scikit-learn CentOS 7
- Python的包管理工具easy_install, setuptools, pip,distribute介绍
- Python的包管理工具Pip
- 游戏服务器关于玩家数据的解决方案