您的位置:首页 > 其它

装饰器进阶

2019-07-23 18:03 1581 查看

一 functools 模块

1 update_wrapper模块

Functools.update_wrapper(wrapper,wrapped,assigned=WRAPPER_ASSIGNMENTS,updated=WRAPPER_UPDATES)
类似 copy_properties功能
Wrapper 包装函数
wrapped 被包装函数(源函数)
元祖 WRAPPERA_ASSIGNMENTS中是被覆盖的属性
'_module_','_name_','_qualname_','_doc_','_annotations_'
模块名。名称。限定名。文档。参数注解
元祖WRAPPER_UPDATES是要被更新的属性__dict__属性字典。将自己字典中的东西添加进去,而不是覆盖别人的东西,key相同,则是覆盖,key不同,则是增加
增加一个__wrapped__属性,保留着wrapped函数

# 装饰器参数传递及相关应用
import  functools
import  datetime,time
def  x(t):
def _logger(fn):
def  _wapper(*args,**kwargs):
'''This  is  wapper'''
print ("login.....")
start=datetime.datetime.now()
ret = fn(*args,**kwargs)
du=datetime.datetime.now()-start
if  du.total_seconds()<5:
print  (fn.__name__,du)
else:
print  ("goods")
print ("logout....")
return  ret
functools.update_wrapper(_wapper,fn)
return  _wapper
return  _logger

@x(5)
def  add(x,y):
'''This is add'''
time.sleep(5)
return  x+y

print (add(5,6),add.__name__,add.__doc__,sep='\n')

查看显示结果

2 wraps ,类似上述的变种

# 装饰器参数传递及相关应用
import  functools
import  datetime,time
def  x(t):
def _logger(fn):
@functools.wraps(fn)#通过闭包和装饰器完成,其和上面的_cpoy的是实现原理相似
def  _wapper(*args,**kwargs):
'''This  is  wapper'''
print ("login.....")
start=datetime.datetime.now()
ret = fn(*args,**kwargs)
du=datetime.datetime.now()-start
if  du.total_seconds()<5:
print  (fn.__name__,du)
else:
print  ("goods")
print ("logout....")
return  ret
return  _wapper
return  _logger

@x(5)
def  add(x,y):
'''This is add'''
time.sleep(5)
return  x+y

print (add(5,6),add.__name__,add.__doc__,sep='\n')

结果如下:

3 partial 方法

偏函数,把函数部分的参数固定下来,相当于为部分的参数添加了一个固定的默认值,形成一个新的函数并返回
从partial 生成新函数,是对原函数的封装

import  inspect
import   functools
def add(x,y)->int:
return  x+y
newadd=functools.partial(add,y=5)  # 构建新函数,使用y=5称为其默认值
print (newadd(3))   # 默认是x传值
print (newadd(4,y=6))
print (newadd(x=6,y=7))
sig1=inspect.signature(add)
sig2=inspect.signature(newadd)
print (sig1.parameters)
print (sig2.parameters)

结果如下

import  inspect
import   functools
def  add(x,y,*args)->int:
print (x,y,args)
newadd= functools.partial(add,1,2,3,4,5,6)
print (inspect.signature(add).parameters.items())
print (inspect.signature(newadd).parameters.items())
print (newadd())
print (newadd(7))
print (newadd(8,9))
print (newadd(10,11,y=4,x=5))  #此处的x,y 已经定义并且已经赋值,不能被重复赋值

结果如下

4 lru_cache

functools.lru_cache(maxsize=128,typed=False)
least-recently-used装饰器,lru,最近最少使用,cache缓存
如果maxsize设置为None,则禁用LRU功能,并且缓存可以无限制增长,当maxsize是二的幂时,LRU功能执行得最好 。
如果typed设置为True,则不同类型的函数参数将单独存储,如f(3)和f(3.0)将被视为具有不同结果的不同调用

实例

import   functools
import  datetime
import  time
def  logger(fn):
def _wapper(*args,**kwargs):
start_time=datetime.datetime.now()
ret = fn(*args,**kwargs)
send=(datetime.datetime.now()-start_time).total_seconds()
print  ("{} 函数执行时间为: {}".format(fn.__name__,send))
return   ret
return   _wapper
@logger  # 此处的调用不分先后次序,其结果都一样
@functools.lru_cache()
def  add(x,y,z=3):
time.sleep(3)
return  x+y+z
print (add(3,4))
print  (add(3,4))
print  (add(3.0,4.0))
print (add(3,4,3))  # 此处是重新计算
print (add(3,4.0,3.0))

执行结果为

lru_cache 装饰器基础应用

import   functools
import  datetime
import  time
def  logger(fn):
def _wapper(*args,**kwargs):
start_time=datetime.datetime.now()
ret = fn(*args,**kwargs)
send=(datetime.datetime.now()-start_time).total_seconds()
print  ("{} 函数执行时间为: {}".format(fn.__name__,send))
return   ret
return   _wapper
@logger  # 此处的调用不分先后次序,其结果都一样
@functools.lru_cache()
def  Fib(n):
while  n < 2:
return   n
return   Fib(n-1)+Fib(n-2)
print  ([Fib(x)  for x  in range(10)])

结果如下

总结

lrucache 装饰器应用
使用前提
同样的函数参数一定得到同样的结果
函数执行时间很长,且要多次执行
本质是函数调用的参数=> 返回值
缺点
不支持缓存过期,key无法过期,失效
不支持清除操作
不支持分布式,是一个单机的缓存
使用场景,单机上需要空间换取时间的地方,可以使用缓存来计算变成快速的查询*

二 装饰器练习

1 实现一个cache装饰器,实现可过期,可删除的功能,可以不换出

思想:
1 cache是通过可hash对象进行存储和调用的,因此其存入的key必须是不可变类型
2 通过前面的inspect模块取出对应的形参,及key,通过传入的实参获取到其对应的值,进行键和值的处理

实例如下
初步代码实现如下:

#!/usr/bin/poython3.6
#conding:utf-8
import   functools
import  datetime
import  time
import  inspect
def  logger(fn):
local_cache={}  # 此处定义一个缓冲器
@functools.wraps(fn)
def  _wapper(*args,**kwargs):
list_dict={}  #此处定义一个构建key的字典
sig=inspect.signature(fn)
param=sig.parameters
param_list=list(param.keys())
for  i,v  in  enumerate(args):    # 此处的作用是取出形参和传入实参的对应关系
list_dict[param_list[i]]=v
list_dict.update(kwargs)
key = tuple(sorted(list_dict.keys()))  # 通过此处获取字典的键的固定顺序的元祖,因为元祖是不可变数据类型
if  key  not  in  local_cache.keys():  #查询这个key是否在此缓存中
ret = fn(*args,**kwargs)  #此处调用外部参数获取值
local_cache[key]=ret  # 此处对值进行处理
return  local_cache[key]  # 此处将值返回,用于返回,此处若返回为ret,则缓存变失去了意义,
return   _wapper
def  functime(fn):
@functools.wraps(fn)
def _wapper(*args,**kwargs):
start_time=datetime.datetime.now()
ret  =  fn(*args,**kwargs)
send=(datetime.datetime.now()-start_time).total_seconds()
print ("{} 函数的执行时间为:{}".format(fn.__name__,send))
return   ret
return   _wapper
@functime
@logger
def  add(x,y=4):
time.sleep(3)
return  x+y
print (add(x=3,y=4))
print (add(3,4))
print  (add(y=4,x=3))

执行结果如下

添加默认值参数

import   functools
import  datetime
import  time
import  inspect
def  logger(fn):
local_cache={}  # 此处定义一个缓冲器
@functools.wraps(fn)
def  _wapper(*args,**kwargs):
list_dict={}  #此处定义一个构建key的字典
sig=inspect.signature(fn)
param=sig.parameters
param_list=list(param.keys())
for  i,v  in  enumerate(args):    # 此处的作用是取出形参和传入实参的对应关系
list_dict[param_list[i]]=v
list_dict.update(kwargs)
for  i  in  param.keys():  #检测形式参数
if  i not  in  list_dict.keys():  # 判断形参是否在传入的对应参数中,若不存在,则由默认情况,则进行加入对应的字典中
list_dict[i]=param[i].default
key = tuple(sorted(list_dict.keys()))  # 通过此处获取字典的键的固定顺序的元祖,因为元祖是不可变数据类型
if  key  not  in  local_cache.keys():  #查询这个key是否在此缓存中
ret = fn(*args,**kwargs)  #此处调用外部参数获取值
local_cache[key]=ret  # 此处对值进行处理
return  local_cache[key]  # 此处将值返回,用于返回,此处若返回为ret,则缓存变失去了意义,
return   _wapper
def  functime(fn):
@functools.wraps(fn)
def _wapper(*args,**kwargs):
start_time=datetime.datetime.now()
ret  =  fn(*args,**kwargs)
send=(datetime.datetime.now()-start_time).total_seconds()
print ("{} 函数的执行时间为:{}".format(fn.__name__,send))
return   ret
return   _wapper
@functime
@logger
def  add(x,y=4):
time.sleep(3)
return  x+y
print (add(x=3,y=4))
print (add(3,4))
print  (add(y=4,x=3))
print (add(3))

结果如下

配置过期功能

import  functools
import  datetime
import  time
import  inspect
def  logger(times):
def  _logger(fn):
local_cache={}  # 此处定义一个缓冲器
@functools.wraps(fn)
def  _wapper(*args,**kwargs):
pop_key_list=[]
for  k,(_,item)  in  local_cache.items():
if datetime.datetime.now().timestamp()- item> times:
pop_key_list.append(k)
for  i in pop_key_list:
local_cache.pop(i)
list_dict={}  #此处定义一个构建key的字典
sig=inspect.signature(fn)
param=sig.parameters
param_list=list(param.keys())
for  i,v  in  enumerate(args):    # 此处的作用是取出形参和传入实参的对应关系
list_dict[param_list[i]]=v
list_dict.update(kwargs)
for  i  in  param.keys():  #检测形式参数
if  i not  in  list_dict.keys():  # 判断形参是否在传入的对应参数中,若不存在,则由默认情况,则进行加入对应的字典中
list_dict[i]=param[i].default
key = tuple(sorted(list_dict.keys()))  # 通过此处获取字典的键的固定顺序的元祖,因为元祖是不可变数据类型
if  key  not  in  local_cache.keys():  #查询这个key是否在此缓存中
ret = fn(*args,**kwargs)  #此处调用外部参数获取值
local_cache[key]=(ret,datetime.datetime.now().timestamp())  # 此处将值返回,用于返回,此处若返回为ret,则缓存变失去了意义,
return  local_cache[key]
return   _wapper
return  _logger
def  functime(fn):
@functools.wraps(fn)
def _wapper1(*args,**kwargs):
start_time=datetime.datetime.now()
ret=fn( *args,**kwargs)
send=(datetime.datetime.now()-start_time).total_seconds()
print ("{} 函数的执行时间为:{}".format(fn.__name__,send))
return   ret
return   _wapper1
@functime
@logger(5)  #传入过期时间为5s进行处理
def  add(x,y=4):
time.sleep(3)
return  x+y
print (add(x=3,y=4))
print (add(3,4))
time.sleep(5)
print  (add(y=4,x=3))
print (add(3))

查看结果

2 写一个命令分发器

程序员可以方便的注册函数到某一个命令,用户输入命令时,路由到注册函数
如果此命令没有对应的注册函数,执行默认函数
用户输入用input(">>")

分析:
输入一个命令映射到一个函数,并执行这个函数,应该是cmd,fn 形式,此时字典整好满足此中需求
如果输入了某一个cmd命令后,没有找到函数,就要调用缺省的函数执行,这整好是字典的缺省函数
cmd是字符串

基本代码如下

#!/usr/bin/poython3.6
#conding:utf-8
#定义一个字典,其保存命令和函数的集合
commds={}
# 创建函数
def  fun1():
print  ("hello  fun1")
def  fun2():
print ("hello  fun2")
#创建默认函数
def  fundefault():
print  ("hello  default")

# 创建注册函数
def  register(name,fn):
commds[name]=fn
# 创建查询函数
def  printf():
while  True:
cmd=input(">>")
if cmd.strip() == 'quit':
return
commds.get(cmd,fundefault)()  #调用函数,若无存在,则调用默认函数
# 调用注册函数进行注册
register("fun1",fun1)
register("fun2",fun2)
#调用显示函数
printf()

查看结果如下

改善注册函数如下

#!/usr/bin/poython3.6
#conding:utf-8
#定义一个字典,其保存命令和函数的集合
commds={}
# 创建注册函数
def  register(name):  #通过柯里化进行处理
def  _warpper(fn):
commds[name]=fn
return  _warpper
# 创建查询函数
def  printf():
while  True:
cmd=input(">>")
if cmd.strip() == 'quit':
return
commds.get(cmd,fundefault)()  #调用函数,若无存在,则调用默认函数
# 调用注册函数进行注册
# 创建函数
@register('fun1')
def  fun1():
print  ("hello  fun1")
@register('fun2')
def  fun2():
print ("hello  fun2")
#创建默认函数
def  fundefault():
print  ("hello  default")
#调用显示函数
printf()

将调用函数和显示函数进行合并,并进行集中输入,如下

#!/usr/bin/poython3.6
#conding:utf-8
#定义一个字典,其保存命令和函数的集合
commds={}
# 创建注册函数
def  comm():
def  register(name):  #通过柯里化进行处理
def  _warpper(fn):
commds[name]=fn
return  _warpper
# 创建查询函数
def  printf():
while  True:
cmd=input(">>")
if cmd.strip() == 'quit':
return
commds.get(cmd,fundefault)()  #调用函数,若无存在,则调用默认函数
return   register,printf
register,printf=comm()
# 调用注册函数进行注册
# 创建函数
@register('fun1')
def  fun1():
print  ("hello  fun1")
@register('fun2')
def  fun2():
print ("hello  fun2")
#创建默认函数
def  fundefault():
print  ("hello  default")
#调用显示函数
printf()

#!/usr/bin/poython3.6
#conding:utf-8
from   functools  import partial
def  dispatcher():
commads={}
def reg(cmd,*args,**kwargs):
def _reg(fn):
func=partial(fn,*args,**kwargs)
commads[cmd]=func
return  func
return _reg
def  run():
while True:
cmd=input('>>')
if  cmd.strip()  == 'q'  or  cmd.strip() =='quit':
break
else:
commads.get(cmd,defaunlt)()

def  defaunlt():
print ('default')
return   reg,run
reg,run=dispatcher()
@reg('add',1,2,3,4)
def add(x,y,z,w):
print   (x+y+z+w)
@reg('sub',20,10)
def  sub(x,y):
print  (x-y)
run()

结果如下

3 实现base64编码和解码

1 简介

Base64是网络上最常见的用于传输8Bit字节码的编码方式之一,Base64就是一种基于64个可打印字符来表示二进制数据的方法。
Base64编码是从二进制到字符的过程,可用于在HTTP环境下传递较长的标识信息。采用Base64编码具有不可读性,需要解码后才能阅读。
Base64由于以上优点被广泛应用于计算机的各个领域,然而由于输出内容中包括两个以上“符号类”字符(+, /, =),不同的应用场景又分别研制了Base64的各种“变种”。为统一和规范化Base64的输出,Base62x被视为无符号化的改进版本。

标准的Base64并不适合直接放在URL里传输,因为URL编码器会把标准Base64中的“/”和“+”字符变为形如“%XX”的形式,而这些“%”号在存入数据库时还需要再进行转换,因为ANSI SQL中已将“%”号用作通配符。
为解决此问题,可采用一种用于URL的改进Base64编码,它在末尾填充'='号,并将标准Base64中的“+”和“/”分别改成了“-”和“_”,这样就免去了在URL编解码和数据库存储时所要作的转换,避免了编码信息长度在此过程中的增加,并统一了数据库、表单等处对象标识符的格式。

Base64要求把每三个8Bit的字节转换为四个6Bit的字节(3*8 = 4*6 = 24),然后把6Bit再添两位高位0,组成四个8Bit的字节,也就是说,转换后的字符串理论上将要比原来的长1/3。

2 规则

关于这个编码的规则:
①.把3个字符变成4个字符。
②每76个字符加一个换行符。
③.最后的结束符也要处理。

base64编码

import base64
source = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
def Base64(src):
ret = bytearray()  # 定义可变字节列表,一个字节为8bit
length = len(src)
r = 0  # r为记录的补0的数量
for offset in range(0, length, 3):  # 进行三位切割,三位8bbit切割成4位6bit,
if offset <= length - 3:  #此处是匹配前面的整3位的
triple = src[offset:offset + 3]
else:
triple = src[offset:]  # 截取后面的小于3位的
r = 3 - len(triple)  # 获取需要补0的位数
triple = triple + '\x00' * r  # 补几个0    bin(0x00)       '0b0'
# print  (triple,r)
b = int.from_bytes(triple.encode(), 'big')  # 大端模式为big,小端模式为little
for i in range(18, -1, -6):  # 进行移位操作
if i == 18:
index = b >> i
else:
index = b >> i & 0x3F  # 此处是进行运算 ,转换为二进制是 bin(0x3f)     '0b111111'
#  In [24]: int('0x3d',16)
# Out[24]: 61
# In [25]: chr(61)
# Out[25]: '='
ret.append(source[index])
for i in range(1, r + 1):
ret[-i] = 0x3D  # 0x3D 表示等号
return bytes(ret)
print('Base64',Base64('123456'))
print ('base64',base64.b64encode('123456'.encode('utf-8')))

结果如下

Base64解码

import  base64
#base64一定是四的倍数
from  collections  import OrderedDict
base_tb1="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
alphabet=OrderedDict(zip(base_tb1,range(64)))
def  Base64decode(src):
ret=bytearray()
length=len(src)
step=4
for offset  in  range(0,length,step):
tmp=0x00
block=src[offset:offset+step]  # 4位进行截取,abcd
for  i,c in  enumerate(reversed(block)):  # 进行反向操作处理,第一个是0 d
index= alphabet.get(c)  # 通过值找索引 ,其中负数表示没找到,find不抛异常
if  index==-1: #表示没找到
continue   #直接操作下一个
tmp += (index  << i*6)  # 此处第一个d是最后面的低6位,向左是高位,右边位低位,,第一次i=0表示没移动,
# 第二次i=1表示向左移动6位,依次类推abcd,最后将其加在一起。此处相当于将其进行了拼接,此时已经成为了24
ret.extend(tmp.to_bytes(3,'big')) # 将4个段进行切成3段,若有等号,则先不管
return   bytes(ret.rstrip(b'\x00'))  # 去掉多余的右边的0,是asscii的0,在最后的4变3的过程中,才会出现0,因为前面的都是整取

print ('Base64',Base64decode('abcd'))
print ('base64',base64.b64decode('abcd'.encode('utf-8')))

结果如下

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: