异步调用时自动关闭#
当事件循环正在运行时,会创建一个任务来关闭 redis 连接,而不会阻塞事件循环。如果事件循环没有运行,就可以像之前一样调用 run_until_complete。
async def close_redis(self):
    """Close the underlying Redis connection, if one was ever opened."""
    conn = self.redis
    if not conn:
        # Nothing to close — the connection was never created (or already cleared).
        return
    await conn.close()
def __del__(self):
    """Best-effort cleanup of the Redis connection at garbage collection.

    If an event loop is already running, schedule close_redis() as a task so
    the loop is not blocked; otherwise drive the coroutine to completion
    synchronously with run_until_complete().
    """
    try:
        # get_event_loop() raises RuntimeError when no loop is set for the
        # current thread — typical during interpreter shutdown, which is
        # exactly when __del__ tends to fire. __del__ must never propagate,
        # so bail out quietly instead of crashing.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        return
    if loop.is_running():
        # Schedule on that loop explicitly; asyncio.create_task() only works
        # when called from inside the running loop's own thread.
        loop.create_task(self.close_redis())
    else:
        loop.run_until_complete(self.close_redis())
数据迁移(hash数据)#
import os
import json
import redis
import argparse
from loguru import logger
# User's home directory; may be None when HOME is unset — push() joins paths with it.
home_path = os.environ.get('HOME')
# CLI: --mode chooses pull (dump from source Redis) or push (load into target Redis);
# -ds/--datasource chooses which config family ('retens' or 'events') to migrate.
# Note: parse_args() runs at import time, so importing this module requires valid argv.
parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument('--mode', help='Pull or Push data from Redis')
parser.add_argument('-ds', '--datasource', help='retens or events config data from Redis')
args = parser.parse_args()
# Source Redis instance that pull() scans and reads hashes from (connection is lazy).
source_redis = redis.StrictRedis(host='redis-product.com', port=6379, db=0)
def scan_keys_with_pattern(pattern):
    """Yield batches of key names (decoded str) matching *pattern* via SCAN.

    Uses the non-blocking SCAN command to iterate the source Redis keyspace;
    each yielded value is one SCAN reply's worth of keys.
    """
    # SCAN iteration starts at cursor 0 and ends when the server returns 0 again.
    cursor = 0
    count_group = 0
    while True:
        cursor, keys = source_redis.scan(cursor=cursor, match=pattern)
        count_group += 1
        logger.info(f"{count_group} - {len(keys)} = {cursor}")
        # Yield BEFORE checking the cursor: the final SCAN reply (cursor == 0)
        # may still carry keys. The original broke out first, silently dropping
        # the last batch — and every key when the keyspace fit in one SCAN.
        yield [key.decode('utf-8') for key in keys]
        # redis-py returns the cursor as an int; accept legacy forms defensively.
        if cursor == 0 or cursor == b'0' or cursor == '0':
            break
def append_2_file(data):
    """Append one Redis hash (bytes->bytes mapping) to the datasource dump file.

    Each call writes a single JSON object on its own line (JSON-lines format).
    """
    # Decode both field names and values from bytes to str.
    decoded_data = {field.decode('utf-8'): val.decode('utf-8') for field, val in data.items()}
    with open(f'data_{args.datasource}.json', 'a') as out:
        out.write(json.dumps(decoded_data) + '\n')
def pull():
    """Dump every matching hash from the source Redis into a JSON-lines file.

    Scans 'mmp:cfgs:*:<datasource>' on the source instance, reads each key's
    hash once with HGETALL (duplicates across SCAN batches are skipped), and
    appends the decoded hash to data_<datasource>.json via append_2_file().
    """
    # Validate once up front instead of duplicating the whole loop per
    # datasource — the original had two byte-identical branches.
    if args.datasource not in ('retens', 'events'):
        logger.error(f"Invalid datasource: {args.datasource}")
        return
    # SCAN may return the same key in more than one batch; de-duplicate.
    all_keys = set()
    for keys in scan_keys_with_pattern(f'mmp:cfgs:*:{args.datasource}'):
        for key in keys:
            if key in all_keys:
                logger.warning(f"重复一次: {key}")
                continue
            all_keys.add(key)
            data = source_redis.hgetall(key)
            append_2_file(data=data)
def push():
    """Load the dumped JSON-lines file and write each hash field into the target Redis.

    Each input line is a JSON object of {field: value}; each value is itself a
    JSON string containing at least 'pkg_name', which determines the target key
    'mmp:cfgs:<pkg_name>:<datasource>'.
    """
    # Connect to the target Redis instance (a different db from the source).
    target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)
    # Validate the datasource once; the per-row else branch in the original
    # was dead code after this check.
    if args.datasource not in ['retens', 'events']:
        logger.error(f"Invalid datasource: {args.datasource}")
        return
    # NOTE(review): pull() writes data_<ds>.json into the CWD, but push() reads
    # it from $HOME — confirm the file is moved there (or run pull from $HOME).
    # Also, home_path is None when HOME is unset, which makes join() raise.
    with open(os.path.join(home_path, f'data_{args.datasource}.json'), 'r') as file:
        for line in file:
            record = json.loads(line)
            for key, value in record.items():
                value_obj = json.loads(value)
                pkg_name = value_obj["pkg_name"]
                # datasource is validated above, so this single HSET replaces
                # the original's duplicated retens/events branches.
                ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:{args.datasource}', key, value)
                logger.info(f'Writing {key} {value} to Redis -> {ok}')
if __name__ == "__main__":
    # Dispatch on --mode; previously an unknown mode silently did nothing.
    if args.mode == 'pull':
        pull()
    elif args.mode == 'push':
        push()
    else:
        logger.error(f"Invalid mode: {args.mode}")
用法示例(将 {thisscript} 替换为脚本文件名):
python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events
hash#
import redis

# Connect to the local Redis server (db 0); the connection is established lazily.
conn = redis.Redis(host='localhost', port=6379, db=0)

hash_key = 'my_hash'

# Populate the hash: one field at a time, then several at once via mapping=.
conn.hset(hash_key, 'field1', 'value1')
conn.hset(hash_key, mapping={'field2': 'value2', 'field3': 'value3'})

# Read a single field back (returned as bytes).
value = conn.hget(hash_key, 'field1')
print(f'field1: {value.decode("utf-8")}')

# Dump every field/value pair.
all_fields = conn.hgetall(hash_key)
print('All fields:')
for name, val in all_fields.items():
    print(f'{name.decode("utf-8")}: {val.decode("utf-8")}')

# How many fields does the hash hold?
field_count = conn.hlen(hash_key)
print(f'Number of fields in my_hash: {field_count}')
import asyncio
import aioredis
async def run_transaction(redis: aioredis.Redis):
    """Atomically SET key1 and key2 inside a MULTI/EXEC transaction."""
    async with redis.pipeline() as tx:
        # Enter transactional mode, queue both writes, then EXEC them together.
        await tx.multi()
        await tx.set('key1', 'value1')
        await tx.set('key2', 'value2')
        results = await tx.execute()
        print('Transaction results:', results)
async def main():
    """Connect to Redis, run the demo transaction, then tear the connection down."""
    # Create the async Redis client (replies decoded to str).
    redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
    try:
        # Run the MULTI/EXEC demo.
        await run_transaction(redis)
    finally:
        # Always release the connection — the original leaked it if
        # run_transaction raised.
        await redis.close()
# Run the async entry point.
if __name__ == "__main__":
    asyncio.run(main())
乐观锁#
详细解释