布隆过滤器

Hello#

新建#

# 创建一个预期容量100万,错误率1%的布隆过滤器
BF.RESERVE my_bloom 0.01 1000000

添加元素#

BF.ADD my_bloom item1
BF.MADD my_bloom item2 item3 item4

检查元素是否存在#

BF.EXISTS my_bloom item1  # 返回1(可能存在)
BF.EXISTS my_bloom itemX  # 返回0(绝对不存在)

批量查询#

BF.MEXISTS my_bloom item1 itemX

查看过滤器信息#

命令: BF.DEBUG my_bloom

total_items:4,num_blooms:1,bytes:2097152 bits:16777216 hashes:7 hashwidth:64 capacity:1750350 items:4 error_ratio:0.01

命令: TYPE my_bloom

exbloom--

重要特性说明#

特性说明
误判率可能误报(假阳性),但不会漏报(假阴性)
空间效率100万元素+1%误判率约占用1.7MB内存
不可删除元素标准布隆过滤器不支持删除,可用Cuckoo FilterCF命令)替代
自动扩容如果不使用RESERVE,过滤器会动态扩容,但错误率可能上升

Python 代码示例#

import redis

# 连接Redis
r = redis.Redis(host='localhost', port=6379)

# 创建布隆过滤器(如果不存在)
try:
    r.execute_command("BF.RESERVE", "user_filter", 0.01, 1000000)
except redis.exceptions.ResponseError as e:
    if "item exists" not in str(e):
        raise

# 添加数据
r.execute_command("BF.ADD", "user_filter", "user123")
r.execute_command("BF.MADD", "user_filter", "user456", "user789")

# 检查是否存在
print(r.execute_command("BF.EXISTS", "user_filter", "user123"))  # 1
print(r.execute_command("BF.EXISTS", "user_filter", "unknown"))  # 0

# 重建
## 1. 查询所有需要保留的元素
existing_items = get_all_valid_items()  # 自定义获取有效元素的逻辑
## 2. 删除旧过滤器
r.delete("my_bloom")
## 3. 创建新过滤器并重新添加有效元素
r.execute_command("BF.RESERVE", "my_bloom", 0.01, 1000000)
r.execute_command("BF.MADD", "my_bloom", *existing_items)

Redis集群部署

以下是为 Redis 7.2.4(最新版本-我替换成了latest)编写的完整 docker-compose.yml 文件,包含 6 个节点 (3 主 + 3 从) 的完整配置:

单个部署#

docker run --name redis-single -p 6379:6379 -v $(pwd)/data/single:/data redis:latest redis-server --appendonly yes

安装#

version: '3.8'

services:
  redis-7000:
    image: redis:latest
    container_name: redis-7000
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7000.conf:/usr/local/etc/redis/redis.conf
      - ./data/7000:/data
    networks:
      - redis-cluster-net
    ports:
      - "7000:7000"
      - "17000:17000"

  redis-7001:
    image: redis:latest
    container_name: redis-7001
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7001.conf:/usr/local/etc/redis/redis.conf
      - ./data/7001:/data
    networks:
      - redis-cluster-net
    ports:
      - "7001:7001"
      - "17001:17001"

  redis-7002:
    image: redis:latest
    container_name: redis-7002
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7002.conf:/usr/local/etc/redis/redis.conf
      - ./data/7002:/data
    networks:
      - redis-cluster-net
    ports:
      - "7002:7002"
      - "17002:17002"

  redis-7003:
    image: redis:latest
    container_name: redis-7003
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7003.conf:/usr/local/etc/redis/redis.conf
      - ./data/7003:/data
    networks:
      - redis-cluster-net
    ports:
      - "7003:7003"
      - "17003:17003"

  redis-7004:
    image: redis:latest
    container_name: redis-7004
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7004.conf:/usr/local/etc/redis/redis.conf
      - ./data/7004:/data
    networks:
      - redis-cluster-net
    ports:
      - "7004:7004"
      - "17004:17004"

  redis-7005:
    image: redis:latest
    container_name: redis-7005
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./config/redis-7005.conf:/usr/local/etc/redis/redis.conf
      - ./data/7005:/data
    networks:
      - redis-cluster-net
    ports:
      - "7005:7005"
      - "17005:17005"

networks:
  redis-cluster-net:
    driver: bridge

配套操作步骤#

  1. 创建目录和配置文件
mkdir -p redis-cluster/{config,data/{7000,7001,7002,7003,7004,7005}}
cd redis-cluster
  1. 生成所有配置文件(一键生成脚本)
for port in {7000..7005}; do
  cat > config/redis-${port}.conf <<EOF
port ${port}
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
dir /data
# 重要:替换为你的宿主机IP(非容器IP)
cluster-announce-ip 192.168.1.100
cluster-announce-port ${port}
cluster-announce-bus-port 1${port}
EOF
done
  1. 启动所有容器
podman-compose up -d

或者

Python Redis 客户端

异步调用时自动关闭#

当事件循环正在运行时,会创建一个任务来关闭 redis 连接,而不会阻塞事件循环。如果事件循环没有运行,就可以像之前一样调用 run_until_complete

async def close_redis(self):
    if self.redis:
        await self.redis.close()

def __del__(self):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        asyncio.create_task(self.close_redis())
    else:
        loop.run_until_complete(self.close_redis())

数据迁移(hash数据)#

脚本#

import os
import json
import redis
import argparse
from loguru import logger

home_path = os.environ.get('HOME')

parser = argparse.ArgumentParser(description='Copy Reten Config')
parser.add_argument('--mode', help='Pull or Push data from Redis')
parser.add_argument('-ds', '--datasource', help='retens or events config data from Redis')
args = parser.parse_args()

source_redis = redis.StrictRedis(host='redis-product.com', port=6379, db=0)
def scan_keys_with_pattern(pattern):
    # 初始游标
    cursor = '0'
    count_group = 0
    while True:
        # 使用 SCAN 命令进行迭代
        cursor, keys = source_redis.scan(cursor=cursor, match=pattern)

        # 游标为 '0' 表示迭代结束
        if cursor == b'0' or cursor == 0 or cursor == '0':
            break

        # 将找到的键添加到结果列表中
        count_group += 1
        logger.info(f"{count_group} - {len(keys)} = {cursor}")
        yield [key.decode('utf-8') for key in keys]

def append_2_file(data):
    # 将数据转换为字典
    decoded_data = {}
    for key, value in data.items():
        decoded_data[key.decode('utf-8')] = value.decode('utf-8')
    # 将数据写入文件
    with open(f'data_{args.datasource}.json', 'a') as file:
        file.write(json.dumps(decoded_data) + '\n')

def pull():
    # 连接到源 Redis 实例

    # 从 Redis 中读取数据
    all_keys = set()
    if args.datasource == 'retens':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:retens'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    elif args.datasource == 'events':
        for keys in scan_keys_with_pattern('mmp:cfgs:*:events'):
            for key in keys:
                if key not in all_keys:
                    all_keys.add(key)
                else:
                    logger.warning(f"重复一次: {key}")
                    continue
                data = source_redis.hgetall(key)
                append_2_file(data=data)
    else:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

def push():
    # 连接到目标 Redis 实例
    target_redis = redis.StrictRedis(host='localhost', port=6379, db=1)

    # 读取文件内容并写入目标 Redis
    if not args.datasource in ['retens', 'events']:
        logger.error(f"Invalid datasource: {args.datasource}")
        return

    with open(os.path.join(home_path, f'data_{args.datasource}.json'), 'r') as file:
        for l in file:
            l = json.loads(l)
            for key, value in l.items():
                value_obj = json.loads(value)
                pkg_name = value_obj["pkg_name"]
                if args.datasource == 'retens':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:retens', key, value)
                elif args.datasource == 'events':
                    ok = target_redis.hset(f'mmp:cfgs:{pkg_name}:events', key, value)
                else:
                    print(f"Invalid datasource: {args.datasource}")
                logger.info(f'Writing {key} {value} to Redis -> {ok}')

if __name__ == "__main__":
    if args.mode == 'pull':
        pull()
    elif args.mode == 'push':
        push()

使用#

python {thisscript}.py --mode=pull -ds=events
python {thisscript}.py --mode=push -ds=events

hash#

import redis

# 创建 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 写入数据到哈希
r.hset('my_hash', 'field1', 'value1')
r.hset('my_hash', mapping={'field2': 'value2', 'field3': 'value3'})

# 获取单个字段的值
value = r.hget('my_hash', 'field1')
print(f'field1: {value.decode("utf-8")}')

# 获取所有字段和值
all_fields = r.hgetall('my_hash')
print('All fields:')
for field, value in all_fields.items():
    print(f'{field.decode("utf-8")}: {value.decode("utf-8")}')

# 查看哈希中字段的数量
field_count = r.hlen('my_hash')
print(f'Number of fields in my_hash: {field_count}')

事务#

import asyncio
import aioredis

async def run_transaction(redis: aioredis.Redis):
    async with redis.pipeline() as pipe:
        # 开始事务
        await pipe.multi()
        
        # 在事务中执行多个命令
        await pipe.set('key1', 'value1')
        await pipe.set('key2', 'value2')
        
        # 提交事务
        results = await pipe.execute()
        
        print('Transaction results:', results)

async def main():
    # 创建 Redis 连接
    redis = await aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
    
    # 运行事务
    await run_transaction(redis)
    
    # 关闭连接
    await redis.close()

# 运行异步主函数
if __name__ == "__main__":
    asyncio.run(main())

乐观锁#

详细解释

大key、热key问题

如何找出优化大Key与热Key,产生的原因和问题_云数据库 Redis 版(Redis)-阿里云帮助中心#


在使用Redis的过程中,如果未能及时发现并处理Big keys(下文称为“大Key”)与Hotkeys(下文称为“热Key”),可能会导致服务性能下降、用户体验变差,甚至引发大面积故障。本文将介绍大Key与热Key产生的原因、其可能引发的问题及如何快速找出大Key与热Key并将其优化的方案。

名词解释
大Key通常以Key的大小和Key中成员的数量来综合判定,例如:- Key本身的数据量过大:一个String类型的Key,它的值为5 MB。- Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个。- Key中成员的数据量过大:一个Hash类型的Key,它的成员数量虽然只有1,000个但这些成员的Value(值)总大小为100 MB。
热Key通常以其接收到的Key被请求频率来判定,例如:- QPS集中在特定的Key:Redis实例的总QPS(每秒查询率)为10,000,而其中一个Key的每秒访问量达到了7,000。- 带宽使用率集中在特定的Key:对一个拥有上千个成员且总大小为1 MB的HASH Key每秒发送大量的HGETALL操作请求。- CPU使用时间占比集中在特定的Key:对一个拥有数万个成员的Key(ZSET类型)每秒发送大量的ZRANGE操作请求。

说明

上述例子中的具体数值仅供参考,在实际业务中,您需要根据Redis的实际业务场景进行综合判断。

大Key和热Key引发的问题#

类别说明
大Key- 客户端执行命令的时长变慢。- Redis内存达到maxmemory参数定义的上限引发操作阻塞或重要的Key被逐出,甚至引发内存溢出(Out Of Memory)。- 集群架构下,某个数据分片的内存使用率远超其他数据分片,无法使数据分片的内存资源达到均衡。- 对大Key执行读请求,会使Redis实例的带宽使用率被占满,导致自身服务变慢,同时易波及相关的服务。- 对大Key执行删除操作,易造成主库较长时间的阻塞,进而可能引发同步中断或主从切换。
热Key- 占用大量的CPU资源,影响其他请求并导致整体性能降低。- 集群架构下,产生访问倾斜,即某个数据分片被大量访问,而其他数据分片处于空闲状态,可能引起该数据分片的连接数被耗尽,新的连接建立请求被拒绝等问题。- 在抢购或秒杀场景下,可能因商品对应库存Key的请求量过大,超出Redis处理能力造成超卖。- 热Key的请求压力数量超出Redis的承受能力易造成缓存击穿,即大量请求将被直接指向后端的存储层,导致存储访问量激增甚至宕机,从而影响其他业务。

大Key和热Key产生的原因#

未正确使用Redis、业务规划不足、无效数据的堆积、访问量突增等都会产生大Key与热Key,如:

Redis基础

基础理论#

概况#

Redis (Remote Dictionary Server) 是一个开源的、支持网络、基于内存、可选持久性的键值对数据库。它支持多种类型的数据结构,如字符串(strings)、列表(lists)、集合(sets)、有序集合(sorted sets)以及散列(hashes)、位图(bitmaps)、超日志(hyperloglogs)和地理空间(geospatial)索引半径查询。

特点包括:

  • 高性能:因为数据主要在内存中进行操作,读写速度非常快。
  • 丰富的数据类型:提供多种数据结构来满足不同场景下的需求。
  • 原子性:所有操作都是原子性的,支持事务(通过MULTI/EXEC命令)。
  • 丰富的功能:支持发布订阅、Lua脚本、事务等。

数据结构#

Redis提供的数据结构包括: Redis 支持多种数据类型,以下是一些常用的数据类型以及它们的简要描述和使用场景:

数据类型描述使用场景
String二进制安全字符串,最大可以存储 512MB存储文本或二进制数据,如缓存用户个人信息等
List有序集合,按插入顺序排序消息队列、时间线、最新消息列表
Set无序集合,元素不重复标签、社交网络中的朋友关系等
Sorted Set有序集合,元素不重复,并且每个元素都会关联一个浮点数分数排行榜、带权重的集合
Hash键值对集合,适用于存储对象存储、访问和修改对象属性
Bitmaps通过位来表示数据的数据类型,适合做计数统计在线状态、特性标志、统计等
HyperLogLog近似去重计数的数据结构大数据量的计数,如统计独立 IP 访问数量
Geospatial存储地理位置信息,并进行相关地理操作储存经纬度,查询附近的地点
StreamsRedis 5.0 新增的数据类型,是一个可持久化的日志数据结构实现消息队列,发布/订阅模式,日志记录

Redis 之所以快速,主要原因有以下几点:#

  1. 内存存储:Redis 将所有数据存储在内存中,内存的读写速度远高于硬盘。
  2. 数据结构简单:Redis 的数据结构设计简单直观,易于高效操作。
  3. 非阻塞 IO:Redis 使用非阻塞 IO 和多路复用技术,可以处理多个并发连接。
  4. 单线程模型:Redis 大部分操作是单线程执行,避免了多线程的上下文切换开销。

Redis高性能IO模型#

Redis的IO模型使用的是非阻塞IO复用技术,主要是epoll作为IO多路复用技术的实现方式。它通过单线程事件循环来处理所有客户端请求,确保绝大部分请求都是非阻塞的,并且使用异步编程模式来提高性能。

引用一些数据的话,可以提及以下几点:

  • 内存读写速度:NVMe SSD 的 IOPS 可以达到数十万到数百万,而内存的 IOPS 可以达到千万级别,延迟通常小于100微秒。相比之下,内存的速度通常是硬盘的数千倍甚至更高。
  • 性能表现:根据 Redis 官方给出的数据,在一个标准 Linux 系统上,Redis 可以达到每秒10万级别的读写操作(这个数字可能因配置、数据类型和具体操作而异)。

以上数据只是为了说明 Redis 为什么快速,具体的性能指标会根据使用环境、硬件配置以及具体的工作负载而有所变化。

Redis进阶

主从架构-主从同步#

在Redis中,主从同步是一种常用的数据复制方式,它允许一个或多个从服务器(slave)获得与主服务器(master)相同的数据副本。这种架构提供了数据的冗余和读取扩展性。

  1. 全量复制:当一个从服务器第一次连接到主服务器时,或是由于某些原因需要重新同步时,会进行全量复制。在这个过程中,主服务器会生成一个当前所有数据的快照,并将这个快照发送至请求同步的从服务器。从服务器接收到数据后,加载到自己的数据空间内。

  2. 部分复制:一旦完成了全量复制,如果从服务器断开连接又重新连接,且中断时间不长,主服务器可以只发送这段时间内发生变化的数据给从服务器,而不是再次进行全量复制。这依赖于主服务器的复制积压缓冲区来存储最近的写命令。

  3. 同步策略:为了保证数据的一致性,从服务器在初始同步完成之前不会对外提供服务。在日常运行时,主服务器会将写命令同时发送给所有的从服务器,以此来保证数据的实时一致性。

主从同步使得从服务器可以承担读操作,减轻主服务器的负载,同时也可以在主服务器遇到故障时,进行故障转移。

主从哨兵架构#

Redis哨兵(Sentinel)系统是用于管理多个Redis服务器的系统。该体系结构具有以下特点:

  1. 监控:哨兵会监控主从服务器是否正常运作。
  2. 自动故障转移:如果主服务器出现故障,哨兵可以自动选举新的主服务器,并让原来的从服务器指向新的主服务器。
  3. 配置提供者:哨兵还会将当前的主服务器地址提供给客户端,确保客户端总是连接到正确的主服务器。

哨兵机制通过这些功能增加了Redis环境的高可用性和稳定性。

切片集群-Redis Cluster#

Redis Cluster是Redis的分布式解决方案。它支持数据的水平分片,以下是其关键特性:

  1. 自动分片:��动将数据分布在不同的节点上,每个节点只保存整个数据集的一部分。
  2. 高可用性:采用主从复制模型,即使在多个节点失败的情况下也能保证服务的可用性。
  3. 无中心设计:没有中心节点,每个节点都保存着整个集群状态的一部分,节点之间通过Gossip协议交换信息。

Redis Cluster通过对键进行CRC16计算并对16384取余数来决定将键分配到哪个槽位,每个节点负责一部分槽位,从而实现负载均衡。

Redis Cluster 通信开销#

由于Redis Cluster节点间需要频繁地交换消息以维护集群状态,因此会产生额外的通信开销:

  1. Gossip通信:节点间通过Gossip协议定期交换信息,包括数据迁移、故障检测等。
  2. 重定向操作:客户端可能会尝试向不包含数据所在槽的节点发起请求,这时节点会返回一个重定向信息

切片集群-Codis

Redis 分布式锁是一种用于多个计算节点之间同步访问共享资源的机制。在分布式系统中,当多个进程需要同时访问某些数据或执行某些任务时,为了避免竞态条件(race conditions)和数据不一致,通常需要使用分布式锁来保证在同一时间只有一个进程能够执行特定的操作。

Redis 分布式锁实现#

一个常见的Redis分布式锁实现是使用 Redis 的 SETNX 命令(Set if not exists)。这个命令只在键不存在的情况下设置键的值,并且返回是否成功设置。由于 SETNX 是原子操作,因此可以用来实现锁的功能。

另一个更加推荐的方式是使用 Redis 2.6.12 版本引入的 SET 命令结合选项 NX(表示只有键不存在时才进行设置)和 PX(给键设置过期时间,单位为毫秒),这可以保证即使在客户端崩溃的情况下,锁也会在一定时间后自动释放,防止死锁。

使用 Redis 分布式锁的步骤:#

  1. 尝试获取锁

    • 使用 SET key value NX PX milliseconds 进行设置。
    • 如果返回 OK,则获取锁成功。
    • 如果返回 nil,则获取锁失败。
  2. 执行业务逻辑

    • 在持有锁的时间内执行必要的操作。
  3. 释放锁

Redis常用命令

scan 替代 keys#

SCAN 0 MATCH key* COUNT 100

查看类型type#

type#

使用了 GET 命令,这个命令通常用于获取字符串类型的值。如果key_hi 这个键在 Redis 中存储的不是字符串类型的值,而是其他类型(如列表、集合、哈希等),那么执行 GET 命令就会返回错误:

(error) WRONGTYPE Operation against a key holding the wrong kind of value

要解决这个问题,首先需要确定 key_hi 键存储的值的类型。你可以使用 TYPE 命令来检查键的类型:

TYPE mmp:cfgs:com.easypeso:retens

查看list#

  • 查看 List 长度LLEN mylist 0 -1
  • 查看 List 数据内容LRANGE mylist 0 -1

大key排查#

遍历所有key根据内存占用排序#

redis-cli -h r-1233211234567.redis.singapore.rds.aliyuncs.com -p 6379 --scan | while read k; do   size=$(redis-cli -h r-1233211234567.redis.singapore.rds.aliyuncs.com -p 6379 MEMORY USAGE "$k");   echo "$size $k"; done | sort -nr | head -300