原創(chuàng)文章,轉(zhuǎn)載請注明: 轉(zhuǎn)載自pagefault
這此主要是分析發(fā)送數(shù)據(jù)到客戶端的部分以及buffering狀態(tài)下,nginx接收upstream數(shù)據(jù)的部分,這也是upstream的最復(fù)雜的部分,這里我還是忽略了cache部分,以后我會專門寫blog來分析nginx的cache部分。
這部分的函數(shù)入口是ngx_http_upstream_send_response,這里有一個很重要的標記,那就是u->buffering,這個標記的含義就是nginx是否會盡可能多的讀取upstream的數(shù)據(jù)。如果關(guān)閉,則就是一個同步的發(fā)送,也就是接收多少,發(fā)送給客戶端多少。默認這個是打開的。也就是nginx會buf住upstream發(fā)送的數(shù)據(jù)。
不管buffering是否打開,后端發(fā)送的頭都不會被buffer,首先會發(fā)送header,然后才是body的發(fā)送,而body的發(fā)送就需要區(qū)分buffering選項了。如下圖所示:
下面這部分就是開始發(fā)送header,通過調(diào)用 ngx_http_send_header最終進入header filter的處理.
1
2
3
4
5
6
|
rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); return ; } |
然后就是發(fā)送body部分,這里我們先來看buffering被關(guān)閉的情況,這里有兩個要注意的回調(diào)函數(shù),分別是input_filter/input_filter_init,這個filter回調(diào)指的是對upstream發(fā)送給nginx的數(shù)據(jù)將要發(fā)送前的filter(嚴格來說是一個body filter).這里如果input_filter沒有被設(shè)置,則nginx會有默認的回調(diào).后面我們會分析這個默認的filter,以及這個filter具體是需要操作那個數(shù)據(jù)。要注意,這兩個回調(diào)都只是針對buffering被關(guān)閉的情況,而對應(yīng)buffering打開的時候的情況,有另外的hook,我們后面會分析到.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
if (!u->buffering) { //如果input_filter為空,則設(shè)置默認的filter if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } //設(shè)置讀寫函數(shù) u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; //調(diào)用input filter 初始化函數(shù) if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return ; } //打開nodelay,準備將數(shù)據(jù)完全發(fā)送出去 if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c-> log , 0, "tcp_nodelay" ); tcp_nodelay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, ( const void *) &tcp_nodelay, sizeof ( int )) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) failed" ); ngx_http_upstream_finalize_request(r, u, 0); return ; } c->tcp_nodelay = NGX_TCP_NODELAY_SET; } //得到將要發(fā)送的數(shù)據(jù)的大小 n = u->buffer.last - u->buffer.pos; if (n) { //注意這里,可以看到buffer被reset了。 u->buffer.last = u->buffer.pos; //設(shè)置將要發(fā)送的數(shù)據(jù)大小 u->state->response_length += n; //調(diào)用input filter if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return ; } //最終開始發(fā)送數(shù)據(jù)到downstream ngx_http_upstream_process_non_buffered_downstream(r); } else { //說明buffer是空 u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; //此時刷新數(shù)據(jù)到client if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return ; } //如果可讀,則繼續(xù)讀取upstream的數(shù)據(jù). if (u->peer.connection->read->ready) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } return ; } |
上面的部分我們有2個函數(shù)需要詳細分析下,一個是input filter的hook,一個是ngx_http_upstream_process_non_buffered_downstream,一個個來,先是input filter的book。
u->input_filter hook主要是對upstream發(fā)送的body進行一些處理,類似body filter, 上面的分析中我們可以看到當調(diào)用u->input_filter之前將u->buffer.last重置為pos,這個做法我有些不太理解, 我的猜測是讓代碼更清晰一些,因為在u->input_filter中我們會真正更新u->buffer.last.
在u->input_filter中,主要是會分配一個chain,然后掛載到u->out_bufs上,因為最終nginx會發(fā)送u->out_bufs這個chain(后面的代碼會看到).并且u->buffer的last也會被更新,我們來看使用最多,也就是默認的u->input_filter的實現(xiàn):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
static ngx_int_t ngx_http_upstream_non_buffered_filter( void *data, ssize_t bytes) { ngx_http_request_t *r = data; ngx_buf_t *b; ngx_chain_t *cl, **ll; ngx_http_upstream_t *u; u = r->upstream; //遍歷u->out_bufs for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { ll = &cl->next; } cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs); if (cl == NULL) { return NGX_ERROR; } *ll = cl; cl->buf->flush = 1; cl->buf->memory = 1; //取出將要發(fā)送的buffer b = &u->buffer; cl->buf->pos = b->last; //更新last b->last += bytes; cl->buf->last = b->last; cl->buf->tag = u->output.tag; //u->length表示將要發(fā)送的數(shù)據(jù)大小(content_length)如果為NGX_MAX_SIZE_T_VALUE,則說明后端協(xié)議并沒有指定需要發(fā)送的大小,此時我們只需要發(fā)送我們接收到的. if (u->length == NGX_MAX_SIZE_T_VALUE) { return NGX_OK; } //更新將要發(fā)送的數(shù)據(jù)大小 u->length -= bytes; return NGX_OK; } |
然后就是ngx_http_upstream_process_non_buffered_downstream函數(shù),這個函數(shù)用于非buffering狀態(tài)下發(fā)送數(shù)據(jù)給client,它會調(diào)用ngx_http_upstream_process_non_buffered_request來發(fā)送數(shù)據(jù),因此我們就來詳細分析這個函數(shù).
這個函數(shù)有兩個參數(shù),其中第二個do_write表示是否需要立即發(fā)送數(shù)據(jù).
主要來看這個函數(shù)的下面這部分,這部分主要是調(diào)用ngx_http_output_filter輸出給body filter,然后根據(jù)返回值來更新busy_bufs(沒有發(fā)送完畢,則保存未發(fā)送完畢的bufer到busy),可以看到和http部分的處理很類似.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
b = &u->buffer; do_write = do_write || u->length == 0; for ( ;; ) { if (do_write) { //如果u->out_bufs不為NULL則說明有需要發(fā)送的數(shù)據(jù),如果u->busy_bufs,則說明上次有未發(fā)送完畢的數(shù)據(jù). if (u->out_bufs || u->busy_bufs) { rc = ngx_http_output_filter(r, u->out_bufs); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return ; } //更新busy chain ngx_chain_update_chains(&u->free_bufs, &u->busy_bufs, &u->out_bufs, u->output.tag); } //這里說明想要發(fā)送的數(shù)據(jù)都已經(jīng)發(fā)送完畢 if (u->busy_bufs == NULL) { //length為0,說明后端這次要發(fā)送的數(shù)據(jù)已經(jīng)發(fā)送完畢 if (u->length == 0 || upstream->read->eof || upstream->read->error) { //此時finalize request,結(jié)束這次請求 ngx_http_upstream_finalize_request(r, u, 0); return ; } //否則重置u->buffer,以便與下次使用 b->pos = b->start; b->last = b->start; } } //得到當前buf的剩余空間 size = b->end - b->last; //設(shè)置size為將要使用的buffer大小 if (size > u->length) { size = u->length; } //如果還有數(shù)據(jù)需要接受,并且upstream可讀,則讀取數(shù)據(jù) if (size && upstream->read->ready) { n = upstream->recv(upstream, b->last, size); if (n == NGX_AGAIN) { break ; } if (n > 0) { u->state->response_length += n; //再次調(diào)用input_filter,這里沒有reset u->buffer.last,這是因為我們這個值并沒有更新. if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return ; } } //設(shè)置do_write,然后發(fā)送數(shù)據(jù). do_write = 1; continue ; } break ; } |
這個函數(shù)剩下部分就很簡單了,就是掛載事件,刪除定時器等一系列操作。
然后我們來看nginx最復(fù)雜的一塊代碼,也就是使用了buffering標記的條件下,nginx如何處理.
這里有一個核心的數(shù)據(jù)結(jié)構(gòu)ngx_event_pipe_s。接下來,我們就來分析這個結(jié)構(gòu).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
struct ngx_event_pipe_s { //表示nginx和client,以及和后端的兩條連接 ngx_connection_t *upstream; ngx_connection_t *downstream; //保存了從upstream讀取的數(shù)據(jù)(沒有經(jīng)過任何處理的),以及緩存的buf. ngx_chain_t *free_raw_bufs; ngx_chain_t *in; ngx_chain_t **last_in; //buf到tempfile的數(shù)據(jù)會放到out中 ngx_chain_t *out; ngx_chain_t **last_out; ngx_chain_t * free ; ngx_chain_t *busy; /* * the input filter i.e. that moves HTTP/1.1 chunks * from the raw bufs to an incoming chain */ ngx_event_pipe_input_filter_pt input_filter; void *input_ctx; //這個filter就是輸出內(nèi)容到client的函數(shù),一般設(shè)置為ngx_chain_writer ngx_event_pipe_output_filter_pt output_filter; void *output_ctx; //一些狀態(tài)以及屬性 unsigned read:1; unsigned cacheable:1; unsigned single_buf:1; unsigned free_bufs:1; unsigned upstream_done:1; unsigned upstream_error:1; unsigned upstream_eof:1; unsigned upstream_blocked:1; unsigned downstream_done:1; unsigned downstream_error:1; unsigned cyclic_temp_file:1; //配合bufs使用,表示已經(jīng)分配了的buf的個數(shù) ngx_int_t allocated; //對應(yīng)xxx_buffers,也就是讀取后端的數(shù)據(jù)時的bufer大小以及個數(shù) ngx_bufs_t bufs; ngx_buf_tag_t tag; ssize_t busy_size; off_t read_length; //cache相關(guān),max_temp_file_size表示最大的temp file的大小,temp_file_write_size表示buf將會flush到temp file中的大小. off_t max_temp_file_size; ssize_t temp_file_write_size; //網(wǎng)絡(luò)相關(guān)的參數(shù),定時器,以及l(fā)owat ngx_msec_t read_timeout; ngx_msec_t send_timeout; ssize_t send_lowat; ngx_pool_t *pool; ngx_log_t * log ; //預(yù)讀的buf以及大小,這里預(yù)讀是指已經(jīng)從upstream讀取了的buf. ngx_chain_t *preread_bufs; size_t preread_size; //cache相關(guān)表示將要cache到文件的buf ngx_buf_t *buf_to_file; //cache相關(guān),表示temp file ngx_temp_file_t *temp_file; /* STUB */ int num; }; |
然后就是ngx_http_upstream_send_response的剩余部分,這部分主要是初始化event pipe結(jié)構(gòu).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
p = u->pipe; //設(shè)置filter,可以看到就是http的輸出filter p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; //設(shè)置bufs,它就是upstream中設(shè)置的bufs p->bufs = u->conf->bufs; //busy buffers的大小 p->busy_size = u->conf->busy_buffers_size; //upstream p->upstream = u->peer.connection; p->downstream = c; p->pool = r->pool; p-> log = c-> log ; //設(shè)置是否需要cache p->cacheable = u->cacheable || u->store; //初始化temp_file p->temp_file = ngx_pcalloc(r->pool, sizeof (ngx_temp_file_t)); if (p->temp_file == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return ; } p->temp_file->file.fd = NGX_INVALID_FILE; p->temp_file->file. log = c-> log ; p->temp_file->path = u->conf->temp_path; p->temp_file->pool = r->pool; if (p->cacheable) { p->temp_file->persistent = 1; } else { p->temp_file->log_level = NGX_LOG_WARN; p->temp_file->warn = "an upstream response is buffered " "to a temporary file" ; } //temp file的相關(guān)設(shè)置 p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; //初始化preread bufs p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return ; } p->preread_bufs->buf = &u->buffer; p->preread_bufs->next = NULL; u->buffer.recycled = 1; p->preread_size = u->buffer.last - u->buffer.pos; //設(shè)置cache相關(guān) if (u->cacheable) { p->buf_to_file = ngx_calloc_buf(r->pool); if (p->buf_to_file == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return ; } p->buf_to_file->pos = u->buffer.start; p->buf_to_file->last = u->buffer.pos; p->buf_to_file->temporary = 1; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { /* the posted aio operation may currupt a shadow buffer */ p->single_buf = 1; } /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */ p->free_bufs = 1; /* * event_pipe would do u->buffer.last += p->preread_size * as though these bytes were read */ u->buffer.last = u->buffer.pos; if (u->conf->cyclic_temp_file) { /* * we need to disable the use of sendfile() if we use cyclic temp file * because the writing a new data may interfere with sendfile() * that uses the same kernel file pages (at least on FreeBSD) */ p->cyclic_temp_file = 1; c->sendfile = 0; } else { p->cyclic_temp_file = 0; } //事件相關(guān)的初始化 p->read_timeout = u->conf->read_timeout; p->send_timeout = clcf->send_timeout; p->send_lowat = clcf->send_lowat; //掛載讀寫回調(diào)函數(shù),這里注意一個是upstream的讀回調(diào),一個是r(client)的寫回調(diào) u->read_event_handler = ngx_http_upstream_process_upstream; r->write_event_handler = ngx_http_upstream_process_downstream; //進入upstream的操作 ngx_http_upstream_process_upstream(r, u); |
通過上面我們可以看到主要操作都在兩個回調(diào)函數(shù)中,一個是upstream的讀handler,一個是downstream的寫handler,我們一個個看,先來看upstream的讀handler。
這個函數(shù)首先會判斷是否超時,如果超時則設(shè)置錯誤,否則調(diào)用ngx_event_pipe進入pipe的讀處理,然后調(diào)用ngx_http_upstream_process_request對upstream進行處理,比如退出等一系列操作,因此這里最核心的函數(shù)就是ngx_event_pipe。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_connection_t *c; c = u->peer.connection; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c-> log , 0, "http upstream process upstream" ); c-> log ->action = "reading upstream" ; //判斷超時 if (c->read->timedout) { u->pipe->upstream_error = 1; ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out" ); } else { //調(diào)用event_pipe進對讀取數(shù)據(jù)進行處理. if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) { ngx_http_upstream_finalize_request(r, u, 0); return ; } } ngx_http_upstream_process_request(r); } |
然后來看ngx_event_pipe的源代碼,ngx_event_pipe第二個參數(shù)是do_write,表示是否需要將數(shù)據(jù)立即寫到downstream,也就是client.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) { u_int flags; ngx_int_t rc; ngx_event_t *rev, *wev; for ( ;; ) { //判斷是否需要將數(shù)據(jù)寫到downstream. if (do_write) { p-> log ->action = "sending to client" ; //寫數(shù)據(jù)到downstream rc = ngx_event_pipe_write_to_downstream(p); if (rc == NGX_ABORT) { return NGX_ABORT; } if (rc == NGX_BUSY) { return NGX_OK; } } p->read = 0; p->upstream_blocked = 0; p-> log ->action = "reading upstream" ; //讀取數(shù)據(jù) if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { return NGX_ABORT; } //判斷是否需要退出循環(huán),p->read表示是否已經(jīng)讀取了upstream的數(shù)據(jù),upstream_blocked表示是否downstream可寫(后面代碼會看到這兩個變量的設(shè)置) if (!p->read && !p->upstream_blocked) { break ; } //可以看到如果讀取了數(shù)據(jù)就準備寫數(shù)據(jù)到downstream do_write = 1; } //判斷是否需要掛載讀事件 if (p->upstream->fd != -1) { rev = p->upstream->read; flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0; if (ngx_handle_read_event(rev, flags) != NGX_OK) { return NGX_ABORT; } if (rev->active && !rev->ready) { ngx_add_timer(rev, p->read_timeout); } else if (rev->timer_set) { ngx_del_timer(rev); } } //掛載寫事件。 if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) { wev = p->downstream->write; if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) { return NGX_ABORT; } if (!wev->delayed) { if (wev->active && !wev->ready) { ngx_add_timer(wev, p->send_timeout); } else if (wev->timer_set) { ngx_del_timer(wev); } } } return NGX_OK; } |
通過上面的代碼我們能看到核心的函數(shù)有這么2個,分別是ngx_event_pipe_read_upstream和ngx_event_pipe_write_to_downstream,這兩個 一個是從后端都數(shù)據(jù),一個是發(fā)送數(shù)據(jù)到前端。其中read最為復(fù)雜,因此我們先來看read回調(diào),這個函數(shù)比較長,我們一段段的來。
先來看這個函數(shù)的一個基本結(jié)構(gòu):
這個函數(shù)的主要處理都在一個for循環(huán)里面,這個for循環(huán)比較長,因此我們就來分段分析這個for循環(huán)。這個for循環(huán)的主要作用就是從后端讀取數(shù)據(jù)。
因此它首先需要做的就是分配buf,以便于從后端接收數(shù)據(jù),可是如果第一次我們接收頭的時候,多接收了一些buf,此時我們就先處理這部分buf,然后再接收新的buf.
下面這段主要是進行狀態(tài)判斷,以及當preread_bufs存在的情況的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//判斷upstream的狀態(tài) if (p->upstream_eof || p->upstream_error || p->upstream_done) { break ; } //如果preread_bufs為空(上面的初始化中這個buf也就是upstream讀取頭的時候,解析完頭,然后剩余的buf),并且upstream并不可讀,此時則說明對數(shù)據(jù)也沒有任何操作和讀取的必要,因此退出循環(huán). if (p->preread_bufs == NULL && !p->upstream->read->ready) { break ; } //如果preread_bufs存在 if (p->preread_bufs) { /* use the pre-read bufs if they exist */ //使用preread_bufs chain = p->preread_bufs; //可以看到設(shè)置preread_bufs為空,這樣子,下次循環(huán),則會進入另外的處理,也就是需要從upstream讀取數(shù)據(jù) p->preread_bufs = NULL; //n也就是u->buf的大小 n = p->preread_size; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p-> log , 0, "pipe preread: %z" , n); if (n) { //設(shè)置read,也就是當前upstream中存在讀取還沒發(fā)送的數(shù)據(jù). p->read = 1; } } |
然后下面這段代碼就是當p->preread_bufs為空的情況,此時就需要從upstream來讀取數(shù)據(jù),而讀取之前則需要分配buf,以供upstream使用,因此下面這段代碼,就是用來分配buf的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
else { //首先是看free_raw_bufs是否存在,如果存在,則直接使用它。 if (p->free_raw_bufs) { /* use the free bufs if they exist */ chain = p->free_raw_bufs; if (p->single_buf) { p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } } else if (p->allocated < p->bufs.num) { //如果free_raw_bufs不存在,并且分配的buf數(shù)量沒有超過bufs的個數(shù),此時則創(chuàng)建新的buf /* allocate a new buf if it's still allowed */ b = ngx_create_temp_buf(p->pool, p->bufs.size); if (b == NULL) { return NGX_ABORT; } p->allocated++; chain = ngx_alloc_chain_link(p->pool); if (chain == NULL) { return NGX_ABORT; } chain->buf = b; chain->next = NULL; } else if (!p->cacheable && p->downstream->data == p->output_ctx && p->downstream->write->ready && !p->downstream->write->delayed) { //如果已經(jīng)分配的bufs的個數(shù)大于預(yù)設(shè)定的個數(shù),并且沒有打開cache,而且downstream可寫,則設(shè)置upstream_blocked,準備寫數(shù)據(jù)到upstream(這個是為了發(fā)送數(shù)據(jù)之后,數(shù)據(jù)buf能夠被使用讀) p->upstream_blocked = 1; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p-> log , 0, "pipe downstream ready" ); break ; } else if (p->cacheable || p->temp_file->offset < p->max_temp_file_size) { //到達這里有兩個情況,一個是cacheable打開,一個是當buf不夠用了,此時就會將一部分數(shù)據(jù)buf到temp file中。這個函數(shù)我下篇blog會詳細分析,這次之需要知道這個將會將數(shù)據(jù)buf到temp file,然后綁定到p->out中。 rc = ngx_event_pipe_write_chain_to_temp_file(p); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p-> log , 0, "pipe temp offset: %O" , p->temp_file->offset); //處理返回值 if (rc == NGX_BUSY) { break ; } if (rc == NGX_AGAIN) { if (ngx_event_flags & NGX_USE_LEVEL_EVENT && p->upstream->read->active && p->upstream->read->ready) { if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ABORT; } } } if (rc != NGX_OK) { return rc; } //說明寫成功,此時free_raw_bufs已經(jīng)被重新賦值,也就是我們可以使用,所以類似上面free_raw_bufs存在的處理 chain = p->free_raw_bufs; if (p->single_buf) { p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } } else { /* there are no bufs to read in */ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p-> log , 0, "no pipe bufs to read in" ); break ; } //開始從后端讀取數(shù)據(jù),可以看到數(shù)據(jù)被讀取進chain,n表示讀到的字節(jié)數(shù) n = p->upstream->recv_chain(p->upstream, chain); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p-> log , 0, "pipe recv chain: %z" , n); //如果將chain添加到free_raw_bufs的開頭 if (p->free_raw_bufs) { chain->next = p->free_raw_bufs; } p->free_raw_bufs = chain; //設(shè)置error if (n == NGX_ERROR) { p->upstream_error = 1; return NGX_ERROR; } if (n == NGX_AGAIN) { if (p->single_buf) { ngx_event_pipe_remove_shadow_links(chain->buf); } break ; } //設(shè)置read,表示已經(jīng)讀取了數(shù)據(jù) p->read = 1; //如果返回0,則說明對端關(guān)閉了連接 if (n == 0) { p->upstream_eof = 1; break ; } } |
然后就是for循環(huán)的最后一段,到達這一段說明從后端的數(shù)據(jù)已經(jīng)讀取到了chain中,然后n為已經(jīng)讀取的數(shù)據(jù),于是開始遍歷已經(jīng)讀取的chain。
它會遍歷chain,然后調(diào)用input_filter來將buf拷貝到 p->in/last_in 中,最后將chain free掉。這里buf 會有一個shadow 的域,在遍歷chain的時候,需要將對應(yīng)buf的shadow刪除掉。
對于shadow域我是這么理解的,就是每次我們在input_filter中拷貝cl->buf的域到p->in/last_in的buf域的時候,也就是制造了一個cl->buf的影子,而他們是共享對應(yīng)的內(nèi)存。此時這兩個buf就是互相shadow的。他們的shadow域都是指向?qū)Ψ?。不過這個域我覺得并不是很必須,上次在郵件列表也看到igor說不喜歡這東西,不過不知道什么時候能刪掉它.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
//更新已經(jīng)讀取了的字節(jié)數(shù) p->read_length += n; cl = chain; p->free_raw_bufs = NULL; //開始遍歷chain, while (cl && n > 0) { //首先remove shadow buf ngx_event_pipe_remove_shadow_links(cl->buf); //得到當前的chain buf的空間大?。ㄒ驗樽x取數(shù)據(jù),是從cl->buf->last開始的) size = cl->buf->end - cl->buf->last; //如果已經(jīng)讀取的字節(jié)數(shù)大于等于chain buf,則對當前的buf進行更新。 if (n >= size) { //更新last cl->buf->last = cl->buf->end; /* STUB */ cl->buf->num = p->num++; //調(diào)用input_filter if (p->input_filter(p, cl->buf) == NGX_ERROR) { return NGX_ABORT; } //更新n/cl 最終free chain n -= size; ln = cl; cl = cl->next; ngx_free_chain(p->pool, ln); } else { //否則則說明當前的chain是最后一個chain,因此更新last,然后設(shè)置n,以便與退出循環(huán)。這里要注意,可以看到nginx并沒有調(diào)用input_filter,這是因為,nginx會盡量的使cl->buf最大情況下調(diào)用p->input_filter,不過這里會有個問題,當cl->buf沒有最大,此時后端斷開連接,這時就會少調(diào)用一次p->input_filter.不過nginx在最后會處理這個問題的。 cl->buf->last += n; n = 0; } } //如果cl還存在,則說明我們開始設(shè)置的chain,只有一部分被使用了,因此此時將這寫chain保存到free_raw_bufs中。可以看到如果chain只有一部分被使用,然后當再次循環(huán),則使用的chain會直接使用free_raw_bufs,也就是我們前一次沒有使用完全的chain if (cl) { for (ln = cl; ln->next; ln = ln->next) { /* void */ } ln->next = p->free_raw_bufs; p->free_raw_bufs = cl; } |
上面的p->input_filter我們后面再分析,先來看函數(shù)剩余的部分剩余的這一部分主要是處理free_raw_bufs,調(diào)用p->input_filter,將free_raw_bufs中的數(shù)據(jù)保存發(fā)送chain中,這個就是為了解決前面少調(diào)用一次p->input_filter的情況。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { /* STUB */ p->free_raw_bufs->buf->num = p->num++; //調(diào)用input_filter. if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) { return NGX_ABORT; } p->free_raw_bufs = p->free_raw_bufs->next; //遍歷,然后 if (p->free_bufs && p->buf_to_file == NULL) { for (cl = p->free_raw_bufs; cl; cl = cl->next) { if (cl->buf->shadow == NULL) { ngx_pfree(p->pool, cl->buf->start); } } } } //如果cache打開,并且p->in存在(也就是有讀取的數(shù)據(jù)),則寫數(shù)據(jù)到temp file。 if (p->cacheable && p->in) { if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { return NGX_ABORT; } } |
然后來看p->input_filter的實現(xiàn),這里我們就來分析,nginx默認實現(xiàn)的一個ngx_event_pipe_copy_input_filter,其中proxy等模塊都是調(diào)用這個filter。它主要是拷貝buf(不是buf的內(nèi)容,只是buf的屬性)到p->in或者p->last_in,這兩個域都是用來和write數(shù)據(jù)的時候交互用的。這兩個域的區(qū)別是這樣子的
p->in只能保存一個chain,而p->in這條鏈上的剩余的chain都保存在p->last_in中,這么做的原因還不太清楚,而且搜索了下代碼,last_in也沒有被使用到.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
ngx_int_t ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) { ngx_buf_t *b; ngx_chain_t *cl; if (buf->pos == buf->last) { return NGX_OK; } //如果free存在,則從free中取得緩存的buf if (p-> free ) { cl = p-> free ; b = cl->buf; p-> free = cl->next; ngx_free_chain(p->pool, cl); } else { //否則分配buf b = ngx_alloc_buf(p->pool); if (b == NULL) { return NGX_ERROR; } } //拷貝buf的屬性 ngx_memcpy(b, buf, sizeof (ngx_buf_t)); b->shadow = buf; b->tag = p->tag; b->last_shadow = 1; b->recycled = 1; buf->shadow = b; //分配chain cl = ngx_alloc_chain_link(p->pool); if (cl == NULL) { return NGX_ERROR; } cl->buf = b; cl->next = NULL; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p-> log , 0, "input buf #%d" , b->num); if (p->in) { *p->last_in = cl; } else { p->in = cl; } p->last_in = &cl->next; return NGX_OK; } |
這次就分析到這里,下次將會分析upstream的數(shù)據(jù)發(fā)送的部分。