您的位置:首页 > 数据库 > Memcache

Python操作 Memcache、Redis、RabbitMQ、SQLAlchemy

2016-07-30 10:35 507 查看
Memcached

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。

安装

yum -y install libevent-devel

wget http://memcached.org/latest

tar -zxvf memcached-1.x.x.tar.gz

cd memcached-1.x.x

./configure && make && make test && sudo make install

启动Memcached

2、天生支持集群

python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比

如果用户根据如果要在内存中创建一个键值对(如:k1 = "v1"),那么要执行一下步骤:

根据算法将 k1 转换成一个数字

将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )

在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list

连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中

代码实现如下:

mc
=
memcache.Client([(
'1.1.1.1:12000'
,
1
),(
'1.1.1.2:12000'
,
2
),(
'1.1.1.3:12000'
,
1
)],debug
=
True
)


mc.
set
(
'k1'
,
'v1'
)


3、操作

 >>> import memcache  #导入模块

>>> mc= memcache.Client(['10.211.55.5:12000'],debug=True) #连接
>>> mc.set('x',1) #set()设置参数
True
>>> mc.set('y',2)
True
>>> mc.set('z',3)
True
>>> print (mc.get('x')) #get()获取参数
1
>>> print (mc.get('y'))
2
>>> print (mc.get('x'))
1
>>> print (mc.get('z'))
3
>>> mc.add('a',11) #增加参数,如已存在,刚报错
True
>>> mc.add('a',11)
False
MemCached: while expecting 'STORED',got unexpected response 'NOT_STORED'
>>> mc.add('x',11)
MemCached: while expecting 'STORED',got unexpected response 'NOT_STORED'
False

>>> mc.get_multi(['x','y','z']) #获取多个值,反回字典形式
{'z': 3,'x': 1,'y': 2}
>>> mc.delete('a')
1
>>> mc.set_multi({'a':11,'b':22,'c':33}) == [] #设置多个数值,返回空列表
True
>>> mc.get('a')
11
>>> mc.get('b')
22
>>> mc.get('c')
33
>>> mc.delete('a') #删除
1
>>> mc.get('a')
>>> mc.get('b')
22
>>> mc.cas('b','2222') #对已存在的值修改
True
>>> mc.get('b')
'2222'

>>> mc.set('name','haha',10) #设置失效时间(10为失效时间为10s)
True
>>> mc.get('name')
'haha'
>>> mc.get('name') #10秒后找不到了

>>> mc.replace('b',1111) #修改,值不存在则添加
True
>>> mc.get('b')
1111

>>> mc.set('haha','') #设置值为空
True
>>> mc.replace('haha','nimeide')
True
>>> mc.get('haha')
'nimeide'

>>> mc.get('x') #整数加法,默认是1
1
>>> mc.incr('x')
2
>>> mc.incr('x')
3
>>> mc.incr('x',11)
14
>>> mc.get('x') #整数减法,默认是1
14
>>> mc.decr('x')
13
>>> mc.decr('x')
12
>>> mc.decr('x',10)
2


4、gets 和 cas

如商城商品剩余个数,假设改值保存在memcache中,product_count = 900
A用户刷新页面从memcache中读取到product_count = 900
B用户刷新页面从memcache中读取到product_count = 900

如果A、B用户均购买商品

A用户修改商品剩余个数 product_count=899
B用户修改商品剩余个数 product_count=899

如此一来缓存内的数据便不在正确,两个用户购买商品后,商品剩余还是 899
如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!

如果想要避免此情况的发生,只要使用 gets 和 cas 即可,如:

Ps:本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出现非正常数据,则不允许修改。

redis

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

安装:

wget http:
/
/
download.redis.io
/
releases
/
redis
-
3.0
.
6.tar
.gz


import redis

r = redis.Redis(host='10.211.55.5',port=6379,)

r.set('haha','soso')

print(r.get('haha'))


结果如下

b'soso'


2、连接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

import redis

pool = redis.ConnectionPool(host='10.211.55.5',port=6379,)

r = redis.Redis(connection_pool=pool)

r.set('haha','hehe')
print(r.get('haha'))


结果哪下

b'hehe'


3、操作

String操作,redis中的String在在内存中按照一个name对应一个value来存储。如图:

name      value

n1 ======> v1

n2 ======> v2

n3 ======> v3

set(name,value,ex=None,px=None,nx=False,xx=False)

在Redis中设置值,默认,不存在则创建,存在则修改

psetex(name,time_ms,value)

# 设置值


批量设置值


# 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素


# 在name对应的列表分片获取数据


# name对应的集合中添加元素

scard(name)

获取name对应的集合中元素个数

sdiff(keys,*args)

在第一个name对应的集合中且不在其他name对应的集合的元素集合

sdiffstore(dest,keys,*args)

# 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中

sinter(keys,*args)

# 获取多一个name对应集合的并集

sinterstore(dest,keys,*args)

# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中

sismember(name,value)

# 检查value是否是name对应的集合的成员

smembers(name)

# 获取name对应的集合的所有成员

smove(src,dst,value)

# 将某个成员从一个集合中移动到另外一个集合

spop(name)

# 从集合的右侧(尾部)移除一个成员,并将其返回

srandmember(name,numbers)

# 从name对应的集合中随机获取 numbers 个元素

srem(name,values)

# 在name对应的集合中删除某些值

sunion(keys,*args)

# 获取多一个name对应的集合的并集

sunionstore(dest,keys,*args)

# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中

sscan(name,cursor=0,match=None,count=None)
sscan_iter(name,match=None,count=None)

# 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大

有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

zadd(name,*args,**kwargs)

# 在name对应的有序集合中添加元素


发布者

from my_demo import RedisHelper

obj = RedisHelper()
obj.public('hello')


RabbitMQ

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

import pika

# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()


import pika

# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch,method,properties,body):
print(" [x] Received %r" % body)

channel.basic_consume(callback,
queue='hello',
no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


1、acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed,connection is closed,or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch,method,properties,body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
queue='hello',
no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


2、durable 消息不丢失

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11",max_overflow=5)

Base = declarative_base()

class User(Base):
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(50))

# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# ########## 增 ##########
# u = User(id=2,name='sb')
# session.add(u)
# session.add_all([
# User(id=3,name='sb'),
# User(id=4,name='sb')
# ])
# session.commit()

# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()

# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()

# ret = session.query(User).filter_by(name='sb').all()
# print ret

# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print ret

# ret = session.query(User.name.label('name_label')).all()
# print ret,type(ret)

# ret = session.query(User).order_by(User.id).all()
# print ret

# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()


View Code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: