Python Redis 客户端

异步调用时自动关闭 #

当事件循环正在运行时,会创建一个任务来关闭 redis 连接,而不会阻塞事件循环。如果事件循环没有运行,就可以像之前一样调用 run_until_complete

async def close_redis(self):
    if self.redis:
        await self.redis.close()

def __del__(self):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        asyncio.create_task(self.close_redis())
    else:
        loop.run_until_complete(self.close_redis())

数据迁移(hash数据) #

脚本 #

import os
import json
import redis
import argparse
from loguru import logger

home_path = os.environ.get('HOME')

parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument('--mode', help='Pull or Push data from Redis')
parser.add_argument('-ds', '--datasource', help='retens or events config data from Redis')
args = parser.parse_args()

source_redis = redis.StrictRedis(host='redis-product.com', port=6379, db=0)
def scan_keys_with_pattern(pattern):
    # 初始游标
    cursor = '0'
    count_group = 0
    while True:
        # 使用 SCAN 命令进行迭代
        cursor, keys = source_redis.scan(cursor=cursor, match=pattern)

        # 游标为 '0' 表示迭代结束
        if cursor == b'0' or cursor == 0 or cursor == '0':
            break

        # 将找到的键添加到结果列表中
        count_group += 1
        logger.info(f"{count_group} - {len(keys)} = {cursor}")
        yield [key.decode('utf-8') for key in keys]

def append_2_file(data):
    # 将数据转换为字典
    decoded_data = {}
    for key, value in data.items():
        decoded_data[key.decode('utf-8')] = value.decode('utf-8')
    # 将数据写入文件
    with open(f'data_{args.datasource}.json', 'a') as file:
        file.write(json.dumps(decoded_data) + '\n')

def pull():
    # 连接到源 Redis 实例

    # 从 Redis 中读取数据
    all_keys = set()
    if args.datasource == 'retens':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:retens'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    elif args.datasource == 'events':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:events'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    else:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

def push():
    # 连接到目标 Redis 实例
    target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)

    # 读取文件内容并写入目标 Redis
    if not args.datasource in ['retens', 'events']:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

    with open(os.path.join(home_path, f'data_{args.datasource}.json'), 'r') as file:
        for l in file:
            l = json.loads(l)
            for key, value in l.items():
                value_obj = json.loads(value)
                pkg_name = value_obj["pkg_name"]
                if args.datasource == 'retens':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:retens', key, value)
                elif args.datasource == 'events':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:events', key, value)
                else:
                    print(f"Invalid datasource: {args.datasource}")
                logger.info(f'Writing {key} {value} to Redis -> {ok}')

if __name__ == "__main__":
    if args.mode == 'pull':
        pull()
    elif args.mode == 'push':
        push()

使用 #

python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events

hash #

import redis

# 创建 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 写入数据到哈希
r.hset('my_hash', 'field1', 'value1')
r.hset('my_hash', mapping={'field2': 'value2', 'field3': 'value3'})

# 获取单个字段的值
value = r.hget('my_hash', 'field1')
print(f'field1: {value.decode("utf-8")}')

# 获取所有字段和值
all_fields = r.hgetall('my_hash')
print('All fields:')
for field, value in all_fields.items():
    print(f'{field.decode("utf-8")}: {value.decode("utf-8")}')

# 查看哈希中字段的数量
field_count = r.hlen('my_hash')
print(f'Number of fields in my_hash: {field_count}')

事务 #

import asyncio
import aioredis

async def run_transaction(redis: aioredis.Redis):
    async with redis.pipeline() as pipe:
        # 开始事务
        await pipe.multi()
        
        # 在事务中执行多个命令
        await pipe.set('key1', 'value1')
        await pipe.set('key2', 'value2')
        
        # 提交事务
        results = await pipe.execute()
        
        print('Transaction results:', results)

async def main():
    # 创建 Redis 连接
    redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
    
    # 运行事务
    await run_transaction(redis)
    
    # 关闭连接
    await redis.close()

# 运行异步主函数
if __name__ == "__main__":
    asyncio.run(main())

乐观锁 #

详细解释

  • async with redis.pipeline() as pipe: 使用 pipeline 上下文管理器创建一个事务或管道。这会创建一个事务队列或管道。
  • await pipe.multi(): 开始一个事务。如果在管道中使用这个命令,则表示开始一个事务。如果不使用,则表示普通的管道操作。
  • await pipe.execute(): 提交事务或管道操作,并获取所有命令的结果。
  • await pipe.watch('key'): 监视指定的键,以便在事务提交之前确保这些键没有被其他客户端修改。如果在事务提交前键发生变化,事务会失败。
  • aioredis.WatchError: 捕获由于键被修改而导致的事务失败异常。
async def run_watch(redis: aioredis.Redis):
    async with redis.pipeline() as pipe:
        # 监视键
        await pipe.watch('key1')
        
        # 开始事务
        await pipe.multi()
        
        # 执行事务中的命令
        await pipe.set('key1', 'value1')
        await pipe.set('key2', 'value2')
        
        # 提交事务
        try:
            results = await pipe.execute()
            print('Watch transaction results:', results)
        except aioredis.WatchError:
            print('Transaction failed due to concurrent modification')

async def main():
    # 创建 Redis 连接
    redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
    
    # 运行 WATCH 事务
    await run_watch(redis)
    
    # 关闭连接
    await redis.close()

# 运行异步主函数
if __name__ == "__main__":
    asyncio.run(main())

分布式锁 #

async def release_counter_lock(account_id: int, package_name: str, identifier: str):
	lock_key = f"lock_{account_id}_{package_name}"
	# Lua 脚本:如果锁的值与标识符匹配,则删除锁
	lua_script = """
	if redis.call("get", KEYS[1]) == ARGV[1] then
		return redis.call("del", KEYS[1])
	else
		return 0
	end
	"""
	# 执行 Lua 脚本
	result = await rds.eval(lua_script, 1, lock_key, identifier)
	# 如果返回值为 1,说明锁被删除
	if result == 1:
		return True
	return False

async def acquire_counter_lock(account_id: int, package_name: str) -> str:
	"""
	True 为被锁定,暂时不可用
	"""
	acquire_time = 2
	identifier = str(uuid.uuid4())  # 使用UUID生成唯一标识
	lock_key = f"lock_{account_id}_{package_name}"
	end = time.time() + acquire_time
	while time.time() < end:
		if await rds.setnx(lock_key, identifier):  # 如果锁不存在,设置并返回True
			await rds.expire(lock_key, 10)
			return identifier  # 成功获取锁,返回标识符
		elif not await rds.ttl(lock_key):
			await rds.expire(lock_key, 10)
		time.sleep(0.01)
	# 超时未能获取锁
	return ''