异步调用时自动关闭 #
当事件循环正在运行时,会创建一个任务来关闭 redis 连接,而不会阻塞事件循环。如果事件循环没有运行,就可以像之前一样调用
run_until_complete
。
async def close_redis(self):
if self.redis:
await self.redis.close()
def __del__(self):
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.close_redis())
else:
loop.run_until_complete(self.close_redis())
数据迁移(hash数据) #
脚本 #
import os
import json
import redis
import argparse
from loguru import logger
home_path = os.environ.get('HOME')
parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument('--mode', help='Pull or Push data from Redis')
parser.add_argument('-ds', '--datasource', help='retens or events config data from Redis')
args = parser.parse_args()
source_redis = redis.StrictRedis(host='redis-product.com', port=6379, db=0)
def scan_keys_with_pattern(pattern):
# 初始游标
cursor = '0'
count_group = 0
while True:
# 使用 SCAN 命令进行迭代
cursor, keys = source_redis.scan(cursor=cursor, match=pattern)
# 游标为 '0' 表示迭代结束
if cursor == b'0' or cursor == 0 or cursor == '0':
break
# 将找到的键添加到结果列表中
count_group += 1
logger.info(f"{count_group} - {len(keys)} = {cursor}")
yield [key.decode('utf-8') for key in keys]
def append_2_file(data):
# 将数据转换为字典
decoded_data = {}
for key, value in data.items():
decoded_data[key.decode('utf-8')] = value.decode('utf-8')
# 将数据写入文件
with open(f'data_{args.datasource}.json', 'a') as file:
file.write(json.dumps(decoded_data) + '\n')
def pull():
# 连接到源 Redis 实例
# 从 Redis 中读取数据
all_keys = set()
if args.datasource == 'retens':
for keys in scan_keys_with_pattern('mmp:cfgs:*:retens'):
for key in keys:
if key not in all_keys:
all_keys.add(key)
else:
logger.warning(f"重复一次: {key}")
continue
data = source_redis.hgetall(key)
append_2_file(data=data)
elif args.datasource == 'events':
for keys in scan_keys_with_pattern('mmp:cfgs:*:events'):
for key in keys:
if key not in all_keys:
all_keys.add(key)
else:
logger.warning(f"重复一次: {key}")
continue
data = source_redis.hgetall(key)
append_2_file(data=data)
else:
logger.error(f"Invalid datasource: {args.datasource}")
return
def push():
# 连接到目标 Redis 实例
target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)
# 读取文件内容并写入目标 Redis
if not args.datasource in ['retens', 'events']:
logger.error(f"Invalid datasource: {args.datasource}")
return
with open(os.path.join(home_path, f'data_{args.datasource}.json'), 'r') as file:
for l in file:
l = json.loads(l)
for key, value in l.items():
value_obj = json.loads(value)
pkg_name = value_obj["pkg_name"]
if args.datasource == 'retens':
ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:retens', key, value)
elif args.datasource == 'events':
ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:events', key, value)
else:
print(f"Invalid datasource: {args.datasource}")
logger.info(f'Writing {key} {value} to Redis -> {ok}')
if __name__ == "__main__":
if args.mode == 'pull':
pull()
elif args.mode == 'push':
push()
使用 #
python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events
hash #
import redis
# 创建 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)
# 写入数据到哈希
r.hset('my_hash', 'field1', 'value1')
r.hset('my_hash', mapping={'field2': 'value2', 'field3': 'value3'})
# 获取单个字段的值
value = r.hget('my_hash', 'field1')
print(f'field1: {value.decode("utf-8")}')
# 获取所有字段和值
all_fields = r.hgetall('my_hash')
print('All fields:')
for field, value in all_fields.items():
print(f'{field.decode("utf-8")}: {value.decode("utf-8")}')
# 查看哈希中字段的数量
field_count = r.hlen('my_hash')
print(f'Number of fields in my_hash: {field_count}')
事务 #
import asyncio
import aioredis
async def run_transaction(redis: aioredis.Redis):
async with redis.pipeline() as pipe:
# 开始事务
await pipe.multi()
# 在事务中执行多个命令
await pipe.set('key1', 'value1')
await pipe.set('key2', 'value2')
# 提交事务
results = await pipe.execute()
print('Transaction results:', results)
async def main():
# 创建 Redis 连接
redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
# 运行事务
await run_transaction(redis)
# 关闭连接
await redis.close()
# 运行异步主函数
if __name__ == "__main__":
asyncio.run(main())
乐观锁 #
详细解释
async with redis.pipeline() as pipe
: 使用pipeline
上下文管理器创建一个事务或管道。这会创建一个事务队列或管道。await pipe.multi()
: 开始一个事务。如果在管道中使用这个命令,则表示开始一个事务。如果不使用,则表示普通的管道操作。await pipe.execute()
: 提交事务或管道操作,并获取所有命令的结果。await pipe.watch('key')
: 监视指定的键,以便在事务提交之前确保这些键没有被其他客户端修改。如果在事务提交前键发生变化,事务会失败。aioredis.WatchError
: 捕获由于键被修改而导致的事务失败异常。
async def run_watch(redis: aioredis.Redis):
async with redis.pipeline() as pipe:
# 监视键
await pipe.watch('key1')
# 开始事务
await pipe.multi()
# 执行事务中的命令
await pipe.set('key1', 'value1')
await pipe.set('key2', 'value2')
# 提交事务
try:
results = await pipe.execute()
print('Watch transaction results:', results)
except aioredis.WatchError:
print('Transaction failed due to concurrent modification')
async def main():
# 创建 Redis 连接
redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
# 运行 WATCH 事务
await run_watch(redis)
# 关闭连接
await redis.close()
# 运行异步主函数
if __name__ == "__main__":
asyncio.run(main())
分布式锁 #
async def release_counter_lock(account_id: int, package_name: str, identifier: str):
lock_key = f"lock_{account_id}_{package_name}"
# Lua 脚本:如果锁的值与标识符匹配,则删除锁
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
# 执行 Lua 脚本
result = await rds.eval(lua_script, 1, lock_key, identifier)
# 如果返回值为 1,说明锁被删除
if result == 1:
return True
return False
async def acquire_counter_lock(account_id: int, package_name: str) -> str:
"""
True 为被锁定,暂时不可用
"""
acquire_time = 2
identifier = str(uuid.uuid4()) # 使用UUID生成唯一标识
lock_key = f"lock_{account_id}_{package_name}"
end = time.time() + acquire_time
while time.time() < end:
if await rds.setnx(lock_key, identifier): # 如果锁不存在,设置并返回True
await rds.expire(lock_key, 10)
return identifier # 成功获取锁,返回标识符
elif not await rds.ttl(lock_key):
await rds.expire(lock_key, 10)
time.sleep(0.01)
# 超时未能获取锁
return ''