什么时候调用这些函数呢?值得思考。其实,每次传输数据到客户端时,都会调用。
/* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns C_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client * (used to load AOF in memory), a master or because the setup of the write * handler failed, the function returns C_ERR. * * The function may return C_OK without actually installing the write * event handler in the following cases: * * 1) The event handler should already be installed since the output buffer * already contains something. * 2) The client is a slave but not yet online, so we want to just accumulate * writes in the buffer but not actually sending them yet. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns C_ERR no * data should be appended to the output buffers. */ int prepareClientToWrite(client *c) { /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR; /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag * is set. */ if ((c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR; if (!c->conn) return C_ERR; /* Fake client for AOF loading. */ /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). 
* * If CLIENT_PENDING_READ is set, we're in an IO thread and should * not install a write handler. Instead, it will be done by * handleClientsWithPendingReadsUsingThreads() upon return. */ if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; } /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ int _addReplyToBuffer(client *c, const char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return C_ERR; /* Check that the buffer has enough space available for this string. */ if (len > available) return C_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return C_OK; } /* Adds the reply to the reply linked list. * Note: some edits to this function need to be relayed to AddReplyFromClient. */ void _addReplyProtoToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; listNode *ln = listLast(c->reply); clientReplyBlock *tail = ln? listNodeValue(ln): NULL; /* Note that 'tail' may be NULL even if we have a tail node, because when * addReplyDeferredLen() is used, it sets a dummy node to NULL just * fo fill it later, when the size of the bulk length is set. */ /* Append to tail string when possible. 
*/ if (tail) { /* Copy the part we can fit into the tail, and leave the rest for a * new node */ size_t avail = tail->size - tail->used; size_t copy = avail >= len? len: avail; memcpy(tail->buf + tail->used, s, copy); tail->used += copy; s += copy; len -= copy; } if (len) { /* Create a new node, make sure it is allocated to at * least PROTO_REPLY_CHUNK_BYTES */ size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; tail = zmalloc(size + sizeof(clientReplyBlock)); /* take over the allocation's internal fragmentation */ tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock); tail->used = len; memcpy(tail->buf, s, len); listAddNodeTail(c->reply, tail); c->reply_bytes += tail->size; closeClientOnOutputBufferLimitReached(c, 1); } } /* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -------------------------------------------------------------------------- */ /* Add the object 'obj' string representation to the client output buffer. */ void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); } else if (obj->encoding == OBJ_ENCODING_INT) { /* For integer encoded strings we just convert it into a string * using our optimized function, and attach the resulting string * to the output buffer. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyProtoToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } } /* Add the SDS 's' string to the client output buffer, as a side effect * the SDS string is freed. 
*/ void addReplySds(client *c, sds s) { if (prepareClientToWrite(c) != C_OK) { /* The caller expects the sds to be free'd. */ sdsfree(s); return; } if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) _addReplyProtoToList(c,s,sdslen(s)); sdsfree(s); } /* This low level function just adds whatever protocol you send it to the * client buffer, trying the static buffer initially, and using the string * of objects if not possible. * * It is efficient because does not create an SDS object nor an Redis object * if not needed. The object will only be created by calling * _addReplyProtoToList() if we fail to extend the existing tail object * in the list of objects. */ void addReplyProto(client *c, const char *s, size_t len) { if (prepareClientToWrite(c) != C_OK) return; if (_addReplyToBuffer(c,s,len) != C_OK) _addReplyProtoToList(c,s,len); } /* Low level function called by the addReplyError...() functions. * It emits the protocol for a Redis error, in the form: * * -ERRORCODE Error Message<CR><LF> * * If the error code is already passed in the string 's', the error * code provided is used, otherwise the string "-ERR " for the generic * error code is automatically added. * Note that 's' must NOT end with \r\n. */ void addReplyErrorLength(client *c, const char *s, size_t len) { /* If the string already starts with "-..." then the error code * is provided by the caller. Otherwise we use "-ERR". */ if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); addReplyProto(c,s,len); addReplyProto(c,"\r\n",2); } /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ void afterErrorReply(client *c, const char *s, size_t len) { /* Increment the global error counter */ server.stat_total_error_replies++; /* Increment the error stats * If the string already starts with "-..." then the error prefix * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". 
*/ if (s[0] != '-') { incrementErrorCount("ERR", 3); } else { char *spaceloc = memchr(s, ' ', len < 32 ? len : 32); if (spaceloc) { const size_t errEndPos = (size_t)(spaceloc - s); incrementErrorCount(s+1, errEndPos-1); } else { /* Fallback to ERR if we can't retrieve the error prefix */ incrementErrorCount("ERR", 3); } } /* Sometimes it could be normal that a slave replies to a master with * an error and this function gets called. Actually the error will never * be sent because addReply*() against master clients has no effect... * A notable example is: * * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x * * Where the master must propagate the first change even if the second * will produce an error. However it is useful to log such events since * they are rare and may hint at errors in a script or a bug in Redis. */ int ctype = getClientType(c); if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) { char *to, *from; if (c->id == CLIENT_ID_AOF) { to = "AOF-loading-client"; from = "server"; } else if (ctype == CLIENT_TYPE_MASTER) { to = "master"; from = "replica"; } else { to = "replica"; from = "master"; } if (len > 4096) len = 4096; char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " "to its %s: '%.*s' after processing the command " "'%s'", from, to, (int)len, s, cmdname); if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog && server.repl_backlog_histlen > 0) { showLatestBacklog(); } server.stat_unexpected_error_replies++; } } /* The 'err' object is expected to start with -ERRORCODE and end with \r\n. * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */ void addReplyErrorObject(client *c, robj *err) { addReply(c, err); afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */ } /* See addReplyErrorLength for expectations from the input string. 
*/ void addReplyError(client *c, const char *err) { addReplyErrorLength(c,err,strlen(err)); afterErrorReply(c,err,strlen(err)); } /* See addReplyErrorLength for expectations from the input string. */ /* As a side effect the SDS string is freed. */ void addReplyErrorSds(client *c, sds err) { addReplyErrorLength(c,err,sdslen(err)); afterErrorReply(c,err,sdslen(err)); sdsfree(err); } /* See addReplyErrorLength for expectations from the formatted string. * The formatted string is safe to contain \r and \n anywhere. */ void addReplyErrorFormat(client *c, const char *fmt, ...) { va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */ s = sdstrim(s, "\r\n"); /* Make sure there are no newlines in the middle of the string, otherwise * invalid protocol is emitted. */ s = sdsmapchars(s, "\r\n", " ", 2); addReplyErrorLength(c,s,sdslen(s)); afterErrorReply(c,s,sdslen(s)); sdsfree(s); } void addReplyStatusLength(client *c, const char *s, size_t len) { addReplyProto(c,"+",1); addReplyProto(c,s,len); addReplyProto(c,"\r\n",2); } void addReplyStatus(client *c, const char *status) { addReplyStatusLength(c,status,strlen(status)); } void addReplyStatusFormat(client *c, const char *fmt, ...) { va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); addReplyStatusLength(c,s,sdslen(s)); sdsfree(s); } /* Sometimes we are forced to create a new reply node, and we can't append to * the previous one, when that happens, we wanna try to trim the unused space * at the end of the last reply node which we won't use anymore. */ void trimReplyUnusedTailSpace(client *c) { listNode *ln = listLast(c->reply); clientReplyBlock *tail = ln? 
listNodeValue(ln): NULL; /* Note that 'tail' may be NULL even if we have a tail node, because when * addReplyDeferredLen() is used */ if (!tail) return; /* We only try to trim the space is relatively high (more than a 1/4 of the * allocation), otherwise there's a high chance realloc will NOP. * Also, to avoid large memmove which happens as part of realloc, we only do * that if the used part is small. */ if (tail->size - tail->used > tail->size / 4 && tail->used < PROTO_REPLY_CHUNK_BYTES) { size_t old_size = tail->size; tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); /* take over the allocation's internal fragmentation (at least for * memory usage tracking) */ tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock); c->reply_bytes = c->reply_bytes + tail->size - old_size; listNodeValue(ln) = tail; } } /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addReplyDeferredLen(client *c) { /* Note that we install the write event here even if the object is not * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) return NULL; trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } void setDeferredReply(client *c, void *node, const char *s, size_t length) { listNode *ln = (listNode*)node; clientReplyBlock *next, *prev; /* Abort when *node is NULL: when the client should not accept writes * we return NULL in addReplyDeferredLen() */ if (node == NULL) return; serverAssert(!listNodeValue(ln)); /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(), * with a new buffer structure containing the protocol needed to specify * the length of the array following. 
However sometimes there might be room * in the previous/next node so we can instead remove this NULL node, and * suffix/prefix our data in the node immediately before/after it, in order * to save a write(2) syscall later. Conditions needed to do it: * * - The prev node is non-NULL and has space in it or * - The next node is non-NULL, * - It has enough room already allocated * - And not too large (avoid large memmove) */ if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0) { size_t len_to_copy = prev->size - prev->used; if (len_to_copy > length) len_to_copy = length; memcpy(prev->buf + prev->used, s, len_to_copy); prev->used += len_to_copy; length -= len_to_copy; if (length == 0) { listDelNode(c->reply, ln); return; } s += len_to_copy; } if (ln->next != NULL && (next = listNodeValue(ln->next)) && next->size - next->used >= length && next->used < PROTO_REPLY_CHUNK_BYTES * 4) { memmove(next->buf + length, next->buf, next->used); memcpy(next->buf, s, length); next->used += length; listDelNode(c->reply,ln); } else { /* Create a new node */ clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock)); /* Take over the allocation's internal fragmentation */ buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock); buf->used = length; memcpy(buf->buf, s, length); listNodeValue(ln) = buf; c->reply_bytes += buf->size; closeClientOnOutputBufferLimitReached(c, 1); } } /* Populate the length object and try gluing it to the next chunk. 
*/
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
    serverAssert(length >= 0);

    /* Abort when *node is NULL: when the client should not accept writes
     * we return NULL in addReplyDeferredLen() */
    if (node == NULL) return;

    /* Header is "<prefix><length>\r\n". The write is bounded by the buffer
     * size rather than relying on the buffer being large enough. */
    char lenstr[128];
    size_t lenstr_len = snprintf(lenstr, sizeof(lenstr), "%c%ld\r\n",
                                 prefix, length);
    setDeferredReply(c, node, lenstr, lenstr_len);
}

/* Deferred array length: always uses the '*' prefix. */
void setDeferredArrayLen(client *c, void *node, long length) {
    setDeferredAggregateLen(c,node,length,'*');
}

/* Deferred map length. When c->resp is 2 the map is emitted as an array
 * ('*') with twice the length; otherwise the '%' prefix is used. */
void setDeferredMapLen(client *c, void *node, long length) {
    char prefix = c->resp == 2 ? '*' : '%';
    if (c->resp == 2) length *= 2;
    setDeferredAggregateLen(c,node,length,prefix);
}

/* Deferred set length: '*' when c->resp is 2, '~' otherwise. */
void setDeferredSetLen(client *c, void *node, long length) {
    char prefix = c->resp == 2 ? '*' : '~';
    setDeferredAggregateLen(c,node,length,prefix);
}

/* Deferred attribute length ('|'); asserts that c->resp is at least 3. */
void setDeferredAttributeLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    setDeferredAggregateLen(c,node,length,'|');
}

/* Deferred push length ('>'); asserts that c->resp is at least 3. */
void setDeferredPushLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    setDeferredAggregateLen(c,node,length,'>');
}

/* Add a double as a bulk reply (c->resp == 2) or as a ','-prefixed line
 * (any other protocol version). */
void addReplyDouble(client *c, double d) {
    if (isinf(d)) {
        /* Libc in odd systems (Hi Solaris!) will format infinite in a
         * different way, so better to handle it in an explicit way. */
        if (c->resp == 2) {
            addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
        } else {
            /* 6 and 7 are the byte lengths of the two literals. */
            addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
                             d > 0 ? 6 : 7);
        }
    } else {
        char dbuf[MAX_LONG_DOUBLE_CHARS+3],
             sbuf[MAX_LONG_DOUBLE_CHARS+32];
        int dlen, slen;
        if (c->resp == 2) {
            /* Bulk string: "$<len>\r\n<digits>\r\n". */
            dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
            slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
            addReplyProto(c,sbuf,slen);
        } else {
            dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
            addReplyProto(c,dbuf,dlen);
        }
    }
}

/* Add a big number given as the 'len' bytes at 'num': a bulk string when
 * c->resp is 2, otherwise a '('-prefixed line. */
void addReplyBigNum(client *c, const char* num, size_t len) {
    if (c->resp == 2) {
        addReplyBulkCBuffer(c, num, len);
    } else {
        addReplyProto(c,"(",1);
        addReplyProto(c,num,len);
        addReply(c,shared.crlf);
    }
}

/* Add a long double as a bulk reply, but uses a human readable formatting
 * of the double instead of exposing the crude behavior of doubles to the
 * dear user. */
void addReplyHumanLongDouble(client *c, long double d) {
    if (c->resp == 2) {
        robj *o = createStringObjectFromLongDouble(d,1);
        addReplyBulk(c,o);
        decrRefCount(o);
    } else {
        char buf[MAX_LONG_DOUBLE_CHARS];
        int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
        addReplyProto(c,",",1);
        addReplyProto(c,buf,len);
        addReplyProto(c,"\r\n",2);
    }
}