文章目录
一、python连接redis:
说明:平时工作当中为了读取方便都会采取redis缓存的方式那么对于redis的增、删、改、查操作就有需求
1.1、首先安装redis模块:
pip install redis
1.2、python连接redis数据库:
import redis
import time
from ReadYaml import *
class operate_redis(object):
def __init__(self,number):
# redis数据库连接重试功能和连接超时功能的连接
self._max_retries_count = 2 # 设置最大重试次数
self._conn_retries_count = 0 # 初始重试次数
self._conn_timeout = 10 # 连接超时时间为10秒
self.db = number # 连接对应redis数据库db13
# redis数据库连接
self._list = []
self.host = ec_mysql_redis1()['host']
self.pw = ec_mysql_redis1()['password']
self.port = ec_mysql_redis1()['port']
while self._conn_retries_count <= self._max_retries_count:
try:
self.redis = redis.Redis(
host=self.host,
password=self.pw,
db=self.db,
port=self.port,
socket_connect_timeout=self._conn_timeout)
break
except:
print(f"{
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}redis数据库连接失败")
self._conn_retries_count += 1
time.sleep(2)
continue
1.2、查看表名称是否存在:
def if_exists(self, name: str):
# 校验输入的redis数据表名称是否存在
return self.redis.exists(name)
1.3、redis新增&更改操作:
def add_redis(self, name: str, key: str, value):
# 指定redis新增和更改操作,key值没有则新建,有则更新
try:
self.redis.hset(name, key, value)
except Exception as e:
raise e
else:
print("redis写入操作:success")
1.4、获取redis表下所有key值:
def get_keys(self, name: str):
# 获取redis表下所有key键值
if operate_redis().if_exists(name=name) == 1:
try:
for i in self.redis.hkeys(name):
list = self._list
list.append(i.decode('utf8'))
except Exception as e:
raise e
else:
return list
else:
print("redis下name有误请检查后重新输入")
1.5、根据表名称和key值获取对应的value:
def query_redis(self, name: str, key: str):
# 输入redis数据表名称及对应的key值精确查找value值
if operate_redis().if_exists(name=name) == 1:
try:
keys = operate_redis().get_keys(name=name)
if key in keys:
value = self.redis.hget(name=name, key=key)
return value.decode('utf8')
# print(value.decode('utf8'))
else:
print("该name名称下不存在该key值,请查证后重新输入")
except Exception as e:
raise e
else:
print("redis下name有误请检查后重新输入")
1.6、根据数据表名称返回keys、values列表:
def get_keys_values(self, name: str):
# 输入redis数据表的名称返回keys,values列表
if operate_redis().if_exists(name=name) == 1:
try:
keys = operate_redis().get_keys(name=name)
values = self._list
for key in keys:
value = self.redis.hget(name=name, key=key)
values.append(value.decode('utf8'))
return [keys, values]
except Exception as e:
raise e
else:
print("redis下name有误请检查后重新输入")
1.7、redis根据数据表名称及key值删除对应的value值
def deleted_redis(self, name: str, key: str):
if operate_redis().if_exists(name=name) == 1:
try:
keys = operate_redis().get_keys(name=name)
if key in keys:
self.redis.hdel(name, key)
print(f"删除{
name}下{
key}键值对成功")
else:
print("该name名称下不存在该key值,请查证后重新输入")
except Exception as e:
raise e
else:
print("redis下name有误请检查后重新输入")
from utils.commons.base.enviroments import read_environment
import redis, time
class Redis(object):
def __init__(self, name="xw", db='0'):
# redis连接重试功能和连接超时功能的DB连接
self._max_retries_count = 2 # 设置最大重试次数
self._conn_retries_count = 0 # 初始重试次数
self._conn_timeout = 100 # 连接超时时间为10秒
self.name = name
self.db = db
self._list = []
# redis数据库连接
if self.name == 'xw':
self.host = read_environment('env_datas')['redis']['xw']['host']
self.pwd = read_environment('env_datas')['redis']['xw']['pwd']
self.port = read_environment('env_datas')['redis']['xw']['port']
while self._conn_retries_count <= self._max_retries_count:
try:
self.redis_pool = redis.ConnectionPool(host=self.host, port=self.port, password=self.pwd, db=self.db)
# cursor = self.db.cursor()#创建游标
self.redis = redis.Redis(connection_pool=self.redis_pool)
print(f"**********redis连接success---当前环境为:{
read_environment('env')},连接名称为:{
self.name}")
break
except Exception as e:
print(f"**********redis连接error---当前环境为:{
read_environment('env')},连接名称为:{
self.name}")
raise e
time.sleep(2)
continue
def if_exists(self, name: str):
# 校验输入的redis数据表名称是否存在
return self.redis.exists(name)
# print(type(self.redis.exists(name)))
# ================================================= #
# ************** redis中string类型用法 ************** #
# ================================================= #
def str_get(self, name):
'''
string类型获取单个数据表对应的值
'''
try:
if self.if_exists(name) == 1:
rv = self.redis.get(name).decode()
print(f"\033[0;31;43m**********redis查询数据成功,数据表名称为:{
name}\033[0m")
return rv
else:
print(f'**********{
name} is not exit!')
except Exception as e:
raise e
def str_set(self, key, value, ex=None):
'''
string类型添加单个key-value
key:为redis项目数据表名称
value:为对应数据表名称的值
ex:设置redis值过期时间-秒,到期自动删除
'''
try:
self.redis.set(key, value, ex)
print(f"\033[0;33;41m**********redis添加数据成功,数据表名称为:{
key},数据值为:{
value}\33[0m")
except Exception as e:
raise e
# ================================================= #
# ************** 通用删除数据表用法 ************** #
# ================================================= #
def str_deleted(self, keys):
'''
删除数据表
'''
try:
self.redis.delete(keys)
print(f"\033[0;31;43m**********redis删除数据成功,数据表名称为:{
keys}\033[0m")
except Exception as e:
raise e
# ================================================= #
# ************** redis中dict类型用法 ************** #
# ================================================= #
def dict_add(self, name: str, key: str, value):
# 字典形式-指定redis新增和更改操作,key值没有则新建,有则更新
try:
self.redis.hset(name, key, value)
print(f"\033[0;33;41m**********redis添加数据成功,数据表名称为:{
name},数据值为:{
key}:{
value}\33[0m")
except Exception as e:
raise e
def dict_query(self, name: str, key: str):
# 输入redis数据表名称及对应的key值精确查找value值
if self.if_exists(name=name) == 1:
try:
keys = self.get_keys(name=name)
if key in keys:
value = self.redis.hget(name=name, key=key)
print(f"\033[0;31;43m**********redis查询数据成功,数据表名称为:{
name},查询key为:{
key}\33[0m")
return value.decode('utf8') # 二进制转换
else:
print("**********该name名称下不存在该key值,请查证后重新输入")
except Exception as e:
raise e
else:
print("**********redis下name有误请检查后重新输入")
def dict_deleted(self, name: str, key: str):
# 根据数据表名称及key值删除对应的value值
if self.if_exists(name=name) == 1:
try:
keys = self.get_keys(name=name)
if key in keys:
self.redis.hdel(name, key)
print(f"\033[0;33;41m**********删除{
name}下{
key}键值对成功\33[0m")
else:
print(f"**********数据表{
name}下不存在key-{
key},请查证后重新输入")
except Exception as e:
raise e
else:
print(f"**********redis下数据表名称{
name}有误请检查后重新输入")
def get_keys(self, name: str):
# 获取redis表下所有key键值
if self.if_exists(name) == 1:
try:
for i in self.redis.hkeys(name):
list = self._list
list.append(i.decode('utf8'))
except Exception as e:
raise e
else:
return list
else:
print(f"**********redis下数据表{
name}名称有误请检查后重新输入")
def get_keys_values(self, name: str):
# 输入redis数据表的名称返回keys,values列表
if self.if_exists(name=name) == 1:
try:
keys = self.get_keys(name=name)
values = self._list
for key in keys:
value = self.redis.hget(name=name, key=key)
values.append(value.decode('utf8'))
return [keys, values]
except Exception as e:
raise e
else:
print(f"**********redis下数据表{
name}名称有误请检查后重新输入")
if __name__ == "__main__":
Redis().str_set('demo:test', 'test1')
Redis().str_get('demo:test')
Redis().str_deleted('demo:test')
Redis().dict_add('demo', 'test', '[12,33]')
Redis().dict_query('demo', 'test')
Redis().dict_deleted('demo', 'test')