异步调用时自动关闭 #
当事件循环正在运行时,会创建一个任务来关闭 Redis 连接,而不会阻塞事件循环;如果事件循环没有运行,则可以像之前一样调用 `run_until_complete` 同步等待关闭完成。
async def close_redis(self):
    """Close the Redis client stored on ``self.redis``, if any.

    Does nothing when ``self.redis`` is falsy (for example ``None``).
    """
    client = self.redis
    if not client:
        return
    await client.close()
def __del__(self):
    """Best-effort close of the Redis connection when the object is collected.

    * Inside a running event loop, the close is scheduled as a task so the
      loop is never blocked.
    * Otherwise, if the thread has a usable (open, idle) current loop, the
      close is run to completion on it.
    * If there is nothing to close or no usable loop, the finalizer returns
      quietly; a destructor must not raise.
    """
    # __init__ may have failed before ``redis`` was assigned, so do not
    # assume the attribute exists.
    if not getattr(self, "redis", None):
        return

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        # We are being finalized from code executing inside the loop:
        # schedule the close instead of blocking.
        # NOTE(review): the task is not awaited; if the loop stops before
        # it runs, the connection is not closed gracefully.
        running_loop.create_task(self.close_redis())
        return

    try:
        # Deprecated without a current loop and raises RuntimeError on
        # newer Python versions / non-main threads, hence the guard.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        return

    # A closed loop cannot run anything; a loop running elsewhere cannot
    # be driven with run_until_complete from here.
    if loop.is_closed() or loop.is_running():
        return
    loop.run_until_complete(self.close_redis())
数据迁移(hash数据) #
脚本 #
import os
import json
import redis
import argparse
from loguru import logger
# Directory push() reads the dump file from; None when HOME is not set.
home_path = os.environ.get('HOME')

# Command-line interface: --mode selects pull/push, -ds/--datasource selects
# which config family (retens or events) is copied.
parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument(
    '--mode',
    help='Pull or Push data from Redis',
)
parser.add_argument(
    '-ds',
    '--datasource',
    help='retens or events config data from Redis',
)
args = parser.parse_args()

# Redis instance that the pull step reads from.
source_redis = redis.StrictRedis(
    host='redis-product.com',
    port=6379,
    db=0,
)
def scan_keys_with_pattern(pattern):
    """Yield batches of key names in ``source_redis`` that match ``pattern``.

    The keyspace is walked with SCAN. Each reply is yielded as a list of
    ``str`` key names (possibly empty), and iteration stops after the reply
    whose cursor is 0.

    Args:
        pattern: Glob-style MATCH pattern passed to SCAN.

    Yields:
        list[str]: The keys returned by one SCAN call.
    """
    cursor = 0  # SCAN starts at cursor 0
    count_group = 0
    while True:
        cursor, keys = source_redis.scan(cursor=cursor, match=pattern)
        count_group += 1
        logger.info(f"{count_group} - {len(keys)} = {cursor}")
        # Yield BEFORE testing the cursor: the reply that carries cursor 0
        # can still contain keys, and breaking first would drop them.
        yield [
            key.decode('utf-8') if isinstance(key, bytes) else key
            for key in keys
        ]
        # Cursor 0 (in whatever form the client returns it) ends the scan.
        if cursor in (0, '0', b'0'):
            break
def append_2_file(data):
    """Append one hash as a single JSON line to ``data_<datasource>.json``.

    Args:
        data: Mapping of bytes field names to bytes values; both are
            decoded as UTF-8 before being serialized.

    The file name is derived from ``args.datasource`` and is relative to
    the current working directory; the file is opened in append mode.
    """
    record = {
        field.decode('utf-8'): raw.decode('utf-8')
        for field, raw in data.items()
    }
    with open(f'data_{args.datasource}.json', 'a') as out:
        out.write(json.dumps(record) + '\n')
def pull():
    """Dump every matching config hash from ``source_redis`` to the data file.

    The key pattern is chosen by ``args.datasource`` ('retens' or 'events').
    Each key seen for the first time is read with HGETALL and handed to
    ``append_2_file``; keys reported again by the scan are logged and skipped.
    An unknown datasource is logged as an error and nothing is read.
    """
    patterns = {
        'retens': 'mmp:cfgs:*:retens',
        'events': 'mmp:cfgs:*:events',
    }
    pattern = patterns.get(args.datasource)
    if pattern is None:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

    seen = set()
    for batch in scan_keys_with_pattern(pattern):
        for key in batch:
            if key in seen:
                # The scan handed us this key before; do not dump it twice.
                logger.warning(f"重复一次: {key}")
                continue
            seen.add(key)
            append_2_file(data=source_redis.hgetall(key))
def push():
    """Load the dumped hashes from the data file into the target Redis.

    Reads ``data_<datasource>.json`` from the home directory, one JSON object
    per line (field name -> JSON-encoded value). For every field the value is
    parsed to obtain its ``pkg_name``, and the original value string is
    written with HSET to ``mmp:cfgs:<pkg_name>:<datasource>``.

    An unknown datasource is logged as an error and nothing is written.

    Raises:
        OSError: If the data file cannot be opened.
        json.JSONDecodeError: If a line or a value is not valid JSON.
        KeyError: If a value has no ``pkg_name`` entry.
    """
    if args.datasource not in ('retens', 'events'):
        logger.error(f"Invalid datasource: {args.datasource}")
        return

    # Target instance the data is copied into.
    target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)

    # HOME may be unset, which makes home_path None; fall back to the
    # platform's notion of the home directory instead of crashing.
    base_dir = home_path or os.path.expanduser('~')
    dump_path = os.path.join(base_dir, f'data_{args.datasource}.json')

    with open(dump_path, 'r') as file:
        for line in file:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            record = json.loads(line)
            for key, value in record.items():
                # The value is itself JSON; only pkg_name is needed to
                # rebuild the destination key. The raw string is stored.
                pkg_name = json.loads(value)["pkg_name"]
                ok = target_redis.hset(
                    f'mmp:cfgs:{pkg_name}:{args.datasource}', key, value
                )
                logger.info(f'Writing {key} {value} to Redis -> {ok}')
if __name__ == "__main__":
    # Dispatch on --mode. A missing or mistyped mode used to be a silent
    # no-op; report it so the operator knows nothing was copied.
    if args.mode == 'pull':
        pull()
    elif args.mode == 'push':
        push()
    else:
        logger.error(f"Invalid mode: {args.mode}")
使用 #
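python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events
注意:pull 会把数据追加写入当前工作目录下的 data_{datasource}.json,而 push 从 $HOME/data_{datasource}.json 读取;执行 push 之前需要先把导出的文件放到目标机器的 $HOME 目录下。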