Python Redis 客户端

异步调用时自动关闭 #

当事件循环正在运行时,会创建一个任务来关闭 redis 连接,而不会阻塞事件循环。如果事件循环没有运行,就可以像之前一样调用 run_until_complete

async def close_redis(self):
    if self.redis:
        await self.redis.close()

def __del__(self):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        asyncio.create_task(self.close_redis())
    else:
        loop.run_until_complete(self.close_redis())

数据迁移(hash数据) #

脚本 #

import os
import json
import redis
import argparse
from loguru import logger

home_path = os.environ.get('HOME')

parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument('--mode', help='Pull or Push data from Redis')
parser.add_argument('-ds', '--datasource', help='retens or events config data from Redis')
args = parser.parse_args()

source_redis = redis.StrictRedis(host='redis-product.com', port=6379, db=0)
def scan_keys_with_pattern(pattern):
    # 初始游标
    cursor = '0'
    count_group = 0
    while True:
        # 使用 SCAN 命令进行迭代
        cursor, keys = source_redis.scan(cursor=cursor, match=pattern)

        # 游标为 '0' 表示迭代结束
        if cursor == b'0' or cursor == 0 or cursor == '0':
            break

        # 将找到的键添加到结果列表中
        count_group += 1
        logger.info(f"{count_group} - {len(keys)} = {cursor}")
        yield [key.decode('utf-8') for key in keys]

def append_2_file(data):
    # 将数据转换为字典
    decoded_data = {}
    for key, value in data.items():
        decoded_data[key.decode('utf-8')] = value.decode('utf-8')
    # 将数据写入文件
    with open(f'data_{args.datasource}.json', 'a') as file:
        file.write(json.dumps(decoded_data) + '\n')

def pull():
    # 连接到源 Redis 实例

    # 从 Redis 中读取数据
    all_keys = set()
    if args.datasource == 'retens':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:retens'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    elif args.datasource == 'events':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:events'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    else:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

def push():
    # 连接到目标 Redis 实例
    target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)

    # 读取文件内容并写入目标 Redis
    if not args.datasource in ['retens', 'events']:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

    with open(os.path.join(home_path, f'data_{args.datasource}.json'), 'r') as file:
        for l in file:
            l = json.loads(l)
            for key, value in l.items():
                value_obj = json.loads(value)
                pkg_name = value_obj["pkg_name"]
                if args.datasource == 'retens':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:retens', key, value)
                elif args.datasource == 'events':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:events', key, value)
                else:
                    print(f"Invalid datasource: {args.datasource}")
                logger.info(f'Writing {key} {value} to Redis -> {ok}')

if __name__ == "__main__":
    if args.mode == 'pull':
        pull()
    elif args.mode == 'push':
        push()

使用 #

python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events