您的位置:首页 > 数据库 > Redis

Python,Redis学习实践(一)

2017-03-20 18:49 323 查看
Ubuntu系统redis-py客户端安装:

Redis的Python客户端redis-py

安装 python 的 redis 客户端(例如:sudo pip install redis)

Redis存储的设计实现

1 使用redis数据结构中的set来存储所有博客的ID,key为:BLOG:LIST。

2 使用redis数据结构中的hash来存储每一条博客内容,key为博客的ID(格式为BLOG:<序号>,例如BLOG:1)。

blog 组成项:id、title、content、author、time

现有类:CBlog CBlogManager CBlogRedis

博客类CBlog 实现(以下代码需要先 import time 和 import redis):

# blog class
class CBlog:
    """A single blog entry: id, title, content, author and timestamp.

    NOTE: the author is stored in ``m_auhtor`` (sic). The misspelled
    attribute name is kept so that any code reading it directly keeps
    working; use ``get_author()`` instead.
    """

    def __init__(self, blog_id=1, title="title",
                 content=".....",
                 author="xxx",
                 bl_datetime=None):
        """Create a blog entry.

        :param blog_id: identifier of the blog.
        :param title: blog title.
        :param content: blog body text.
        :param author: author name.
        :param bl_datetime: timestamp string; when omitted (``None``) the
            current ``time.ctime()`` is taken at construction time.
        """
        # A default of ``time.ctime()`` in the signature would be evaluated
        # only once, when the class is defined, so every blog would share
        # the same stale timestamp. Resolve it per instance instead.
        if bl_datetime is None:
            bl_datetime = time.ctime()
        self.m_blog_id = blog_id
        self.m_title = title
        self.m_content = content
        self.m_auhtor = author
        self.m_datetime = bl_datetime

    def get_blog_id(self):
        """Return the blog id."""
        return self.m_blog_id

    def get_title(self):
        """Return the blog title."""
        return self.m_title

    def get_content(self):
        """Return the blog content."""
        return self.m_content

    def get_author(self):
        """Return the blog author."""
        return self.m_auhtor

    def get_date_time(self):
        """Return the blog timestamp."""
        return self.m_datetime

    def set_blog(self, blist):
        """Fill this blog from a sequence.

        :param blist: indexable with at least five items, in the order
            ``[id, title, author, datetime, content]``.
        """
        self.m_blog_id = blist[0]
        self.m_title = blist[1]
        self.m_auhtor = blist[2]
        self.m_datetime = blist[3]
        self.m_content = blist[4]

    def add_blog_to_redis(self, blog_redis):
        """Store this blog by calling ``blog_redis.add_blog_to_redis(self)``."""
        blog_redis.add_blog_to_redis(self)

    def play_blog(self):
        """Print the title, author, timestamp and content to stdout."""
        # Single pre-formatted arguments keep print() output identical on
        # Python 2 (print statement) and Python 3 (print function).
        border = "*********************************"
        rule = "------------------"
        print(border)
        print("TITLE: %s" % (self.m_title,))
        print(rule)
        print("Author: %s" % (self.m_auhtor,))
        print(rule)
        print("DateTime: %s" % (self.m_datetime,))
        print(rule)
        print("Content: %s" % (self.m_content,))
        print(border)

    def input_blog_info(self):
        """Prompt on stdin for title, author and content (Python 2 raw_input)."""
        self.m_title = raw_input("Title:")
        self.m_auhtor = raw_input("Author:")
        self.m_content = raw_input("Content:")


博客管理类CBlogManager实现

# blog manager class
class CBlogManager:
    """Holds an in-memory list of blog objects and can display them."""

    def __init__(self):
        """Start with an empty blog list."""
        self.m_blog_list = []

    def add_blog(self, blog):
        """Append ``blog`` to the managed list."""
        self.m_blog_list.append(blog)

    def get_blog_list(self):
        """Return the managed list itself (not a copy)."""
        return self.m_blog_list

    def play_blog_list(self):
        """Call ``play_blog()`` on every blog, or report an empty list."""
        if not self.m_blog_list:
            print('Blog List is empty')
            return
        for blog in self.m_blog_list:
            blog.play_blog()

    def get_blog_from_redis(self, blog_redis):
        """Reload the managed list through ``blog_redis``.

        :param blog_redis: object exposing ``get_blogs_from_redis(manager)``,
            which is expected to call ``add_blog`` for each stored blog.
        """
        # Clear first: the loader only appends, so without this every
        # repeated load would duplicate all previously loaded blogs.
        # Cleared in place so references from get_blog_list() stay valid.
        del self.m_blog_list[:]
        blog_redis.get_blogs_from_redis(self)


与redis通信类CBlogRedis实现

# Redis server address (IP and port)
REDIS_IP = '127.0.0.1'
REDIS_PORT = 6379

# Key of the Redis set that holds every blog id
BLOG_LIST_KEY = 'BLOG:LIST'

# Key of the counter incremented to produce the next blog number
BLOG_COUNT_KEY = 'blog:count'

# Field names used inside each blog's Redis hash
BLOG_ID_KEY = 'blog:id'
BLOG_TITLE_KEY = 'blog:title'
BLOG_AUTHOR_KEY = 'blog:author'
BLOG_CONTENT_KEY = 'blog:content'
BLOG_DATETIME_KEY = 'blog:datetime'

# operate redis class
class CBlogRedis:
    """Reads and writes blogs in Redis.

    Blog ids live in the set ``BLOG_LIST_KEY``; each blog is a hash stored
    under its own id (``'BLOG:<n>'``), where ``<n>`` comes from incrementing
    the counter ``BLOG_COUNT_KEY``.
    """

    def __init__(self):
        """Remember the server address and create a connection pool for it."""
        self.m_redis_ip = REDIS_IP
        self.m_redis_port = REDIS_PORT
        self.m_redis_pool = redis.ConnectionPool(host=self.m_redis_ip,
                                                 port=self.m_redis_port)

    def get_redis_connect(self):
        """Return a ``redis.Redis`` client backed by the shared pool."""
        return redis.Redis(connection_pool=self.m_redis_pool)

    def add_blog_to_redis(self, blog_info):
        """Store one blog in Redis and print the pipeline result.

        :param blog_info: object exposing ``get_title()``, ``get_author()``,
            ``get_date_time()`` and ``get_content()``.
        """
        rc = self.get_redis_connect()

        # The incremented counter value becomes the numeric part of the id.
        blog_count = rc.incr(BLOG_COUNT_KEY)
        blog_id = 'BLOG:' + str(blog_count)

        blog_map = {
            BLOG_ID_KEY: blog_id,
            BLOG_TITLE_KEY: blog_info.get_title(),
            BLOG_AUTHOR_KEY: blog_info.get_author(),
            BLOG_DATETIME_KEY: blog_info.get_date_time(),
            BLOG_CONTENT_KEY: blog_info.get_content(),
        }

        # Register the id in the set and write the hash through one pipeline.
        pipe = rc.pipeline()
        pipe.sadd(BLOG_LIST_KEY, blog_id)
        pipe.hmset(blog_id, blog_map)

        # Single formatted argument: same output on Python 2 and Python 3.
        print('Execute Result: %s' % (pipe.execute(),))

    def get_blogs_from_redis(self, blog_manager):
        """Load every stored blog and add it to ``blog_manager``.

        :param blog_manager: object exposing ``add_blog(blog)``; it receives
            one ``CBlog`` per id found in the ``BLOG_LIST_KEY`` set.
        """
        rc = self.get_redis_connect()

        blog_id_list = rc.smembers(BLOG_LIST_KEY)

        # Queue one HMGET per blog id. The field order must match what
        # CBlog.set_blog expects: id, title, author, datetime, content.
        pipe = rc.pipeline()
        for blog_id in blog_id_list:
            pipe.hmget(blog_id,
                       BLOG_ID_KEY,
                       BLOG_TITLE_KEY,
                       BLOG_AUTHOR_KEY,
                       BLOG_DATETIME_KEY,
                       BLOG_CONTENT_KEY)

        # execute() yields one result list per queued HMGET.
        result_list = pipe.execute()

        for blog_info in result_list:
            cur_blog = CBlog()
            cur_blog.set_blog(blog_info)
            blog_manager.add_blog(cur_blog)


测试和展示的代码实现

# interactive menu loop
def input_blog_info(blog_manager, blog_redis):
    """Run the console menu until the user enters 'q'.

    'a' reads a new blog from stdin and stores it through ``blog_redis``;
    's' loads the blogs into ``blog_manager`` and displays them;
    anything else prints an error and prompts again.
    """
    menu = "a-add blog\nq-Quit input\ns-show blog info\n"
    while True:
        choice = raw_input(menu)

        # quit
        if choice == 'q':
            print("Quit()")
            break

        if choice == 'a':
            # add
            new_blog = CBlog()
            new_blog.input_blog_info()
            new_blog.add_blog_to_redis(blog_redis)
        elif choice == 's':
            # display
            blog_manager.get_blog_from_redis(blog_redis)
            blog_manager.play_blog_list()
        else:
            # unrecognised input
            print("Input error\n")

# Script entry point: build the manager and the Redis helper, run the
# interactive menu, then print a closing line once the menu loop exits.
if __name__ == "__main__":
    manager = CBlogManager()
    redis_store = CBlogRedis()
    input_blog_info(manager, redis_store)
    print('My Blog')
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  python redis