多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁(yè) > 互聯(lián)網(wǎng) > nginx upstream使用及源碼解析

nginx upstream使用及源碼解析

來(lái)源:程序員人生   發(fā)布時(shí)間:2014-10-10 08:00:00 閱讀次數(shù):3055次

nginx upstream機(jī)制使得nginx可以成為一個(gè)反向代理服務(wù)器,nginx一方面從下游客戶(hù)端接收http請(qǐng)求,處理請(qǐng)求,并根據(jù)請(qǐng)求發(fā)送tcp報(bào)文到上游服務(wù)器,根據(jù)上游服務(wù)器的返回報(bào)文,來(lái)向下游客戶(hù)端發(fā)送請(qǐng)求響應(yīng)報(bào)文。
upstream機(jī)制也提供了負(fù)載分擔(dān)的功能,可以將請(qǐng)求負(fù)載分擔(dān)到集群服務(wù)器的某個(gè)服務(wù)器上面。

2.1upstream的流程介紹

1分析客戶(hù)端請(qǐng)求報(bào)文,構(gòu)建發(fā)往上游服務(wù)器的請(qǐng)求報(bào)文。
2調(diào)用ngx_http_upstream_init開(kāi)始與上游服務(wù)器建立tcp連接。
  3發(fā)送在第一步中組建的請(qǐng)求報(bào)文。
4接收來(lái)自上游服務(wù)器的響應(yīng)頭并進(jìn)行解析,往下游轉(zhuǎn)發(fā)。
5接收來(lái)自上游服務(wù)器的相應(yīng)體,進(jìn)行轉(zhuǎn)發(fā)。

在這5個(gè)階段中,upstream機(jī)制允許開(kāi)發(fā)人員自己設(shè)定相應(yīng)的處理方式,來(lái)達(dá)到自己的目的,這也是開(kāi)發(fā)人員使用upstream的方式。


2.2upstream的使用
開(kāi)發(fā)人員使用upstream機(jī)制時(shí),主要就是設(shè)置上面五個(gè)階段的處理回調(diào)函數(shù)。
以http反向代理為例:

ngx_http_proxy_handler(ngx_http_request_t *r) { : : : //設(shè)置http proxy使用到的upstream機(jī)制的各種方法 //設(shè)置創(chuàng)建請(qǐng)求報(bào)文的回調(diào)函數(shù) u->create_request = ngx_http_proxy_create_request; //設(shè)置當(dāng)鏈接失敗時(shí),需要執(zhí)行的動(dòng)作 u->reinit_request = ngx_http_proxy_reinit_request; //設(shè)置處理上游服務(wù)器的響應(yīng)頭回調(diào)函數(shù) u->process_header = ngx_http_proxy_process_status_line; //當(dāng)前無(wú)意義 u->abort_request = ngx_http_proxy_abort_request; //請(qǐng)求結(jié)束后會(huì)調(diào)用該方法 u->finalize_request = ngx_http_proxy_finalize_request; //設(shè)置upstream的buffer標(biāo)志位,為0時(shí),以下游網(wǎng)速優(yōu)先, //不會(huì)使用文件緩存響應(yīng)包體,為1時(shí),有多個(gè)buffer,并且 //可以使用文件來(lái)緩存響應(yīng)包體 u->buffering = plcf->upstream.buffering; //當(dāng)buffering為1時(shí)會(huì)使用到該pipe結(jié)構(gòu),即下游網(wǎng)速優(yōu)先,需要使用更多的buffer和臨時(shí)文件緩存響應(yīng) u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t)); if (u->pipe == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } u->pipe->input_filter = ngx_event_pipe_copy_input_filter; u->accel = 1; //開(kāi)始讀取請(qǐng)求包體,讀取結(jié)束后,開(kāi)始調(diào)用ngx_http_upstream_init, //開(kāi)始upstream的流程 rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); : : return NGX_DONE; }

2.3upstream源碼解析(1.0.15版本)
2.3.1構(gòu)建發(fā)往上游服務(wù)器的請(qǐng)求,建立與上游服務(wù)器的連接

主要函數(shù)是ngx_http_upstream_init_request,該函數(shù)會(huì)調(diào)用用戶(hù)注冊(cè)的請(qǐng)求構(gòu)建函數(shù)去構(gòu)建發(fā)往上游服務(wù)器的請(qǐng)求,同時(shí)將建立與上游服務(wù)器的連接。首先介紹兩個(gè)輔助函數(shù):

ngx_http_upstream_rd_check_broken_connection:該函數(shù)用來(lái)檢查nginx與客戶(hù)端之間的鏈路是否可用,

ngx_http_upstream_connect:該函數(shù)用來(lái)與上游服務(wù)器之間建立連接。


static void ngx_http_upstream_check_broken_connection(ngx_http_request_t *r, ngx_event_t *ev) { int n; char buf[1]; ngx_err_t err; ngx_int_t event; ngx_connection_t *c; ngx_http_upstream_t *u; c = r->connection; u = r->upstream; //若連接已終止的話(huà),該recv返回值會(huì)為0,MSG_PEEK表示會(huì)去 //讀取數(shù)據(jù),但不會(huì)減少接收緩存中的數(shù)據(jù),在這里讀取1 //個(gè)字節(jié),來(lái)判斷讀方向能否正常工作 n = recv(c->fd, buf, 1, MSG_PEEK); err = ngx_socket_errno; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err, "http upstream recv(): %d", n); //ev->write表明是寫(xiě)方向觸發(fā)的事件,讀方向能讀到數(shù)據(jù), //或者返回碼為NGX_eagain,表明應(yīng)該沒(méi)有問(wèn)題 if (ev->write && (n >= 0 || err == NGX_EAGAIN)) { return; } if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) { event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT; if (ngx_del_event(ev, event, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } //能用該socket讀出數(shù)據(jù),說(shuō)明連接沒(méi)有問(wèn)題 if (n > 0) { return; } //返回值為-1,但錯(cuò)誤為NGX_EAGAIN,表明recv超時(shí)時(shí)間到了 if (n == -1) { if (err == NGX_EAGAIN) { return; } //其他情況表明發(fā)生錯(cuò)誤 ev->error = 1; } else { //n=0,一般表示連接已經(jīng)結(jié)束 err = 0; } //設(shè)置事件的標(biāo)記位,標(biāo)記已經(jīng)結(jié)束了 ev->eof = 1; c->error = 1; if (!u->cacheable && u->peer.connection) { ngx_log_error(NGX_LOG_INFO, ev->log, err, "client prematurely closed connection, " "so upstream connection is closed too"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_CLIENT_CLOSED_REQUEST); return; } ngx_log_error(NGX_LOG_INFO, ev->log, err, "client prematurely closed connection"); if (u->peer.connection == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_CLIENT_CLOSED_REQUEST); } } 2 ngx_http_upstream_connect static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_time_t *tp; ngx_connection_t *c; r->connection->log->action = "connecting to upstream"; r->connection->single_connection = 0; //記錄下當(dāng)前的響應(yīng)秒數(shù)和毫秒數(shù) if (u->state && u->state->response_sec) { tp = ngx_timeofday(); u->state->response_sec = tp->sec - u->state->response_sec; u->state->response_msec = tp->msec - u->state->response_msec; } u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); //記錄下當(dāng)前的響應(yīng)秒數(shù)和毫秒數(shù) tp = ngx_timeofday(); u->state->response_sec = tp->sec; u->state->response_msec = tp->msec; //開(kāi)始連接上游服務(wù)器 rc = ngx_event_connect_peer(&u->peer); //printf("@@@@####rc is %d ", (int)rc); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream connect: %i", rc); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->state->peer = u->peer.name; //在busy或者declined的情況下,會(huì)調(diào)用ngx_http_upstream_next,該函數(shù)會(huì) //多次嘗試調(diào)用connect試圖與上游服務(wù)器連接,多次連接失敗后, //才會(huì)調(diào)用ngx_http_upstream_finalize_request if (rc == NGX_BUSY) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE); return; } if (rc == NGX_DECLINED) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } /* rc == NGX_OK || rc == NGX_AGAIN */ c = u->peer.connection; c->data = r; //將客戶(hù)端與上游服務(wù)器的連接的讀寫(xiě)事件的處理回調(diào)設(shè)置為 //ngx_http_upstream_handler c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; //ngx_http_upstream_handler最后會(huì)調(diào)用u->write_event_handler或者read_event_handler u->write_event_handler = ngx_http_upstream_send_request_handler; u->read_event_handler = ngx_http_upstream_process_header; c->sendfile &= r->connection->sendfile; u->output.sendfile = c->sendfile; c->pool = r->pool; c->log = r->connection->log; c->read->log = c->log; c->write->log = c->log; /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ u->writer.out = NULL; u->writer.last = &u->writer.out; u->writer.connection = c; u->writer.limit = 0; if (u->request_sent) { if (ngx_http_upstream_reinit(r, u) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } if (r->request_body && r->request_body->buf && r->request_body->temp_file && r == r->main) { /* * the r->request_body->buf can be reused for one request only, * the subrequests should allocate their own temporay bufs */ u->output.free = ngx_alloc_chain_link(r->pool); if (u->output.free == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->output.free->buf = r->request_body->buf; u->output.free->next = NULL; u->output.allocated = 1; r->request_body->buf->pos = r->request_body->buf->start; r->request_body->buf->last = r->request_body->buf->start; r->request_body->buf->tag = u->output.tag; } u->request_sent = 0; //與上游連接尚未建立起來(lái),加入定時(shí)器,返回 //當(dāng)與上游服務(wù)器連接建立成功會(huì)調(diào)用相關(guān)的處理函數(shù) if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->connect_timeout); return; } #if (NGX_HTTP_SSL) if (u->ssl && c->ssl == NULL) { ngx_http_upstream_ssl_init_connection(r, u, c); return; } #endif //已經(jīng)建立連接,向上游服務(wù)器發(fā)送請(qǐng)求內(nèi)容 ngx_http_upstream_send_request(r, u); } 3 ngx_http_upstream_init_request static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; u->store = (u->conf->store || u->conf->store_lengths); //ignore_client_abort為0標(biāo)志著需要關(guān)注nginx和客戶(hù)端的連接是否穩(wěn)定 if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } //從代碼來(lái)看,request_bufs貌似是在create_request中設(shè)置的 if (r->request_body) { u->request_bufs = r->request_body->bufs; } //調(diào)用用戶(hù)設(shè)置的create_request函數(shù) if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //u->conf->local中保存的是與上游服務(wù)建立連接的本地地址 u->peer.local = u->conf->local; //得到http core模塊在該loc下的配置 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); //設(shè)置upstream向下游客戶(hù)端轉(zhuǎn)發(fā)數(shù)據(jù)的各種參數(shù),主要和 //buf相關(guān) u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; //往下游客戶(hù)端寫(xiě)數(shù)據(jù)的接口 u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } //將ngx_http_upstream_cleanup函數(shù)加入到request的cleanup鏈表中, //當(dāng)request被刪除時(shí),會(huì)調(diào)用該函數(shù) cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //ngx_http_upstream_cleanup主要釋放resolve數(shù)據(jù)結(jié)構(gòu),執(zhí)行ngx_http_upstream_finalize cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; //u->resolved中保存了用于與上游服務(wù)器建立連接的信息, //可以由開(kāi)發(fā)人員在代碼中設(shè)置,不設(shè)置的話(huà),從配置文件中 //去獲取 if (u->resolved == NULL) { uscf = u->conf->upstream; } else { //upstream中直接指定了相關(guān)的服務(wù)器地址,建立連接就ok了 if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } //在這里host應(yīng)該為一個(gè)upstream組的名字 host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; //遍歷系統(tǒng)中的upstream數(shù)組,找到匹配的upstream for (i = 0; i < umcf->upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_memcmp(uscf->host.data, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream "%V"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } temp.name = *host; //下面這部分需要進(jìn)行域名解析 ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } //ngx_http_upstream_resolve_handler是域名解析后的回調(diào)函數(shù) ctx->name = *host; ctx->type = NGX_RESOLVE_A; ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } found: //peer.init()方法中會(huì)根據(jù)upstream的算法去選擇一個(gè)服務(wù)器,來(lái)進(jìn)行發(fā)送 //for example:us->peer.init = ngx_http_upstream_init_ip_hash_peer; if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //與上游服務(wù)器建立連接 ngx_http_upstream_connect(r, u); }
2.3.2往上游發(fā)送請(qǐng)求 

   當(dāng)建立了與上游服務(wù)器的連接后,就會(huì)向上游服務(wù)器發(fā)送請(qǐng)求,主要函數(shù)是ngx_http_upstream_send_request。

static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_connection_t *c; //peer.connection中是nginx與上游服務(wù)器建立的connection c = u->peer.connection; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream send request"); if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } c->log->action = "sending request to upstream"; //通過(guò)ngx_output_chain向上游服務(wù)器發(fā)送請(qǐng)求報(bào)文,request_sent //用來(lái)表示是否已經(jīng)發(fā)送請(qǐng)求頭了,發(fā)送了的話(huà),繼續(xù)發(fā)送 //剩余未發(fā)的就OK了,剩余未發(fā)送的數(shù)據(jù)保存在了u->output里面 rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); //設(shè)置request_sent標(biāo)志,表明已經(jīng)發(fā)送過(guò)請(qǐng)求 u->request_sent = 1; if (rc == NGX_ERROR) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } //若寫(xiě)事件已經(jīng)被加入到了定時(shí)器中,刪除它,為后面的 //添加做準(zhǔn)備 if (c->write->timer_set) { ngx_del_timer(c->write); } //NGX_AGAIN表明數(shù)據(jù)尚未發(fā)送完畢,需要將其加入到定時(shí)器中 //當(dāng)發(fā)送事件觸發(fā)時(shí),會(huì)繼續(xù)調(diào)用該函數(shù)。 if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->send_timeout); //主要是設(shè)置發(fā)送緩存的事件喚醒下限 if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } /* rc == NGX_OK */ if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (ngx_tcp_push(c->fd) == NGX_ERROR) { ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, ngx_tcp_push_n " failed"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } c->tcp_nopush = NGX_TCP_NOPUSH_UNSET; } //數(shù)據(jù)發(fā)送成功,添加一個(gè)讀事件定時(shí)器 ngx_add_timer(c->read, u->conf->read_timeout); #if 1 //寫(xiě)事件已經(jīng)發(fā)出,判斷讀事件是否ready if (c->read->ready) { /* post aio operation */ /* * TODO comment * although we can post aio operation just in the end * of ngx_http_upstream_connect() CHECK IT !!! * it's better to do here because we postpone header buffer allocation */ //讀事件已經(jīng)ready了,處理返回的報(bào)文頭 ngx_http_upstream_process_header(r, u); return; } #endif //將寫(xiě)事件處理函數(shù)置為dummy的話(huà),表明在讀完相應(yīng)之前,不允許 //接著寫(xiě)了 u->write_event_handler = ngx_http_upstream_dummy_handler; if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } }

2.3.3處理上游服務(wù)器返回的回應(yīng)頭部
往上游服務(wù)器發(fā)送完請(qǐng)求后,就要等待著處理服務(wù)器的回應(yīng)了,首先會(huì)去處理服務(wù)器發(fā)回的響應(yīng)頭。處理函數(shù)是ngx_http_upstream_process_header.

static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u) { ssize_t n; ngx_int_t rc; ngx_connection_t *c; c = u->peer.connection; c->log->action = "reading response header from upstream"; //讀事件超時(shí) if (c->read->timedout) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT); return; } //測(cè)試與upstream服務(wù)器的連通性 if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } //尚未實(shí)質(zhì)分配數(shù)據(jù)緩沖區(qū) if (u->buffer.start == NULL) { //分配數(shù)據(jù)緩沖區(qū) u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size); if (u->buffer.start == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //對(duì)緩沖區(qū)描述符進(jìn)行初始化 u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; u->buffer.end = u->buffer.start + u->conf->buffer_size; u->buffer.temporary = 1; u->buffer.tag = u->output.tag; //為收到的請(qǐng)求頭們創(chuàng)建ngx_list結(jié)構(gòu),用來(lái)存貯解析到的 //請(qǐng)求頭的名值對(duì) if (ngx_list_init(&u->headers_in.headers, r->pool, 8, sizeof(ngx_table_elt_t)) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } //準(zhǔn)備接受數(shù)據(jù)吧!!!狂奔的小怪獸 for ( ;; ) { n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last); //數(shù)據(jù)尚未接收完畢 if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } //返回值為0,標(biāo)志upstream服務(wù)器關(guān)閉了連接 if (n == 0) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream prematurely closed connection"); } if (n == NGX_ERROR || n == 0) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } u->buffer.last += n; //處理接收到響應(yīng)頭數(shù)據(jù) rc = u->process_header(r); //響應(yīng)頭尚未接收完畢 if (rc == NGX_AGAIN) { //buffer已經(jīng)滿(mǎn)了,無(wú)法容納更多的響應(yīng)頭部 if (u->buffer.last == u->buffer.end) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream sent too big header"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER); return; } continue; } break; } //解析到了無(wú)效錯(cuò)誤頭,真真苦逼啊 if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER); return; } if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } /* rc == NGX_OK */ //頭部處理完畢,頭部返回碼大于300 if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) { if (r->subrequest_in_memory) { u->buffer.last = u->buffer.pos; } if (ngx_http_upstream_test_next(r, u) == NGX_OK) { return; } //處理錯(cuò)誤碼大于300的錯(cuò)誤情況,比如404錯(cuò)誤, //頁(yè)面沒(méi)找到 if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) { return; } } //對(duì)u->headers_in中的頭部進(jìn)行處理過(guò)濾,把u->headers_in中的 //各個(gè)頭部信息挪到r->headers_out里面,以便于發(fā)送 if (ngx_http_upstream_process_headers(r, u) != NGX_OK) { return; } //不是子請(qǐng)求,需要轉(zhuǎn)發(fā)響應(yīng)體 if (!r->subrequest_in_memory) { //調(diào)用該函數(shù),先轉(zhuǎn)發(fā)響應(yīng)頭,再轉(zhuǎn)發(fā)響應(yīng)體 ngx_http_upstream_send_response(r, u); return; } /* subrequest content in memory */ //以下為子請(qǐng)求的處理流程,當(dāng)子請(qǐng)求的input_filter未設(shè)置時(shí), //其默認(rèn)的input_filter方法為ngx_http_upstream_non_buffered_filter, //即不轉(zhuǎn)發(fā)收到的響應(yīng) if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //buffer.last和buffer.pos之間是多余的包體 n = u->buffer.last - u->buffer.pos; //下面對(duì)這段頭部以外的包體進(jìn)行處理 if (n) { u->buffer.last -= n; u->state->response_length += n; if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //表明包體已經(jīng)全部處理完畢,可以結(jié)束請(qǐng)求類(lèi) if (u->length == 0) { ngx_http_upstream_finalize_request(r, u, 0); return; } } //設(shè)置接收事件的處理函數(shù) u->read_event_handler = ngx_http_upstream_process_body_in_memory; //在該函數(shù)中調(diào)用u->input_filter對(duì)后續(xù)包體進(jìn)行處理, //該函數(shù)是針對(duì)子請(qǐng)求來(lái)說(shuō)的,不轉(zhuǎn)發(fā)包體,在內(nèi)存中 //對(duì)包體進(jìn)行處理 ngx_http_upstream_process_body_in_memory(r, u);

2.3.4處理響應(yīng)包體
處理完返回的響應(yīng)頭就要處理響應(yīng)包體了,處理響應(yīng)包體比較復(fù)雜,在子請(qǐng)求的情況下,不用轉(zhuǎn)發(fā)響應(yīng)包體,處理一下就可以了,在upstream模式下,需要轉(zhuǎn)發(fā)接收到的請(qǐng)求,這時(shí)有下游網(wǎng)速優(yōu)先和上游網(wǎng)速優(yōu)先兩種,下游網(wǎng)速優(yōu)先,假設(shè)下游網(wǎng)速比上游快,因此分配了一塊固定大小的buffer緩沖區(qū)去接收數(shù)據(jù),同時(shí)進(jìn)行轉(zhuǎn)發(fā),上游網(wǎng)速優(yōu)先,假設(shè)上游網(wǎng)速比下游快,因此需要使用多塊buffer緩沖區(qū)去緩存數(shù)據(jù),同時(shí)必要時(shí),使用臨時(shí)文件來(lái)緩存接收到的數(shù)據(jù)。
ngx_http_upstream_process_body_in_memory:該函數(shù)用來(lái)處理子請(qǐng)求的情形,不轉(zhuǎn)發(fā)響應(yīng)包體。
ngx_http_upstream_send_response:該函數(shù)用來(lái)處理轉(zhuǎn)發(fā)響應(yīng)包體的情形,該函數(shù)會(huì)轉(zhuǎn)發(fā)響應(yīng)頭和響應(yīng)體,轉(zhuǎn)發(fā)響應(yīng)體時(shí)同時(shí)考慮了上游網(wǎng)速優(yōu)先和下游網(wǎng)速優(yōu)先兩種情況。

1ngx_http_upstream_process_body_in_memory static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, ngx_http_upstream_t *u) { size_t size; ssize_t n; ngx_buf_t *b; ngx_event_t *rev; ngx_connection_t *c; //c是nginx與upstream上游服務(wù)器之間建立的連接 c = u->peer.connection; rev = c->read; //讀事件超時(shí),結(jié)束upstream if (rev->timedout) { return; } //u->buffer用來(lái)保存讀取的數(shù)據(jù) b = &u->buffer; for ( ;; ) { size = b->end - b->last; if (size == 0) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //讀取相關(guān)數(shù)據(jù)到u->buffer中 n = c->recv(c, b->last, size); //沒(méi)有數(shù)據(jù)可讀了,等待下一次處理 if (n == NGX_AGAIN) { break; } //對(duì)端已經(jīng)結(jié)束了該連接或者發(fā)生了錯(cuò)誤 if (n == 0 || n == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, n); return; } //response_length記錄了已接收相應(yīng)的長(zhǎng)度 u->state->response_length += n; //對(duì)接收到的數(shù)據(jù)進(jìn)行處理,一般子請(qǐng)求會(huì)重置該方法, //未設(shè)置的話(huà),則會(huì)默認(rèn)為ngx_http_upstream_non_buffered_filter,該 //方法僅僅是設(shè)置下該buffer以便繼續(xù)接收數(shù)據(jù) if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //接收方向未ready退出 if (!rev->ready) { break; } } //設(shè)置讀事件 if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } if (rev->active) { ngx_add_timer(rev, u->conf->read_timeout); } else if (rev->timer_set) { ngx_del_timer(rev); } } 2 ngx_http_upstream_send_response 該函數(shù)會(huì)往客戶(hù)端發(fā)送響應(yīng)頭及轉(zhuǎn)發(fā)響應(yīng)體,根據(jù)不同的設(shè)置來(lái)調(diào)用不同的包體轉(zhuǎn)發(fā)。 static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { int tcp_nodelay; ssize_t n; ngx_int_t rc; ngx_event_pipe_t *p; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; //發(fā)送回應(yīng)頭部,回應(yīng)頭部存放在request的headers_in里面, //在這里,有可能頭部沒(méi)有發(fā)送完畢,沒(méi)關(guān)系,未發(fā)送 //完的數(shù)據(jù)在request的out鏈表里面放著呢,接著處理下面的 //響應(yīng)包體即可 rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); return; } //c是客戶(hù)端與nginx之間建立的連接 c = r->connection; if (r->header_only) { if (u->cacheable || u->store) { if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) { ngx_connection_error(c, ngx_socket_errno, ngx_shutdown_socket_n " failed"); } r->read_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler; c->error = 1; } else { ngx_http_upstream_finalize_request(r, u, rc); return; } } //將header_sent置位,表示響應(yīng)頭部已經(jīng)發(fā)送了 u->header_sent = 1; //請(qǐng)求中帶有包體,且包體被保存在了臨時(shí)文件里面, //現(xiàn)在這些臨時(shí)文件沒(méi)有用了,可以清理掉了,OK, //畢竟,服務(wù)器的回應(yīng)都來(lái)了,應(yīng)該沒(méi)問(wèn)題了 if (r->request_body && r->request_body->temp_file) { ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd); r->request_body->temp_file->file.fd = NGX_INVALID_FILE; } //獲得http core在該loc下的配置 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); //u->buffering為0表示下游網(wǎng)速優(yōu)先,不需要開(kāi)辟更多的緩存區(qū) //來(lái)存放相關(guān)回應(yīng)報(bào)文 if (!u->buffering) { //未設(shè)置input_filter的話(huà),設(shè)置默認(rèn)的處理函數(shù),input_filter是對(duì) //在buffer中接收到的數(shù)據(jù)進(jìn)行相應(yīng)處理,感覺(jué)主要有兩個(gè)功能 //一是把相關(guān)buffer掛到out鏈表,一是對(duì)內(nèi)容進(jìn)行過(guò)濾 if (u->input_filter == NULL) { //啥都不做 u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; //該函數(shù)試圖在buffer中緩存所有的數(shù)據(jù),會(huì)操作設(shè)置ngx_buf中的 //各個(gè)字段 u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } //設(shè)置upstream讀事件的處理回調(diào)函數(shù) u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; //設(shè)置request寫(xiě)事件的處理回調(diào)函數(shù) r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; //調(diào)用input_filter之前進(jìn)行初始化 if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) failed"); ngx_http_upstream_finalize_request(r, u, 0); return; } c->tcp_nodelay = NGX_TCP_NODELAY_SET; } //buffer.last與buffer.pos之間是剩余未被處理的數(shù)據(jù) n = u->buffer.last - u->buffer.pos; //n>0,說(shuō)明buffer中有未被轉(zhuǎn)發(fā)的響應(yīng)包體 if (n) { //在這里設(shè)置該last是為了在input_filter中處理時(shí),對(duì)其 //進(jìn)行重置 u->buffer.last = u->buffer.pos; //將響應(yīng)包體的長(zhǎng)度加上n u->state->response_length += n; //在input_filter中處理此次接收到的數(shù)據(jù) if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } //在該函數(shù)中,開(kāi)始向下游客戶(hù)端發(fā)送響應(yīng)包體, //發(fā)送完數(shù)據(jù)還會(huì)從上游接收包體 ngx_http_upstream_process_non_buffered_downstream(r); } else { //該buffer中目前僅有頭部,沒(méi)有回應(yīng)包體,那下次 //從頭部接收就可以了 u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } //有數(shù)據(jù)可以進(jìn)行處理,處理上游數(shù)據(jù),在該函數(shù)中 //收完上游包體也會(huì)往下游發(fā)送相應(yīng)。 if (u->peer.connection->read->ready) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } return; } /* TODO: preallocate event_pipe bufs, look "Content-Length" */ //下面這部分是buffer為1的情況,該情況允許nginx使用更多的buffer //去緩存包體數(shù)據(jù),或者使用文件來(lái)進(jìn)行緩存 p = u->pipe; //對(duì)pipe結(jié)構(gòu)進(jìn)行初始化,該結(jié)構(gòu)專(zhuān)用于上游網(wǎng)速優(yōu)先的情況 //設(shè)置向下游發(fā)送響應(yīng)的調(diào)用函數(shù) p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; //設(shè)置可以使用的緩沖區(qū)的個(gè)數(shù) p->bufs = u->conf->bufs; //設(shè)置busy緩沖區(qū)中待發(fā)送的響應(yīng)長(zhǎng)度觸發(fā)值 p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->downstream = c; p->pool = r->pool; p->log = c->log; p->cache
生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線(xiàn)----------------------------
分享到:
------分隔線(xiàn)----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 在线观看视频网站 | 三级在线国产 | 国产精品视_精品国产免费 国产精品视频1区 | 色午夜影院 | 国产精品新婚门 | 伊人久久大香现线蕉 | 亚洲成人xxx | 免费一级毛片免费播放 | 国产欧美亚洲精品 | 老司机免费福利视频 | 国产一区三区二区中文在线 | 亚洲综合国产一区二区三区 | 免费观看欧美成人1314色 | 国产上床视频 | 久久丝袜精品综合网站 | 91国内揄拍国内精品对白不卡 | 欧美性video高清精品 | 网站视频大片www | 亚洲欧美一区二区三区另类 | 久久久久久综合 | 日本一区二区不卡视频 | 国产一级第一级毛片 | 最猛黑人xxxⅹ黑人猛交 | 美国美女一级毛片免费全 | 色综合欧美 | 亚洲综合第一欧美日韩中文 | 91亚洲精品一区二区福利 | 亚洲国产成人久久一区二区三区 | 亚洲精品国产男人的天堂 | 国产成人在线视频观看 | 国产精品亚洲精品日韩己满十八小 | 伊人亚洲影院 | 亚洲国产欧美在线人成 | 国产一区二区在线 |播放 | 精品一久久香蕉国产二月 | 日本精品一区二区三区在线观看 | 久久受www免费人成看片 | 国产精品无码久久综合网 | 伊人丁香婷婷综合一区二区 | 视频一区二区精品的福利 | 国产精品美乳免费看 |