代码围绕着ngx_http_upstream.c展开,该模块主要为创建mainconf函数:
static void * ngx_http_upstream_create_main_conf(ngx_conf_t *cf) { ... //创建main conf umcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_main_conf_t)); //创建upstream数组,每一个ngx_http_upstream_srv_conf_t对应一个upstream ngx_array_init(&umcf->upstreams, cf->pool, 4, sizeof(ngx_http_upstream_srv_conf_t *) //创建implicit_upstreams链表,用来存放proxy_pass创建出来的upstream,真正创建upstream的时候用 ngx_list_init(&umcf->implicit_upstreams, cf->pool, 4, sizeof(ngx_http_upstream_srv_conf_t *) //创建rbtree。 ngx_rbtree_init(&umcf->rbtree, &umcf->sentinel, ngx_http_upstream_rbtree_insert_value); ... }
配置中的upstream会触发ngx_http_upstream函数,该配置作用于mainconf的配置
static char * ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy) { ... // upstream的name value = cf->args->elts; u.host = value[1]; //生成uscf结构体 uscf = ngx_http_upstream_add(cf, &u, flag); //创建配置指针,保存当前http块的配置,将生成配置的main_conf指向http块 ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t)); http_ctx = cf->ctx; ctx->main_conf = http_ctx->main_conf; //将生成的uscf放在upstream module对应srv配置上,因为这个模块没有create srv conf ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module); ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf; uscf->srv_conf = ctx->srv_conf; //接下来是生成loc conf并且将全部的模块过滤一次,生成对应的srv conf和loc conf,不用merge的是因为在ups里的配置不可能出现在别的地方了,这些生成的conf会在接下来使用 uscf->servers = ngx_array_create(cf->pool, 4, sizeof(ngx_http_upstream_server_t)); // 接下来开始解析里面的配置,主要是servers pcf = *cf; cf->ctx = ctx; cf->cmd_type = NGX_HTTP_UPS_CONF; rv = ngx_conf_parse(cf, NULL); ... }
ngx_http_upstream_add函数是当前的upstream加进去,这个函数在遇到“upstream”的时候,是带着create标志位的,这个就是正了八经的创建,如果遇到proxy_pass,带的标志位是0
ngx_http_upstream_srv_conf_t * ngx_http_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags) { ... // 红黑树里寻找uscf,对应upstream的配置 umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module); uscf = ngx_http_upstream_rbtree_lookup(umcf, &u->host); // not found的时候,再找找看有没有proxy_pass搞出来的part not_found: part = &umcf->implicit_upstreams.part; uscfp = part->elts; // 最后的创建,先看是否带标志位,如果带则删掉part,insert进红黑树 if (flags & NGX_HTTP_UPSTREAM_CREATE) { uscfp[i]->flags = flags; #if (NGX_HTTP_UPSTREAM_RBTREE) uscf = uscfp[i]; ngx_rbtree_insert(&umcf->rbtree, &uscfp[i]->node); ngx_list_delete(&umcf->implicit_upstreams, &uscfp[i]); return uscf; #endif } ... }
分析完回源upstream处理流程,再来看dyups是怎么工作的?
动态修改upstream不reload nginx模块,ngx_http_dyups_module分析。
围绕ngx_http_dyups_module.c进行分析:
在create main conf的时候初始化这个数组 static void * ngx_http_dyups_create_main_conf(ngx_conf_t *cf) { ... if (ngx_array_init(&dmcf->dy_upstreams, cf->pool, 1024, sizeof(ngx_http_dyups_srv_conf_t)) != NGX_OK) { return NULL; } ... }
在dyups init的时候将upstream中的conf全部取出来放进去
//初始化dy_upstream链以及全局ngx_http_dyups_deleted_upstream
static ngx_int_t ngx_http_dyups_init(ngx_conf_t *cf) { ... dmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_dyups_module); umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; for (i = 0; i < umcf->upstreams.nelts; i++) { duscf = ngx_array_push(&dmcf->dy_upstreams); // 清零 ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t)); duscf->pool = NULL; // 赋值 duscf->upstream = uscfp[i]; duscf->dynamic = (uscfp[i]->port == 0 && uscfp[i]->srv_conf && uscfp[i]->servers && uscfp[i]->flags & NGX_HTTP_UPSTREAM_CREATE); duscf->deleted = 0; // 赋值index duscf->idx = i; } ... }
shm初始化在ngx_http_dyups_init_main_conf函数中,同时设置了read_mesg的超时时间,并且指定了大小。
static char * ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf) { ... if (dmcf->read_msg_timeout == NGX_CONF_UNSET_MSEC) { // 一秒一次 dmcf->read_msg_timeout = 1000; } if (dmcf->shm_size == NGX_CONF_UNSET_UINT) { dmcf->shm_size = 2 * 1024 * 1024; } return ngx_http_dyups_init_shm(cf, conf); ... }
static char * ngx_http_dyups_init_shm(ngx_conf_t *cf, void *conf) { ... shm_zone = ngx_shared_memory_add(cf, &dmcf->shm_name, dmcf->shm_size, &ngx_http_dyups_module); shm_zone->data = cf->pool; // 加进去的这个名头的共享内存块的init函数会在初始化的时候统一调用 shm_zone->init = ngx_http_dyups_init_shm_zone; ... }
static ngx_int_t ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) { ... shpool = (ngx_slab_pool_t *) shm_zone->shm.addr; sh = ngx_slab_alloc(shpool, sizeof(ngx_dyups_shctx_t)); if (sh == NULL) { return NGX_ERROR; } // 全局变量,sh和shpool ngx_dyups_global_ctx.sh = sh; ngx_dyups_global_ctx.shpool = shpool; // 初始化msg->queue ngx_queue_init(&sh->msg_queue); sh->version = 0; sh->status = NULL; ... }
该函数在启动进程时候调用,设定定时器。
//初始化共享内存状态,判断如果是非正常退出,则重新加载upstream配置
static ngx_int_t ngx_http_dyups_init_process(ngx_cycle_t *cycle) { ... // 设定定时器来定时read msg,同步信息 timer = &ngx_dyups_global_ctx.msg_timer; timer->handler = ngx_http_dyups_read_msg; ngx_add_timer(timer, dmcf->read_msg_timeout); // 拿到全局的pool和sh shpool = ngx_dyups_global_ctx.shpool; sh = ngx_dyups_global_ctx.sh; ngx_shmtx_lock(&shpool->mutex); // 初始化的时候肯定是NULL,,申请对应数量进程数的内存 if (sh->status == NULL) { sh->status = ngx_slab_alloc_locked(shpool, sizeof(ngx_dyups_status_t) * ccf->worker_processes); if (sh->status == NULL) { ngx_shmtx_unlock(&shpool->mutex); return NGX_ERROR; } ngx_memzero(sh->status, sizeof(ngx_dyups_status_t) * ccf->worker_processes); ngx_shmtx_unlock(&shpool->mutex); return NGX_OK; } ngx_shmtx_unlock(&shpool->mutex); // 判断version,如果不是0的话,说明version已经在同步中被++了,所以是进程挂掉再被拉起来 if (sh->version != 0) { // 这里后续再看... }
最核心的是ngx_http_dyups_read_msg函数里的ngx_http_dyups_read_msg_locked函数
static void ngx_http_dyups_read_msg_locked(ngx_event_t *ev) { ... sh = ngx_dyups_global_ctx.sh; shpool = ngx_dyups_global_ctx.shpool; for (i = 0; i < ccf->worker_processes; i++) { status = &sh->status[i]; if (status->pid == 0 || status->pid == ngx_pid) { ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] process %P update time %ui", status->pid, status->time); // 遍历全部进程,将对应的pid赋值 status->pid = ngx_pid; status->time = now; break; } } // 遍历消息队列 for (q = ngx_queue_last(&sh->msg_queue); q != ngx_queue_sentinel(&sh->msg_queue); q = ngx_queue_prev(q)) { // 如果该msg的count和进程数一致,就是大家都同步过了,把这个msg删掉 if (msg->count == ccf->worker_processes) { t = ngx_queue_next(q); ngx_queue_remove(q); q = t; ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] destroy msg %V:%V", &msg->name, &msg->content); ngx_dyups_destroy_msg(shpool, msg); continue; } found = 0; for (i = 0; i < msg->count; i++) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg pids [%P]", msg->pid[i]); if (msg->pid[i] == ngx_pid) { found = 1; break; } } // 如果发现该进程了,就说明已经同步过了,继续 if (found) { ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui found", &msg->name, msg->count); continue; } // 如果没发现的话,count++,pid更新 msg->pid[i] = ngx_pid; msg->count++; ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui", &msg->name, msg->count); // 取出来name和content name = msg->name; content = msg->content; // 执行同步 rc = ngx_dyups_sync_cmd(pool, &name, &content, msg->flag); if (rc != NGX_OK) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "[dyups] read msg error, may cause the " "config inaccuracy, name:%V, content:%V", &name, &content); } } ... }
static ngx_int_t ngx_dyups_sync_cmd(ngx_pool_t *pool, ngx_str_t *name, ngx_str_t *content, ngx_uint_t flag) { ... } else if (flag == NGX_DYUPS_ADD) { body.start = body.pos = content->data; body.end = body.last = content->data + content->len; body.temporary = 1; rc = ngx_dyups_do_update(name, &body, &rv); ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] sync add: %V rv: %V rc: %i", name, &rv, rc); if (rc != NGX_HTTP_OK) { return NGX_ERROR; } return NGX_OK; } ... }
接下来就是ngx_dyups_do_update
同步其他进程接收的信息,如果是当前进程处理的,要把信息添加到消息队列中。
ngx_dyups_update_upstream
ngx_int_t ngx_dyups_update_upstream(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv) { ... // 先检查有没有需要做的操作 ngx_http_dyups_read_msg_locked(timer); // 沙箱测试配置 status = ngx_dyups_sandbox_update(buf, rv); if (status != NGX_HTTP_OK) { goto finish; } status = ngx_dyups_do_update(name, buf, rv); if (status == NGX_HTTP_OK) { //关键的是把操作发到队列中去 if (ngx_http_dyups_send_msg(name, buf, NGX_DYUPS_ADD)) { ngx_str_set(rv, "alert: update success " "but not sync to other process"); status = NGX_HTTP_INTERNAL_SERVER_ERROR; } } ... }
接下来就是ngx_http_dyups_send_msg函数
static ngx_int_t ngx_http_dyups_send_msg(ngx_str_t *name, ngx_buf_t *body, ngx_uint_t flag) { ... // 这个函数还是挺简单的,就是初始化整个msg,将name和body填充进去。 sh->version++; ngx_queue_insert_head(&sh->msg_queue, &msg->queue); ... }
update之前先find寻找对应的upstream。
static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx) { ... duscfs = dumcf->dy_upstreams.elts; for (i = 0; i < dumcf->dy_upstreams.nelts; i++) { duscf = &duscfs[i]; uscf = duscf->upstream; if (uscf->host.len != name->len || ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len) != 0) { continue; } *idx = i; return duscf; } ... }
如果寻找到了则对idx赋值,一旦发现寻找到了对应name的dy_upstream进行判断。
调用的是ngx_dyups_mark_upstream_delete函数
static void ngx_dyups_mark_upstream_delete(ngx_http_dyups_srv_conf_t *duscf) { ... // 获取umcf和uscf uscf = duscf->upstream; umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_upstream_module); // us获取这个dynamic upstream下的servers us = uscf->servers->elts; for (i = 0; i < uscf->servers->nelts; i++) { // 标志位置1 us[i].down = 1; #if (NGX_HTTP_UPSTREAM_CHECK) if (us[i].addrs) { // 关闭peer,看宏定义主要关闭健康检查的peer ngx_http_upstream_check_delete_dynamic_peer(&uscf->host, us[i].addrs); } #endif } // 将upstream对应的index的配置变成一个dummy配置 uscfp[duscf->idx] = &ngx_http_dyups_deleted_upstream; #if (NGX_HTTP_UPSTREAM_RBTREE) ngx_rbtree_delete(&umcf->rbtree, &uscf->node); #endif duscf->deleted = NGX_DYUPS_DELETING; ... }
这里最重要的是check_delete_dynamic_peer
void ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name, ngx_addr_t *peer_addr) { ... /* 一堆比较 找到choosen*/ chosen = &peer[i]; chosen->shm->ref--; if (chosen->shm->ref <= 0 && chosen->shm->delete != PEER_DELETED) { ngx_http_upstream_check_clear_dynamic_peer_shm(chosen->shm); chosen->shm->delete = PEER_DELETED; } ngx_shmtx_unlock(&chosen->shm->mutex); ngx_http_upstream_check_clear_peer(chosen); ... }
删完一次之后再find一次,idx大概率就变成-1了,然后就进行创建了。
static ngx_int_t ngx_dyups_do_update(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv) { ... if (idx == -1) { duscf = ngx_array_push(&dumcf->dy_upstreams); // 这个uscfp是没有用处的,只为了给这个数组加一 uscfp = ngx_array_push(&umcf->upstreams); ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t)); // 这块是为了获取在umcf中的新upstream的index值。 idx = umcf->upstreams.nelts - 1; } duscf->idx = idx; rc = ngx_dyups_init_upstream(duscf, name, idx); rc = ngx_dyups_add_server(duscf, buf); ... }
最重要的就是init_upstream和add_server。
init upstream的传参是dy_srv_conf_t、upstream的name,以及upstream链表中对应的index。
static ngx_int_t ngx_dyups_init_upstream(ngx_http_dyups_srv_conf_t *duscf, ngx_str_t *name, ngx_uint_t index) { ... umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; /*初始化uscf 也就是upstream的各个结构体*/ uscfp[index] = uscf; // 赋值 duscf->dynamic = 1; duscf->upstream = uscf; ctx = ngx_pcalloc(duscf->pool, sizeof(ngx_http_conf_ctx_t)); // 存放ctx duscf->ctx = ctx; // insert进去uscf uscf->node.key = ngx_crc32_short(uscf->host.data, uscf->host.len); ngx_rbtree_insert(&umcf->rbtree, &uscf->node); ... } static ngx_int_t ngx_dyups_add_server(ngx_http_dyups_srv_conf_t *duscf, ngx_buf_t *buf) { ... ngx_dyups_parse_upstream(&cf, buf) ... } static char * ngx_dyups_parse_upstream(ngx_conf_t *cf, ngx_buf_t *buf) { ... b = *buf; ngx_memzero(&conf_file, sizeof(ngx_conf_file_t)); conf_file.file.fd = NGX_INVALID_FILE; conf_file.buffer = &b; cf->conf_file = &conf_file; return ngx_conf_parse(cf, NULL); ... }
static ngx_int_t ngx_dyups_do_delete(ngx_str_t *name, ngx_str_t *rv) { ... duscf = ngx_dyups_find_upstream(name, &dumy); // 如查出来是NULL or 被标记删除 or 彻底删除,说明要删的这个有问题 if (duscf == NULL || duscf->deleted) { ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0, "[dyups] not find upstream %V %p", name, duscf); ngx_str_set(rv, "not found uptream"); return NGX_HTTP_NOT_FOUND; } // 没问题的话就这样正常删除 ngx_dyups_mark_upstream_delete(duscf); ... }
find upstream主要做了查找、删除工作。
static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx) { ... dumcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_dyups_module); duscfs = dumcf->dy_upstreams.elts; for (i = 0; i < dumcf->dy_upstreams.nelts; i++) { // 这块是在mark_upstream中被标记的 if (duscf->deleted == NGX_DYUPS_DELETING) { // 确认可以删除,主要看这个ref的引用计数 if (*(duscf->ref) == 0) { ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] free dynamic upstream in find upstream" " %ui", duscf->idx); duscf->deleted = NGX_DYUPS_DELETED; if (duscf->pool) { ngx_destroy_pool(duscf->pool); duscf->pool = NULL; } } } // 如果是deleted或者是deleting // 如果遍历完啥也没找到就返回一个deleted if (duscf->deleted == NGX_DYUPS_DELETING) { continue; } if (duscf->deleted == NGX_DYUPS_DELETED) { *idx = i; duscf_del = duscf; continue; } // 如果找到了就正常返回 if (uscf->host.len != name->len || ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len) != 0) { continue; } *idx = i; return duscf; } ... }