名称 | 功能 |
---|---|
redisServer | 服务器结构体 |
redisClient | 客户端结构体 |
redisCommand | Redis命令,全局有一个命令字典,会在服务器启动初始化调用initServerConfig()->populateCommandTable()时生成 |
redisServer以及在第一篇文章[1]中讲过,这里只介绍后两个结构:
/* With multiplexing we need to take per-client state. * Clients are taken in a liked list. */ typedef struct redisClient { int fd; redisDb *db; int dictid; robj *name; /* As set by CLIENT SETNAME */ sds querybuf; int argc; robj **argv; struct redisCommand *cmd, *lastcmd; int reqtype; int multibulklen; /* number of multi bulk arguments left to read */ long bulklen; /* length of bulk argument in multi bulk request */ list *reply; unsigned long reply_bytes; /* Tot bytes of objects in reply list */ int sentlen; /* Amount of bytes already sent in the current /* Response buffer */ int bufpos; char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;
struct redisCommand { char *name; redisCommandProc *proc; int arity; char *sflags; /* Flags as string representation, one char per flag. */ int flags; /* The actual flags, obtained from the 'sflags' field. */ /* Use a function to determine keys arguments in a command line. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */ long long microseconds, calls; };
L3:命令处理例程;
redisCommand就被存放在一个列表中:
struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, // ... }
Redis的网络事件处理是基于Reactor模型的,因此客户端的命令请求就是对应连接的fd上的监听事件触发:
redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(redisClient)); /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the Redis commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); /* 监听客户端的可读事件,设置回调 */ if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // ... }
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; int nread, readlen; size_t qblen; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); server.current_client = c; readlen = REDIS_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); /* 将命令读入客户端querybuf(输入缓冲区)中 */ nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { nread = 0; } else { redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { redisLog(REDIS_VERBOSE, "Client closed connection"); freeClient(c); return; } if (nread) { /* 更新sds的len和free属性 */ sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; /* 如果客户端是master,即本端是slave; * 本端收到的命令请求是master发过来用于部分重同步的, * 所以需要更新reploff(replication offset) */ if (c->flags & REDIS_MASTER) c->reploff += nread; } else { server.current_client = NULL; return; } /* 写入数据过大时关闭客户端 */ if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = getClientInfoString(c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } /* 从输入缓存中读取内容,执行命令 */ processInputBuffer(c); server.current_client = NULL; }
L49:注意主从复制[1]时,master是slave的服务器,也是slave的客户端;
L66:第三、四节分析命令分析和执行过程;
void processInputBuffer(redisClient *c) { /* Keep processing while there is something in the input buffer */ while(sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & REDIS_BLOCKED) return; /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). */ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; /* Determine request type when unknown. */ if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = REDIS_REQ_MULTIBULK; } else { c->reqtype = REDIS_REQ_INLINE; } } if (c->reqtype == REDIS_REQ_INLINE) { if (processInlineBuffer(c) != REDIS_OK) break; } else if (c->reqtype == REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != REDIS_OK) break; } else { redisPanic("Unknown request type"); } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ if (processCommand(c) == REDIS_OK) resetClient(c); } } }
int processMultibulkBuffer(redisClient *c) { char *newline = NULL; int pos = 0, ok; long long ll; /* 假设命令为: * *3\r\n$3\r\nSET\r\n... * | * | * querybuf * */ if (c->multibulklen == 0) { /* The client should have been reset */ redisAssertWithInfo(c,NULL,c->argc == 0); /* Multi bulk length cannot be read without a \r\n */ newline = strchr(c->querybuf,'\r'); if (newline == NULL) { if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big mbulk count string"); setProtocolError(c,0); } return REDIS_ERR; } /* Buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) return REDIS_ERR; /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ redisAssertWithInfo(c,NULL,c->querybuf[0] == '*'); /* 1.获取参数个数 * 假设命令为: * *3\r\n$3\r\nSET\r\n... * | | * | | * qb newline * * ll即为3 */ ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); if (!ok || ll > 1024*1024) { addReplyError(c,"Protocol error: invalid multibulk length"); setProtocolError(c,pos); return REDIS_ERR; } pos = (newline-c->querybuf)+2; if (ll <= 0) { sdsrange(c->querybuf,pos,-1); return REDIS_OK; } /* 参数个数 */ c->multibulklen = ll; /* Setup argv array on client structure */ if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } redisAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { newline = strchr(c->querybuf+pos,'\r'); if (newline == NULL) { if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big bulk count string"); setProtocolError(c,0); } break; } /* Buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) break; if (c->querybuf[pos] != '$') { addReplyErrorFormat(c, "Protocol error: expected '$', got '%c'", c->querybuf[pos]); setProtocolError(c,pos); return REDIS_ERR; } /* 2.获取参数长度 * 假设命令为: * *3\r\n$3\r\nSET\r\n... * | | | * | | | * qb pos newline * * ll即为$之后的3 */ ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); if (!ok || ll < 0 || ll > 512*1024*1024) { addReplyError(c,"Protocol error: invalid bulk length"); setProtocolError(c,pos); return REDIS_ERR; } /* 移动到参数开头 * 3.获取参数 * 假设命令为: * *3\r\n$3\r\nSET\r\n... * | | * | | * qb pos * */ pos += newline-(c->querybuf+pos)+2; if (ll >= REDIS_MBULK_BIG_ARG) { size_t qblen; /* If we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ sdsrange(c->querybuf,pos,-1); pos = 0; qblen = sdslen(c->querybuf); /* Hint the sds library about the amount of bytes this string is * going to contain. */ if (qblen < ll+2) c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen); } /* 参数长度 */ c->bulklen = ll; } /* Read bulk argument */ if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) { /* Not enough data (+2 == trailing \r\n) */ break; } else { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ if (pos == 0 && c->bulklen >= REDIS_MBULK_BIG_ARG && (signed) sdslen(c->querybuf) == c->bulklen+2) { c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf); sdsIncrLen(c->querybuf,-2); /* remove CRLF */ c->querybuf = sdsempty(); /* Assume that if we saw a fat argument we'll see another one * likely... */ c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2); pos = 0; } else { /* 不进行优化,得到参数,pos向后移动 */ c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; } } /* Trim to pos */ /* 将pos前(缓冲区已被读取的部分)删除 */ if (pos) sdsrange(c->querybuf,pos,-1); /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) return REDIS_OK; /* Still not read to process the command */ return REDIS_ERR; }
processCommand() 在进行一些校验以后,会执行call()函数。官方注释将call函数称作 the **core** of Redis execution of a command。
/* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start = ustime(), duration; int client_old_flags = c->flags; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); } /* Call the command. */ /*to_do why??; */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); redisOpArrayInit(&server.also_propagate); dirty = server.dirty; c->cmd->proc(c); /* 当前指令进行了多少次修改; */ dirty = server.dirty-dirty; duration = ustime()-start; /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & REDIS_LUA_CLIENT) flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) { if (c->flags & REDIS_FORCE_REPL) server.lua_caller->flags |= REDIS_FORCE_REPL; if (c->flags & REDIS_FORCE_AOF) server.lua_caller->flags |= REDIS_FORCE_AOF; } /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ /* 记录慢查询日志; */ if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) slowlogPushEntryIfNeeded(c->argv,c->argc,duration); if (flags & REDIS_CALL_STATS) { c->cmd->microseconds += duration; c->cmd->calls++; } /* Propagate the command into the AOF and replication link */ /* 向从服务器和AOF文件进行命令传播; */ if (flags & REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE; if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } /* Restore the old FORCE_AOF/REPL flags, since call can be executed * recursively. */ /* to_do why??; */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. */ if (server.also_propagate.numops) { int j; redisOp *rop; for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target); } redisOpArrayFree(&server.also_propagate); } server.stat_numcommands++; }
下面展示一个c->cmd->proc()执行的实例setCommand() -> setGenericCommand:
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK) return; if (milliseconds <= 0) { addReplyError(c,"invalid expire time in SETEX"); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } /* 对客户端当前使用的server的数据库(操作c->db->dict)的操作在这里; */ setKey(c->db,key,val); /* dirty计数器; */ server.dirty++; /* 设置过期时间(操作c->db->expires) */ if (expire) setExpire(c->db,key,mstime()+milliseconds); /* 订阅通知机制,to_do; */ notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC, "expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); }
主要通过addReply*系列函数,修改c->buf输出缓冲区,并且通过prepareClientToWrite在事件循环中监听了可写事件(回调函数完成取消监听,因为Redis使用EPOLL LT模式)。以后可能会详细分析;
Redis源码分析--主从复制 - macguz - 博客园 (cnblogs.com)
REDIS protocol -- Redis中国用户组(CRUG)
Redis源码解析(7) 发布订阅机制_李兆龙的博客-CSDN博客
10Redis键空间通知(keyspace notifications)_程序员的自我修养-CSDN博客_redis键空间通知