V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
RedisMasterNode
V2EX  ›  Redis

Redis 6.0 权限控制基于 Bitmap 实现

  •  
  •   RedisMasterNode · 2020-03-14 22:03:31 +08:00 · 6968 次点击
    这是一个创建于 1762 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Redis 6.0 在 4 月 30 日就要和大家正式见面了,现在redis.io上已经提供了 RC 版本。在之前的博客中,已经介绍过权限控制新功能的一些用法,主要来源于作者 Antirez 在 Redis Day 上的一些演示。Antirez 在最后提到,ACL 的主要实现是基于 Bitmap,因此对性能影响是可以忽略不计的。当时大致猜想了一下实现的思路,那么现在离发布已经很近了,作者也对 ACL Logging 进行了一些补充,不妨一起来看一下。

    user 结构

    server.h 中定义了对应的user结构保存用户的 ACL 信息,包括:

    • 用户名
    • flag,主要是一些特殊状态,例如用户的启用与禁用、整体控制(所有命令可用与否、所有键可访问与否)、免密码等
    • 可用命令( allowed_commands ),一个长整型数。每一位代表命令,如果用户允许使用这个命令则置位 1
    • 可用子命令( allowed_subcommands ),一个指针数组,值也为指针,数组与可用命令一一对应,值为一个 SDS 数组,SDS 数组中存放的是这个命令可用的子命令
    • 用户密码
    • 可用的 key patterns。如果这个字段为NULL,用户将不能使用任何 Key,除非 flag 中指明特殊状态如ALLKEYS
    typedef struct user {
        sds name;
        uint64_t flags;
        uint64_t allowed_commands[USER_COMMAND_BITS_COUNT/64];
        sds **allowed_subcommands;
        list *passwords;
        list *patterns;
    } user;
    

    补充一下一些新鲜的字段描述,allowed_commands实际上是一个(默认)长度为 1024 的位图,它的 index 对应各个命令的 ID,在历史版本中命令结构redisCommand是通过名字(name)来查找的,id为这个版本中新增的属性,专门用于 ACL 功能。

    struct redisCommand {
        ...
        int id;
    };
    

    user这个结构对应的是client结构的"user"字段,熟悉 Redis 的同学应该对client也有所了解,就不再赘述了。

    ACL 操作选读

    ACL 的命令很多,总体而言都是围绕着user对象展开的,因此从中挑选了几个函数来看一下具体是如何操作user对象。

    一个需要铺垫的通用方法就是ACLGetUserCommandBit,ACL 操作中都会涉及到获取用户的命令位图,ACLGetUserCommandBit()接收一个user结构和命令 ID,根据 ID 定位出命令在allowed_commands中的位置,通过位运算返回用户是否有该命令权限

    int ACLGetUserCommandBit(user *u, unsigned long id) {
        uint64_t word, bit;
        if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return 0;
        return (u->allowed_commands[word] & bit) != 0;
    }
    

    当用户进行 Redis 操作时,例如set操作,操作的命令会保存在client结构的*cmd字段中,*cmd字段就是一个redisCommand结构的指针,redisCommand结构包含了命令的id,因此在使用时通过ACLGetUserCommandBit(u, cmd->id)传入。

    创建用户

    创建用户分为两步,首先需要创建一个user,通过调用ACLCreateUser(const char *name, size_t namelen)实现,返回的是一个user对象的指针。在创建时,会在server.h定义的Users中查找是否有同名用户,也是本次功能新增的,因为旧版本中只有"default"用户。此时这个用户拥有名称,flag 被初始化为禁用用户,其余的属性均为 Null 或空 list 等。

    然后,通过调用ACLSetUser(user *u, const char *op, ssize_t oplen),调整传入用户u的对应属性,调整内容放在名为op操作的参数中。这个函数非常长,主要是针对各种不同的“操作” switch case 处理,节选部分如下:

    int ACLSetUser(user *u, const char *op, ssize_t oplen) {
        if (oplen == -1) oplen = strlen(op);
        /* Part1 - 处理用户状态(flag)操作 */
        // 控制用户启用状态
        if (!strcasecmp(op,"on")) {
            u->flags |= USER_FLAG_ENABLED;
            u->flags &= ~USER_FLAG_DISABLED;
        } else if (!strcasecmp(op,"off")) {
            u->flags |= USER_FLAG_DISABLED;
            u->flags &= ~USER_FLAG_ENABLED;
        // 控制全局键、命令等可用与否
        } else if (!strcasecmp(op,"allkeys") ||
                   !strcasecmp(op,"~*"))
        {
            u->flags |= USER_FLAG_ALLKEYS;
            listEmpty(u->patterns);
        }
        ...
    
    
        /* Part2 - 操作用户密码增删改查 */
        // > 和 < 等控制密码的改动删除等
        else if (op[0] == '>' || op[0] == '#') {
            sds newpass;
            if (op[0] == '>') {
                newpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
            }
    
    
        /* Part3 - 操作用户可用命令的范围 */
        else if (op[0] == '+' && op[1] != '@') {
            if (strchr(op,'|') == NULL) {
                if (ACLLookupCommand(op+1) == NULL) {
                    errno = ENOENT;
                    return C_ERR;
                }
                unsigned long id = ACLGetCommandID(op+1);
                // 根据传入的 id 参数设置对应 allowed_commands 位图的值
                ACLSetUserCommandBit(u,id,1);
                // 新调整的命令的子命令数组会被重置
                ACLResetSubcommandsForCommand(u,id);
            }
        }
    

    补充一下具体调用例子,其实 Redis 的默认用户就是按照这套流程创建的:初始化名为“default”的空白无权限用户,然后为这个用户设置上所有权限:

    DefaultUser = ACLCreateUser("default",7);
    ACLSetUser(DefaultUser,"+@all",-1);
    ACLSetUser(DefaultUser,"~*",-1);
    ACLSetUser(DefaultUser,"on",-1);
    ACLSetUser(DefaultUser,"nopass",-1);
    

    拦截不可用命令 /键

    命令 /键拦截操作非常简单:

    • 判断命令 /键是否可用
      • 如果不可用,ACL Log 处理以及返回错误

    ACL 判断

    我们先看一下“不可用”的判断逻辑,然后再回到命令执行流程中看判断方法的调用。

    判断函数同样非常长,展示完后会进行总结:

    int ACLCheckCommandPerm(client *c, int *keyidxptr) {
        user *u = c->user;
        uint64_t id = c->cmd->id;
        // 命令相关的全局 flag 的检查,若满足则跳过后续部分
        if (!(u->flags & USER_FLAG_ALLCOMMANDS) &&
            c->cmd->proc != authCommand)
        {
            // 即使当前命令没有在 allowed_commands 中,还要检查子命令是否可用
            // 以免出现仅开放了部分子命令权限的情况
            if (ACLGetUserCommandBit(u,id) == 0) {
                ...
                // 遍历子命令
                long subid = 0;
                while (1) {
                    if (u->allowed_subcommands[id][subid] == NULL)
                        return ACL_DENIED_CMD;
                    if (!strcasecmp(c->argv[1]->ptr,
                                    u->allowed_subcommands[id][subid]))
                        break; // 子命令可用,跳出循环
                    subid++;
                }
            }
        }
    
        // 键相关的全局 flag 检查,若满足则跳过后续部分
        if (!(c->user->flags & USER_FLAG_ALLKEYS) &&
            (c->cmd->getkeys_proc || c->cmd->firstkey))
        {
            int numkeys;
            // 先拿到当前要进行操作的 Key
            int *keyidx = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
            for (int j = 0; j < numkeys; j++) {
                listIter li;
                listNode *ln;
                listRewind(u->patterns,&li);
    
                // 检查当前 user 所有的关于 Key 的匹配 Pattern
                // 如果有任意命中则跳出,否则判定不可用
                int match = 0;
                while((ln = listNext(&li))) {
                    sds pattern = listNodeValue(ln);
                    size_t plen = sdslen(pattern);
                    int idx = keyidx[j];
                    if (stringmatchlen(pattern,plen,c->argv[idx]->ptr,
                                       sdslen(c->argv[idx]->ptr),0))
                    {
                        match = 1;
                        break;
                    }
                }
                if (!match) {
                    if (keyidxptr) *keyidxptr = keyidx[j];
                    getKeysFreeResult(keyidx);
                    return ACL_DENIED_KEY;
                }
            }
            getKeysFreeResult(keyidx);
        }
        return ACL_OK;
    }
    

    那么为了方便喜欢跳过代码的同学看结论:

    • ACL 限制围绕user的各个字段进行
    • 全局的 flag 优先级最高,例如设置为所有键可用,所有命令可用,会跳过后续的可用命令遍历和可用键 Pattern 匹配
    • 即使在 allowed_commands 位图中没有被置位,命令也可能可用,因为它是个子命令,而且命令只开放了部分子命令的使用权限
    • 键通过遍历所有定义了的 Pattern 检查,如果有匹配上说明可用
    • 先判断操作是否可用,再判断键(包括全局 flag 也在操作之后)是否可用,两种判断分别对应不同返回整数值:ACL_DENIED_CMDACL_DENIED_KEY

    命令执行流程中的调用

    判断逻辑之后到何时调用这套判断。我们先来复习一下 Redis 如何执行命令:

    • 用户操作
    • 客户端 RESP 协议( Redis 6.0 中有 RESP3 新协议记得关注)压缩发送给服务端
    • 服务端解读消息,存放至client对象的对应字段中,例如argcargv等存放命令和参数等内容
    • 执行前检查(各种执行条件)
    • 执行命令
    • 执行后处理(慢查询日志、AOF 等)

    目前执行命令的方法是在server.c中的processCommand(client *c),传入client对象,执行,返回执行成功与否。我们节选其中关于 ACL 的部分如下:

    int processCommand(client *c) {
        ...
        int acl_keypos;
        int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
        if (acl_retval != ACL_OK) {
            addACLLogEntry(c,acl_retval,acl_keypos,NULL);
            flagTransaction(c);
            if (acl_retval == ACL_DENIED_CMD)
                addReplyErrorFormat(c,
                    "-NOPERM this user has no permissions to run "
                    "the '%s' command or its subcommand", c->cmd->name);
            else
                addReplyErrorFormat(c,
                    "-NOPERM this user has no permissions to access "
                    "one of the keys used as arguments");
            return C_OK;
        }
        ...
    

    在命令解析之后,真正执行之前,通过调用ACLCheckCommandPerm获取判断结果,如果判定不通过,进行以下操作:

    • 记录 ACL 不通过的日志,这个是作者在 RC1 之后新增的功能,还在 Twitch 上进行了直播开发,有兴趣的同学可以在 Youtube 上看到录播
    • 如果当前处于事务( MULTI )过程中,将 client 的flag置为CLIENT_DIRTY_EXEC
    • 根据命令还是键不可用,返回给客户端不同的信息

    因此这次 ACL 功能影响的是执行命令前后的操作。

    其他功能对 ACL 的调用

    通过搜索可以发现一共有 3 处调用了ACLCheckCommandPerm方法:

    /home/duck/study/redis/src/multi.c:
      179  
      180          int acl_keypos;
      181:         int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
      182          if (acl_retval != ACL_OK) {
      183              addACLLogEntry(c,acl_retval,acl_keypos,NULL);
    
    /home/duck/study/redis/src/scripting.c:
      608      /* Check the ACLs. */
      609      int acl_keypos;
      610:     int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
      611      if (acl_retval != ACL_OK) {
      612          addACLLogEntry(c,acl_retval,acl_keypos,NULL);
    
    /home/duck/study/redis/src/server.c:
     3394       * ACLs. */
     3395      int acl_keypos;
     3396:     int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
     3397      if (acl_retval != ACL_OK) {
     3398          addACLLogEntry(c,acl_retval,acl_keypos,NULL);
    

    形式都是大同小异,了解一下即可。总结一下需要判定 ACL 的位置:

    • 正常命令执行流程中
    • MULTI 事务执行过程中
    • Lua 脚本

    总结

    补充一张图来描述新增的 ACL 功能相关的结构: 图中部分的表达可能与实际的数据结构有所差异,主要原因是代码理解和 C 语言的语法掌握不到位所致。

    阅读代码的过程中留意到,对命令的限制是通过 Bitmap 来实现的,而对 Key 的限制是通过特定 Pattern 来实现的。当对 Key 的限制 Pattern 数量特别多时,是否会因为匹配 Pattern 而对性能造成影响,例如超多次的stringmatchlen()执行。当然这一块内容似乎确实没有想到什么提升非常大的判断方式,后续也会继续关注 ACL 的相关改进。

    博客: https://blog.2014bduck.com/archives/343
    备注:毕业不久多积累一点总是好的 orz,如果解读得不正确或者不恰当欢迎邮件骚扰 [email protected]
    
    3 条回复    2020-04-07 11:26:07 +08:00
    longlong
        1
    longlong  
       2020-03-15 12:16:34 +08:00
    挺不错,我竟然读完了
    AyoCross
        2
    AyoCross  
       2020-04-07 10:26:22 +08:00
    redis 为何会考虑用 ACL,而不是 RBAC 或者更复杂的 ABAC 呢
    RedisMasterNode
        3
    RedisMasterNode  
    OP
       2020-04-07 11:26:07 +08:00 via Android
    @AyoCross 个人认为因为简单才符合 redis 各项功能设计的理念
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1720 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 00:00 · PVG 08:00 · LAX 16:00 · JFK 19:00
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.