封装redis
1、将Redis操作封装成一个类,方便使用时调用:下面说明各个操作的函数
(1)构造函数:构造连接Redis函数,并进行异常处理def __init__(self,ip,password,port=6379,db=0)
(2)str类型获取key数据:def str_get(self,k)
(3)str类型保存key数据,注意失效时间设置默认值:def str_set(self,k,v,time=None)
(4)删除key数据:删除前先进行判断该值是否存在:def delete(self,k)
(5)哈希类型:获取name下key数据:def hash_get(self,name,k)
(6)哈希类型:保存key数据def hash_set(self,name,k,v)
(7)哈希类型:获取name所有的数据:def hash_getall
(8)删除哈希name下key数据:def hash_del(self,name,k)
(9)清空Redis:def clean_redis(self)
import redis# r = redis.Redis()class MyRedis(): def __init__(self,ip,password,port=6379,db=0): #构造函数 try: self.r = redis.Redis(host=ip,password=password,port=port,db=db) # r = redis.ConnectionPool(host=ip,password=password,port=port,db=db) except Exception as e: print('redis连接失败,错误信息%s'%e) def str_get(self,k): res = self.r.get(k) if res: return res.decode()#获取数据要有返回值,所以要有返回值 def str_set(self,k,v,time=None): self.r.set(k,v,time) def delete(self,k): tag = self.r.exists(k) #判断这个key是否存在 if tag: self.r.delete(k) print('删除成功') else: print('这个key不存在') def hash_get(self,name,k): #无论key是否存在,都不会报错,所以不用写try res = self.r.hget(name,k) if res: return res.decode() def hash_set(self,name,k,v): self.r.hset(name,k,v) def hash_getall(self,name): data = {} # {b'12': b'1212', b'3': b'sdad', b'4': b'asdadsa'} res = self.r.hgetall(name) if res: for k,v in res.items(): k = k.decode() v = v.decode() data[k]=v return data def hash_del(self,name,k): res = self.r.hdel(name,k) if res:#因为删除成功会返回1,删除失败返回0 print('删除成功') return 1 else: print('删除失败,该key不存在') return 0 @property#定义为属性方法,以后可以直接调用 def clean_redis(self): self.r.flushdb() #清1空redis print('清空redis成功!') return 0