一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

nginx中upstream的設(shè)計和實現(xiàn)

 syden1981 2011-08-21

nginx中upstream的設(shè)計和實現(xiàn)(四)

WP Greet Box icon
Hello there! If you are new here, you might want to subscribe to the RSS feed for updates on this topic.

原創(chuàng)文章,轉(zhuǎn)載請注明: 轉(zhuǎn)載自pagefault

本文鏈接地址: nginx中upstream的設(shè)計和實現(xiàn)(四)

這此主要是分析發(fā)送數(shù)據(jù)到客戶端的部分以及buffering狀態(tài)下,nginx接收upstream數(shù)據(jù)的部分,這也是upstream的最復(fù)雜的部分,這里我還是忽略了cache部分,以后我會專門寫blog來分析nginx的cache部分。

這部分的函數(shù)入口是ngx_http_upstream_send_response,這里有一個很重要的標記,那就是u->buffering,這個標記的含義就是nginx是否會盡可能多的讀取upstream的數(shù)據(jù)。如果關(guān)閉,則就是一個同步的發(fā)送,也就是接收多少,發(fā)送給客戶端多少。默認這個是打開的。也就是nginx會buf住upstream發(fā)送的數(shù)據(jù)。

不管buffering是否打開,后端發(fā)送的頭都不會被buffer,首先會發(fā)送header,然后才是body的發(fā)送,而body的發(fā)送就需要區(qū)分buffering選項了。如下圖所示:

upstream_ac

下面這部分就是開始發(fā)送header,通過調(diào)用 ngx_http_send_header最終進入header filter的處理.

1
2
3
4
5
6
rc = ngx_http_send_header(r);
 
if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
    ngx_http_upstream_finalize_request(r, u, rc);
    return;
}

然后就是發(fā)送body部分,這里我們先來看buffering被關(guān)閉的情況,這里有兩個要注意的回調(diào)函數(shù),分別是input_filter/input_filter_init,這個filter回調(diào)指的是對upstream發(fā)送給nginx的數(shù)據(jù)將要發(fā)送前的filter(嚴格來說是一個body filter).這里如果input_filter沒有被設(shè)置,則nginx會有默認的回調(diào).后面我們會分析這個默認的filter,以及這個filter具體是需要操作那個數(shù)據(jù)。要注意,這兩個回調(diào)都只是針對buffering被關(guān)閉的情況,而對應(yīng)buffering打開的時候的情況,有另外的hook,我們后面會分析到.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    if (!u->buffering) {
//如果input_filter為空,則設(shè)置默認的filter
        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }
//設(shè)置讀寫函數(shù)
        u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
        r->write_event_handler =
                             ngx_http_upstream_process_non_buffered_downstream;
  
        r->limit_rate = 0;
//調(diào)用input filter 初始化函數(shù)
        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
//打開nodelay,準備將數(shù)據(jù)完全發(fā)送出去
        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
  
            tcp_nodelay = 1;
  
            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                               (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(c, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
  
            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }
//得到將要發(fā)送的數(shù)據(jù)的大小
        n = u->buffer.last - u->buffer.pos;
  
        if (n) {
//注意這里,可以看到buffer被reset了。
            u->buffer.last = u->buffer.pos;
//設(shè)置將要發(fā)送的數(shù)據(jù)大小
            u->state->response_length += n;
//調(diào)用input filter
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
//最終開始發(fā)送數(shù)據(jù)到downstream
            ngx_http_upstream_process_non_buffered_downstream(r);
  
        } else {
//說明buffer是空
            u->buffer.pos = u->buffer.start;
            u->buffer.last = u->buffer.start;
//此時刷新數(shù)據(jù)到client
            if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
//如果可讀,則繼續(xù)讀取upstream的數(shù)據(jù).
            if (u->peer.connection->read->ready) {
                ngx_http_upstream_process_non_buffered_upstream(r, u);
            }
        }
  
        return;
    }

上面的部分我們有2個函數(shù)需要詳細分析下,一個是input filter的hook,一個是ngx_http_upstream_process_non_buffered_downstream,一個個來,先是input filter的book。

u->input_filter hook主要是對upstream發(fā)送的body進行一些處理,類似body filter, 上面的分析中我們可以看到當調(diào)用u->input_filter之前將u->buffer.last重置為pos,這個做法我有些不太理解, 我的猜測是讓代碼更清晰一些,因為在u->input_filter中我們會真正更新u->buffer.last.

在u->input_filter中,主要是會分配一個chain,然后掛載到u->out_bufs上,因為最終nginx會發(fā)送u->out_bufs這個chain(后面的代碼會看到).并且u->buffer的last也會被更新,我們來看使用最多,也就是默認的u->input_filter的實現(xiàn):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static ngx_int_t
ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
    ngx_http_request_t  *r = data;
  
    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_upstream_t  *u;
  
    u = r->upstream;
//遍歷u->out_bufs
    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }
  
    cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }
  
    *ll = cl;
  
    cl->buf->flush = 1;
    cl->buf->memory = 1;
//取出將要發(fā)送的buffer
    b = &u->buffer;
  
    cl->buf->pos = b->last;
//更新last
    b->last += bytes;
    cl->buf->last = b->last;
    cl->buf->tag = u->output.tag;
//u->length表示將要發(fā)送的數(shù)據(jù)大小(content_length)如果為NGX_MAX_SIZE_T_VALUE,則說明后端協(xié)議并沒有指定需要發(fā)送的大小,此時我們只需要發(fā)送我們接收到的.
    if (u->length == NGX_MAX_SIZE_T_VALUE) {
        return NGX_OK;
    }
//更新將要發(fā)送的數(shù)據(jù)大小
    u->length -= bytes;
  
    return NGX_OK;
}

然后就是ngx_http_upstream_process_non_buffered_downstream函數(shù),這個函數(shù)用于非buffering狀態(tài)下發(fā)送數(shù)據(jù)給client,它會調(diào)用ngx_http_upstream_process_non_buffered_request來發(fā)送數(shù)據(jù),因此我們就來詳細分析這個函數(shù).
這個函數(shù)有兩個參數(shù),其中第二個do_write表示是否需要立即發(fā)送數(shù)據(jù).

主要來看這個函數(shù)的下面這部分,這部分主要是調(diào)用ngx_http_output_filter輸出給body filter,然后根據(jù)返回值來更新busy_bufs(沒有發(fā)送完畢,則保存未發(fā)送完畢的bufer到busy),可以看到和http部分的處理很類似.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    b = &u->buffer;
  
    do_write = do_write || u->length == 0;
  
    for ( ;; ) {
  
        if (do_write) {
//如果u->out_bufs不為NULL則說明有需要發(fā)送的數(shù)據(jù),如果u->busy_bufs,則說明上次有未發(fā)送完畢的數(shù)據(jù).
            if (u->out_bufs || u->busy_bufs) {
                rc = ngx_http_output_filter(r, u->out_bufs);
  
                if (rc == NGX_ERROR) {
                    ngx_http_upstream_finalize_request(r, u, 0);
                    return;
                }
//更新busy chain
                ngx_chain_update_chains(&u->free_bufs, &u->busy_bufs,
                                        &u->out_bufs, u->output.tag);
            }
//這里說明想要發(fā)送的數(shù)據(jù)都已經(jīng)發(fā)送完畢
            if (u->busy_bufs == NULL) {
//length為0,說明后端這次要發(fā)送的數(shù)據(jù)已經(jīng)發(fā)送完畢
                if (u->length == 0
                    || upstream->read->eof
                    || upstream->read->error)
                {
//此時finalize request,結(jié)束這次請求
                    ngx_http_upstream_finalize_request(r, u, 0);
                    return;
                }
//否則重置u->buffer,以便與下次使用
                b->pos = b->start;
                b->last = b->start;
            }
        }
//得到當前buf的剩余空間
        size = b->end - b->last;
//設(shè)置size為將要使用的buffer大小
        if (size > u->length) {
            size = u->length;
        }
//如果還有數(shù)據(jù)需要接受,并且upstream可讀,則讀取數(shù)據(jù)
        if (size && upstream->read->ready) {
  
            n = upstream->recv(upstream, b->last, size);
  
            if (n == NGX_AGAIN) {
                break;
            }
  
            if (n > 0) {
                u->state->response_length += n;
//再次調(diào)用input_filter,這里沒有reset u->buffer.last,這是因為我們這個值并沒有更新.
                if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                    ngx_http_upstream_finalize_request(r, u, 0);
                    return;
                }
            }
//設(shè)置do_write,然后發(fā)送數(shù)據(jù).
            do_write = 1;
  
            continue;
        }
  
        break;
    }

這個函數(shù)剩下部分就很簡單了,就是掛載事件,刪除定時器等一系列操作。

然后我們來看nginx最復(fù)雜的一塊代碼,也就是使用了buffering標記的條件下,nginx如何處理.
這里有一個核心的數(shù)據(jù)結(jié)構(gòu)ngx_event_pipe_s。接下來,我們就來分析這個結(jié)構(gòu).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
struct ngx_event_pipe_s {
//表示nginx和client,以及和后端的兩條連接
    ngx_connection_t  *upstream;
    ngx_connection_t  *downstream;
//保存了從upstream讀取的數(shù)據(jù)(沒有經(jīng)過任何處理的),以及緩存的buf.
    ngx_chain_t       *free_raw_bufs;
    ngx_chain_t       *in;
    ngx_chain_t      **last_in;
//buf到tempfile的數(shù)據(jù)會放到out中
    ngx_chain_t       *out;
    ngx_chain_t      **last_out;
  
    ngx_chain_t       *free;
    ngx_chain_t       *busy;
  
    /*
     * the input filter i.e. that moves HTTP/1.1 chunks
     * from the raw bufs to an incoming chain
     */
  
    ngx_event_pipe_input_filter_pt    input_filter;
    void                             *input_ctx;
//這個filter就是輸出內(nèi)容到client的函數(shù),一般設(shè)置為ngx_chain_writer
    ngx_event_pipe_output_filter_pt   output_filter;
    void                             *output_ctx;
//一些狀態(tài)以及屬性
    unsigned           read:1;
    unsigned           cacheable:1;
    unsigned           single_buf:1;
    unsigned           free_bufs:1;
    unsigned           upstream_done:1;
    unsigned           upstream_error:1;
    unsigned           upstream_eof:1;
    unsigned           upstream_blocked:1;
    unsigned           downstream_done:1;
    unsigned           downstream_error:1;
    unsigned           cyclic_temp_file:1;
//配合bufs使用,表示已經(jīng)分配了的buf的個數(shù)
    ngx_int_t          allocated;
//對應(yīng)xxx_buffers,也就是讀取后端的數(shù)據(jù)時的bufer大小以及個數(shù)
    ngx_bufs_t         bufs;
    ngx_buf_tag_t      tag;
  
    ssize_t            busy_size;
  
    off_t              read_length;
//cache相關(guān),max_temp_file_size表示最大的temp file的大小,temp_file_write_size表示buf將會flush到temp file中的大小.
    off_t              max_temp_file_size;
    ssize_t            temp_file_write_size;
//網(wǎng)絡(luò)相關(guān)的參數(shù),定時器,以及l(fā)owat
    ngx_msec_t         read_timeout;
    ngx_msec_t         send_timeout;
    ssize_t            send_lowat;
  
    ngx_pool_t        *pool;
    ngx_log_t         *log;
//預(yù)讀的buf以及大小,這里預(yù)讀是指已經(jīng)從upstream讀取了的buf.
    ngx_chain_t       *preread_bufs;
    size_t             preread_size;
//cache相關(guān)表示將要cache到文件的buf
    ngx_buf_t         *buf_to_file;
//cache相關(guān),表示temp file
    ngx_temp_file_t   *temp_file;
  
    /* STUB */ int     num;
};

然后就是ngx_http_upstream_send_response的剩余部分,這部分主要是初始化event pipe結(jié)構(gòu).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
    p = u->pipe;
//設(shè)置filter,可以看到就是http的輸出filter
    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
//設(shè)置bufs,它就是upstream中設(shè)置的bufs
    p->bufs = u->conf->bufs;
//busy buffers的大小
    p->busy_size = u->conf->busy_buffers_size;
//upstream
    p->upstream = u->peer.connection;
    p->downstream = c;
    p->pool = r->pool;
    p->log = c->log;
//設(shè)置是否需要cache
    p->cacheable = u->cacheable || u->store;
//初始化temp_file
    p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    if (p->temp_file == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
  
    p->temp_file->file.fd = NGX_INVALID_FILE;
    p->temp_file->file.log = c->log;
    p->temp_file->path = u->conf->temp_path;
    p->temp_file->pool = r->pool;
  
    if (p->cacheable) {
        p->temp_file->persistent = 1;
  
    } else {
        p->temp_file->log_level = NGX_LOG_WARN;
        p->temp_file->warn = "an upstream response is buffered "
                             "to a temporary file";
    }
//temp file的相關(guān)設(shè)置
    p->max_temp_file_size = u->conf->max_temp_file_size;
    p->temp_file_write_size = u->conf->temp_file_write_size;
//初始化preread bufs
    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
  
    p->preread_bufs->buf = &u->buffer;
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;
  
    p->preread_size = u->buffer.last - u->buffer.pos;
//設(shè)置cache相關(guān)
    if (u->cacheable) {
  
        p->buf_to_file = ngx_calloc_buf(r->pool);
        if (p->buf_to_file == NULL) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
  
        p->buf_to_file->pos = u->buffer.start;
        p->buf_to_file->last = u->buffer.pos;
        p->buf_to_file->temporary = 1;
    }
  
    if (ngx_event_flags & NGX_USE_AIO_EVENT) {
        /* the posted aio operation may currupt a shadow buffer */
        p->single_buf = 1;
    }
  
    /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
    p->free_bufs = 1;
  
    /*
     * event_pipe would do u->buffer.last += p->preread_size
     * as though these bytes were read
     */
    u->buffer.last = u->buffer.pos;
  
    if (u->conf->cyclic_temp_file) {
  
        /*
         * we need to disable the use of sendfile() if we use cyclic temp file
         * because the writing a new data may interfere with sendfile()
         * that uses the same kernel file pages (at least on FreeBSD)
         */
  
        p->cyclic_temp_file = 1;
        c->sendfile = 0;
  
    } else {
        p->cyclic_temp_file = 0;
    }
//事件相關(guān)的初始化
    p->read_timeout = u->conf->read_timeout;
    p->send_timeout = clcf->send_timeout;
    p->send_lowat = clcf->send_lowat;
//掛載讀寫回調(diào)函數(shù),這里注意一個是upstream的讀回調(diào),一個是r(client)的寫回調(diào)
    u->read_event_handler = ngx_http_upstream_process_upstream;
    r->write_event_handler = ngx_http_upstream_process_downstream;
//進入upstream的操作
    ngx_http_upstream_process_upstream(r, u);

通過上面我們可以看到主要操作都在兩個回調(diào)函數(shù)中,一個是upstream的讀handler,一個是downstream的寫handler,我們一個個看,先來看upstream的讀handler。

這個函數(shù)首先會判斷是否超時,如果超時則設(shè)置錯誤,否則調(diào)用ngx_event_pipe進入pipe的讀處理,然后調(diào)用ngx_http_upstream_process_request對upstream進行處理,比如退出等一系列操作,因此這里最核心的函數(shù)就是ngx_event_pipe。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void
ngx_http_upstream_process_upstream(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *c;
  
    c = u->peer.connection;
  
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process upstream");
  
    c->log->action = "reading upstream";
//判斷超時
    if (c->read->timedout) {
        u->pipe->upstream_error = 1;
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
  
    } else {
//調(diào)用event_pipe進對讀取數(shù)據(jù)進行處理.
        if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    }
  
    ngx_http_upstream_process_request(r);
}

然后來看ngx_event_pipe的源代碼,ngx_event_pipe第二個參數(shù)是do_write,表示是否需要將數(shù)據(jù)立即寫到downstream,也就是client.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    u_int         flags;
    ngx_int_t     rc;
    ngx_event_t  *rev, *wev;
  
    for ( ;; ) {
//判斷是否需要將數(shù)據(jù)寫到downstream.
        if (do_write) {
            p->log->action = "sending to client";
//寫數(shù)據(jù)到downstream
            rc = ngx_event_pipe_write_to_downstream(p);
  
            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }
  
            if (rc == NGX_BUSY) {
                return NGX_OK;
            }
        }
  
        p->read = 0;
        p->upstream_blocked = 0;
  
        p->log->action = "reading upstream";
//讀取數(shù)據(jù)
        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
            return NGX_ABORT;
        }
//判斷是否需要退出循環(huán),p->read表示是否已經(jīng)讀取了upstream的數(shù)據(jù),upstream_blocked表示是否downstream可寫(后面代碼會看到這兩個變量的設(shè)置)
        if (!p->read && !p->upstream_blocked) {
            break;
        }
//可以看到如果讀取了數(shù)據(jù)就準備寫數(shù)據(jù)到downstream
        do_write = 1;
    }
//判斷是否需要掛載讀事件
    if (p->upstream->fd != -1) {
        rev = p->upstream->read;
  
        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
  
        if (ngx_handle_read_event(rev, flags) != NGX_OK) {
            return NGX_ABORT;
        }
  
        if (rev->active && !rev->ready) {
            ngx_add_timer(rev, p->read_timeout);
  
        } else if (rev->timer_set) {
            ngx_del_timer(rev);
        }
    }
//掛載寫事件。
    if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
        wev = p->downstream->write;
        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
            return NGX_ABORT;
        }
  
        if (!wev->delayed) {
            if (wev->active && !wev->ready) {
                ngx_add_timer(wev, p->send_timeout);
  
            } else if (wev->timer_set) {
                ngx_del_timer(wev);
            }
        }
    }
  
    return NGX_OK;
}

通過上面的代碼我們能看到核心的函數(shù)有這么2個,分別是ngx_event_pipe_read_upstream和ngx_event_pipe_write_to_downstream,這兩個 一個是從后端都數(shù)據(jù),一個是發(fā)送數(shù)據(jù)到前端。其中read最為復(fù)雜,因此我們先來看read回調(diào),這個函數(shù)比較長,我們一段段的來。

先來看這個函數(shù)的一個基本結(jié)構(gòu):
upstream_read

這個函數(shù)的主要處理都在一個for循環(huán)里面,這個for循環(huán)比較長,因此我們就來分段分析這個for循環(huán)。這個for循環(huán)的主要作用就是從后端讀取數(shù)據(jù)。

因此它首先需要做的就是分配buf,以便于從后端接收數(shù)據(jù),可是如果第一次我們接收頭的時候,多接收了一些buf,此時我們就先處理這部分buf,然后再接收新的buf.

下面這段主要是進行狀態(tài)判斷,以及當preread_bufs存在的情況的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//判斷upstream的狀態(tài)
        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }
//如果preread_bufs為空(上面的初始化中這個buf也就是upstream讀取頭的時候,解析完頭,然后剩余的buf),并且upstream并不可讀,此時則說明對數(shù)據(jù)也沒有任何操作和讀取的必要,因此退出循環(huán).
        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }
//如果preread_bufs存在
        if (p->preread_bufs) {
  
            /* use the pre-read bufs if they exist */
//使用preread_bufs
            chain = p->preread_bufs;
//可以看到設(shè)置preread_bufs為空,這樣子,下次循環(huán),則會進入另外的處理,也就是需要從upstream讀取數(shù)據(jù)
            p->preread_bufs = NULL;
//n也就是u->buf的大小
            n = p->preread_size;
  
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe preread: %z", n);
  
            if (n) {
//設(shè)置read,也就是當前upstream中存在讀取還沒發(fā)送的數(shù)據(jù).
                p->read = 1;
            }
  
        }

然后下面這段代碼就是當p->preread_bufs為空的情況,此時就需要從upstream來讀取數(shù)據(jù),而讀取之前則需要分配buf,以供upstream使用,因此下面這段代碼,就是用來分配buf的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
else {
//首先是看free_raw_bufs是否存在,如果存在,則直接使用它。
            if (p->free_raw_bufs) {
  
                /* use the free bufs if they exist */
  
                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }
  
            } else if (p->allocated < p->bufs.num) {
//如果free_raw_bufs不存在,并且分配的buf數(shù)量沒有超過bufs的個數(shù),此時則創(chuàng)建新的buf
                /* allocate a new buf if it's still allowed */
  
                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }
  
                p->allocated++;
  
                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }
  
                chain->buf = b;
                chain->next = NULL;
  
            } else if (!p->cacheable
                       && p->downstream->data == p->output_ctx
                       && p->downstream->write->ready
                       && !p->downstream->write->delayed)
            {
//如果已經(jīng)分配的bufs的個數(shù)大于預(yù)設(shè)定的個數(shù),并且沒有打開cache,而且downstream可寫,則設(shè)置upstream_blocked,準備寫數(shù)據(jù)到upstream(這個是為了發(fā)送數(shù)據(jù)之后,數(shù)據(jù)buf能夠被使用讀)
                p->upstream_blocked = 1;
  
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe downstream ready");
  
                break;
  
            } else if (p->cacheable
                       || p->temp_file->offset < p->max_temp_file_size)
            {
//到達這里有兩個情況,一個是cacheable打開,一個是當buf不夠用了,此時就會將一部分數(shù)據(jù)buf到temp file中。這個函數(shù)我下篇blog會詳細分析,這次之需要知道這個將會將數(shù)據(jù)buf到temp file,然后綁定到p->out中。
                rc = ngx_event_pipe_write_chain_to_temp_file(p);
  
                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe temp offset: %O", p->temp_file->offset);
//處理返回值
                if (rc == NGX_BUSY) {
                    break;
                }
  
                if (rc == NGX_AGAIN) {
                    if (ngx_event_flags & NGX_USE_LEVEL_EVENT
                        && p->upstream->read->active
                        && p->upstream->read->ready)
                    {
                        if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
                            == NGX_ERROR)
                        {
                            return NGX_ABORT;
                        }
                    }
                }
  
                if (rc != NGX_OK) {
                    return rc;
                }
//說明寫成功,此時free_raw_bufs已經(jīng)被重新賦值,也就是我們可以使用,所以類似上面free_raw_bufs存在的處理
                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }
  
            } else {
  
                /* there are no bufs to read in */
  
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "no pipe bufs to read in");
  
                break;
            }
//開始從后端讀取數(shù)據(jù),可以看到數(shù)據(jù)被讀取進chain,n表示讀到的字節(jié)數(shù)
            n = p->upstream->recv_chain(p->upstream, chain);
  
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);
//如果將chain添加到free_raw_bufs的開頭
            if (p->free_raw_bufs) {
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;
//設(shè)置error
            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                return NGX_ERROR;
            }
  
            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }
  
                break;
            }
//設(shè)置read,表示已經(jīng)讀取了數(shù)據(jù)
            p->read = 1;
//如果返回0,則說明對端關(guān)閉了連接
            if (n == 0) {
                p->upstream_eof = 1;
                break;
            }
        }

然后就是for循環(huán)的最后一段,到達這一段說明從后端的數(shù)據(jù)已經(jīng)讀取到了chain中,然后n為已經(jīng)讀取的數(shù)據(jù),于是開始遍歷已經(jīng)讀取的chain。

它會遍歷chain,然后調(diào)用input_filter來將buf拷貝到 p->in/last_in 中,最后將chain free掉。這里buf 會有一個shadow 的域,在遍歷chain的時候,需要將對應(yīng)buf的shadow刪除掉。

對于shadow域我是這么理解的,就是每次我們在input_filter中拷貝cl->buf的域到p->in/last_in的buf域的時候,也就是制造了一個cl->buf的影子,而他們是共享對應(yīng)的內(nèi)存。此時這兩個buf就是互相shadow的。他們的shadow域都是指向?qū)Ψ?。不過這個域我覺得并不是很必須,上次在郵件列表也看到igor說不喜歡這東西,不過不知道什么時候能刪掉它.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//更新已經(jīng)讀取了的字節(jié)數(shù)
        p->read_length += n;
        cl = chain;
        p->free_raw_bufs = NULL;
//開始遍歷chain,
        while (cl && n > 0) {
//首先remove shadow buf
            ngx_event_pipe_remove_shadow_links(cl->buf);
//得到當前的chain buf的空間大?。ㄒ驗樽x取數(shù)據(jù),是從cl->buf->last開始的)
            size = cl->buf->end - cl->buf->last;
//如果已經(jīng)讀取的字節(jié)數(shù)大于等于chain buf,則對當前的buf進行更新。
            if (n >= size) {
//更新last
                cl->buf->last = cl->buf->end;
  
                /* STUB */ cl->buf->num = p->num++;
//調(diào)用input_filter
                if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    return NGX_ABORT;
                }
//更新n/cl 最終free chain
                n -= size;
                ln = cl;
                cl = cl->next;
                ngx_free_chain(p->pool, ln);
  
            } else {
//否則則說明當前的chain是最后一個chain,因此更新last,然后設(shè)置n,以便與退出循環(huán)。這里要注意,可以看到nginx并沒有調(diào)用input_filter,這是因為,nginx會盡量的使cl->buf最大情況下調(diào)用p->input_filter,不過這里會有個問題,當cl->buf沒有最大,此時后端斷開連接,這時就會少調(diào)用一次p->input_filter.不過nginx在最后會處理這個問題的。
                cl->buf->last += n;
                n = 0;
            }
        }
//如果cl還存在,則說明我們開始設(shè)置的chain,只有一部分被使用了,因此此時將這寫chain保存到free_raw_bufs中。可以看到如果chain只有一部分被使用,然后當再次循環(huán),則使用的chain會直接使用free_raw_bufs,也就是我們前一次沒有使用完全的chain
        if (cl) {
            for (ln = cl; ln->next; ln = ln->next) { /* void */ }
  
            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

上面的p->input_filter我們后面再分析,先來看函數(shù)剩余的部分剩余的這一部分主要是處理free_raw_bufs,調(diào)用p->input_filter,將free_raw_bufs中的數(shù)據(jù)保存發(fā)送chain中,這個就是為了解決前面少調(diào)用一次p->input_filter的情況。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
  
        /* STUB */ p->free_raw_bufs->buf->num = p->num++;
//調(diào)用input_filter.
        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }
  
        p->free_raw_bufs = p->free_raw_bufs->next;
//遍歷,然后
        if (p->free_bufs && p->buf_to_file == NULL) {
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }
//如果cache打開,并且p->in存在(也就是有讀取的數(shù)據(jù)),則寫數(shù)據(jù)到temp file。
    if (p->cacheable && p->in) {
        if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
            return NGX_ABORT;
        }
    }

然后來看p->input_filter的實現(xiàn),這里我們就來分析,nginx默認實現(xiàn)的一個ngx_event_pipe_copy_input_filter,其中proxy等模塊都是調(diào)用這個filter。它主要是拷貝buf(不是buf的內(nèi)容,只是buf的屬性)到p->in或者p->last_in,這兩個域都是用來和write數(shù)據(jù)的時候交互用的。這兩個域的區(qū)別是這樣子的
p->in只能保存一個chain,而p->in這條鏈上的剩余的chain都保存在p->last_in中,這么做的原因還不太清楚,而且搜索了下代碼,last_in也沒有被使用到.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
ngx_int_t
ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
{
    ngx_buf_t    *b;
    ngx_chain_t  *cl;
  
    if (buf->pos == buf->last) {
        return NGX_OK;
    }
//如果free存在,則從free中取得緩存的buf
    if (p->free) {
        cl = p->free;
        b = cl->buf;
        p->free = cl->next;
        ngx_free_chain(p->pool, cl);
  
    } else {
//否則分配buf
        b = ngx_alloc_buf(p->pool);
        if (b == NULL) {
            return NGX_ERROR;
        }
    }
//拷貝buf的屬性
    ngx_memcpy(b, buf, sizeof(ngx_buf_t));
    b->shadow = buf;
    b->tag = p->tag;
    b->last_shadow = 1;
    b->recycled = 1;
    buf->shadow = b;
//分配chain
    cl = ngx_alloc_chain_link(p->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }
  
    cl->buf = b;
    cl->next = NULL;
  
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
  
    if (p->in) {
        *p->last_in = cl;
    } else {
        p->in = cl;
    }
    p->last_in = &cl->next;
  
    return NGX_OK;
}

這次就分析到這里,下次將會分析upstream的數(shù)據(jù)發(fā)送的部分。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    精品国产91亚洲一区二区三区 | 天堂网中文字幕在线观看| 老司机精品视频在线免费看| 99久久精品免费精品国产| 亚洲国产精品一区二区| 欧美色婷婷综合狠狠爱| 日系韩系还是欧美久久| 日本午夜免费啪视频在线| 亚洲高清亚洲欧美一区二区| 国产一区二区三区香蕉av| 经典欧美熟女激情综合网| 欧美日韩精品久久第一页| 国产日产欧美精品视频| 一区二区三区在线不卡免费| 亚洲一区在线观看蜜桃| 欧美一本在线免费观看| 中文字幕日韩欧美理伦片| 国产91麻豆精品成人区| 东北老熟妇全程露脸被内射| 一区二区三区日本高清| 在线精品首页中文字幕亚洲| 国产精品视频一区二区秋霞| 扒开腿狂躁女人爽出白浆av| 日韩精品中文字幕在线视频| 黄片免费在线观看日韩| 精品熟女少妇一区二区三区| 国产不卡免费高清视频| 成人精品日韩专区在线观看| 亚洲另类欧美综合日韩精品| 嫩草国产福利视频一区二区| 又大又紧又硬又湿又爽又猛| 视频一区日韩经典中文字幕| 日本女优一色一伦一区二区三区| 亚洲高清中文字幕一区二三区 | 国产精品久久香蕉国产线| 国产日韩欧美国产欧美日韩| 国产精品第一香蕉视频| 色狠狠一区二区三区香蕉蜜桃| 国产精品午夜一区二区三区 | 国产一级性生活录像片| 97人妻精品免费一区二区|