您的位置:首页 > 运维架构 > Nginx

Nginx Upstream模块源码分析(上)

2013-10-27 22:22 411 查看
Upstream模块是一个很重要的模块,很多其他模块都会使用它来完成对后端服务器的访问,
达到反向代理和负载均衡的效果。例如Fastcgi、Memcached、SessionSticky等。
如果自己实现这部分功能,采用传统的实现方式,很可能会阻塞Nginx降低其性能,因为Nginx是全异步非阻塞的。

所以要想不破坏其优美的架构,就得按照其规范实现很多回调函数,注册这些钩子到Nginx的处理流程中。
下面以一个使用Upstream模块的第三方模块SessionSticky为例,分析一下Upstream模块的执行流程。

一、配置解析

每个模块的入口变量ngx_module_t中,都需要指明:
一个ngx_command_t数组表示模块可以解析的配置;
一个module_ctx上下文,注册初始化和合并配置时的回调函数;
一个解析配置的函数;

Upstream模块的ngx_command_t数组的配置如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49
static
ngx_command_t  ngx_http_upstream_commands[] = {


{ ngx_string(
"upstream"
),


  
NGX_HTTP_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1,


  
ngx_http_upstream,


  
0,


  
0,


  
NULL },


{ ngx_string(
"server"
),


  
NGX_HTTP_UPS_CONF|NGX_CONF_1MORE,


  
ngx_http_upstream_server,


  
NGX_HTTP_SRV_CONF_OFFSET,


  
0,


  
NULL },


  
ngx_null_command


};


static
ngx_http_module_t  ngx_http_upstream_module_ctx = {


ngx_http_upstream_add_variables,   
/* preconfiguration */


NULL,  
/* postconfiguration */


ngx_http_upstream_create_main_conf,
/* create main configuration */


ngx_http_upstream_init_main_conf,  
/* init main configuration */


NULL,  
/* create server configuration */


NULL,  
/* merge server configuration */


NULL,  
/* create location configuration */


NULL   
/* merge location configuration */


};


ngx_module_t  ngx_http_upstream_module = {


NGX_MODULE_V1,


&ngx_http_upstream_module_ctx, 
/* module context */


ngx_http_upstream_commands,
/* module directives */


NGX_HTTP_MODULE,   
/* module type */


NULL,  
/* init master */


NULL,  
/* init module */


NULL,  
/* init process */


NULL,  
/* init thread */


NULL,  
/* exit thread */


NULL,  
/* exit process */


NULL,  
/* exit master */


NGX_MODULE_V1_PADDING


};


1)配置项

从上面的配置可知,Upstream模块可以解析http内的upstream块和块内的server。配置项含义如下,
Nginx就是靠这些选项帮助它找到能解析当前配置的模块:

NGX_CONF_TAKE1:配置指令接受1个参数。

NGX_CONF_1MORE:配置指令接受至少一个参数。

NGX_CONF_BLOCK:配置指令可以接受的值是一个配置信息块。也就是一对大括号括起来的内容。里面可以再包括很多的配置指令。比如常见的server指令就是这个属性的。

NGX_HTTP_MAIN_CONF: 可以直接出现在http配置指令里。

NGX_HTTP_UPS_CONF: 可以出现在http里面的upstream配置指令里。

2)初始化函数

初始化函数的工作很简单,create函数中为配置分配空间,init函数中会调用peer的init_stream(),
这是Upstream与使用它的模块的第一次交互。其他模块就是通过注册各种回调函数,加入到Upstream处理的生命周期的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80
static
void
*


ngx_http_upstream_create_main_conf(ngx_conf_t *cf)


{


ngx_http_upstream_main_conf_t  *umcf;


umcf = ngx_pcalloc(cf->pool,
sizeof
(ngx_http_upstream_main_conf_t));


if
(umcf == NULL) {


return
NULL;


}


if
(ngx_array_init(&umcf->upstreams, cf->pool, 4,


   
sizeof
(ngx_http_upstream_srv_conf_t *))


!= NGX_OK)


{


return
NULL;


}


return
umcf;


}


static
char
*


ngx_http_upstream_init_main_conf(ngx_conf_t *cf,
void
*conf)


{


ngx_http_upstream_main_conf_t  *umcf = conf;


ngx_uint_t  i;


ngx_array_t headers_in;


ngx_hash_key_t *hk;


ngx_hash_init_t hash;


ngx_http_upstream_init_pt   init;


ngx_http_upstream_header_t *header;


ngx_http_upstream_srv_conf_t  **uscfp;


uscfp = umcf->upstreams.elts;


for
(i = 0; i < umcf->upstreams.nelts; i++) {


// 就是在这里回调peer的init_stream()函数


init = uscfp[i]->peer.init_upstream ? uscfp[i]->peer.init_upstream:


ngx_http_upstream_init_round_robin;


if
(init(cf, uscfp[i]) != NGX_OK) {


return
NGX_CONF_ERROR;


}


}


/* upstream_headers_in_hash */


if
(ngx_array_init(&headers_in, cf->temp_pool, 32,
sizeof
(ngx_hash_key_t))


!= NGX_OK)


{


return
NGX_CONF_ERROR;


}


for
(header = ngx_http_upstream_headers_in; header->name.len; header++) {


hk = ngx_array_push(&headers_in);


if
(hk == NULL) {


return
NGX_CONF_ERROR;


}


hk->key = header->name;


hk->key_hash = ngx_hash_key_lc(header->name.data, header->name.len);


hk->value = header;


}


hash.hash = &umcf->headers_in_hash;


hash.key = ngx_hash_key_lc;


hash.max_size = 512;


hash.bucket_size = ngx_align(64, ngx_cacheline_size);


hash.name =
"upstream_headers_in_hash"
;


hash.pool = cf->pool;


hash.temp_pool = NULL;


if
(ngx_hash_init(&hash, headers_in.elts, headers_in.nelts) != NGX_OK) {


return
NGX_CONF_ERROR;


}


return
NGX_CONF_OK;


}


3)解析函数

解析函数解析出ngx_http_upstream_srv_conf_t并保存到全局配置数组中。其他使用Upstream的模块,
会大量使用读取配置,完成自己的功能。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108
static
char
*


ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd,
void
*dummy)


{


char
*rv;


void
*mconf;


ngx_str_t *value;


ngx_url_t  u;


ngx_uint_t m;


ngx_conf_t pcf;


ngx_http_module_t *module;


ngx_http_conf_ctx_t   *ctx, *http_ctx;


ngx_http_upstream_srv_conf_t  *uscf;


ngx_memzero(&u,
sizeof
(ngx_url_t));


value = cf->args->elts;


u.host = value[1];


u.no_resolve = 1;


u.no_port = 1;


uscf = ngx_http_upstream_add(cf, &u, NGX_HTTP_UPSTREAM_CREATE


 
|NGX_HTTP_UPSTREAM_WEIGHT


 
|NGX_HTTP_UPSTREAM_MAX_FAILS


 
|NGX_HTTP_UPSTREAM_FAIL_TIMEOUT


 
|NGX_HTTP_UPSTREAM_DOWN


 
|NGX_HTTP_UPSTREAM_BACKUP);


if
(uscf == NULL) {


return
NGX_CONF_ERROR;


}


ctx = ngx_pcalloc(cf->pool,
sizeof
(ngx_http_conf_ctx_t));


if
(ctx == NULL) {


return
NGX_CONF_ERROR;


}


http_ctx = cf->ctx;


ctx->main_conf = http_ctx->main_conf;


/* the upstream{}'s srv_conf */


ctx->srv_conf = ngx_pcalloc(cf->pool,
sizeof
(
void
*) * ngx_http_max_module);


if
(ctx->srv_conf == NULL) {


return
NGX_CONF_ERROR;


}


// 将ngx_http_upstream_srv_conf_t保存到配置数组中,其他模块会读取配置


ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf;


uscf->srv_conf = ctx->srv_conf;


/* the upstream{}'s loc_conf */


ctx->loc_conf = ngx_pcalloc(cf->pool,
sizeof
(
void
*) * ngx_http_max_module);


if
(ctx->loc_conf == NULL) {


return
NGX_CONF_ERROR;


}


for
(m = 0; ngx_modules[m]; m++) {


if
(ngx_modules[m]->type != NGX_HTTP_MODULE) {


continue
;


}


module = ngx_modules[m]->ctx;


if
(module->create_srv_conf) {


mconf = module->create_srv_conf(cf);


if
(mconf == NULL) {


return
NGX_CONF_ERROR;


}


ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;


}


if
(module->create_loc_conf) {


mconf = module->create_loc_conf(cf);


if
(mconf == NULL) {


return
NGX_CONF_ERROR;


}


ctx->loc_conf[ngx_modules[m]->ctx_index] = mconf;


}


}


/* parse inside upstream{}*/


pcf = *cf;


cf->ctx = ctx;


cf->cmd_type = NGX_HTTP_UPS_CONF;


rv = ngx_conf_parse(cf, NULL);


*cf = pcf;


if
(rv != NGX_CONF_OK) {


return
rv;


}


if
(uscf->servers == NULL) {


ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,


   
"no servers are inside upstream"
);


return
NGX_CONF_ERROR;


}


return
rv;


}


二、模块启动

Upstream模块都初始化好了之后,是如何被启动的呢?一般我们是这样使用Upstream模块的:

1

2

3

4

5

6

7

8

9

10

11

12

13
upstream backend {


session_sticky;


server www.baidu.com weight=10;


server www.google.com weight=10;


}


server {


listen 80;


server_name sessionsticky.com;


location / {


proxy_pass http://backend;[/code] 
}


}


奥秘模块HttpProxyModule中,此Handler模块解析proxy_pass,配置如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36
static
ngx_command_t  ngx_http_proxy_commands[] = {


{ ngx_string(
"proxy_pass"
),


  
NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_HTTP_LMT_CONF|NGX_CONF_TAKE1,


  
ngx_http_proxy_pass,


  
NGX_HTTP_LOC_CONF_OFFSET,


  
0,


  
NULL },


  
......


}


static
char
*


ngx_http_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd,
void
*conf)


{


ngx_http_proxy_loc_conf_t *plcf = conf;


size_t
add;


u_short port;


ngx_str_t  *value, *url;


ngx_url_t   u;


ngx_uint_t  n;


ngx_http_core_loc_conf_t   *clcf;


ngx_http_script_compile_t   sc;


if
(plcf->upstream.upstream || plcf->proxy_lengths) {


return
"is duplicate"
;


}


clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);




// 注册Handler的处理函数


clcf->handler = ngx_http_proxy_handler;


.......


return
NGX_CONF_OK;


}


解析函数中注册的Handler处理函数ngx_http_proxy_handler(),在调用ngx_http_read_client_request_body()时
将ngx_http_upstream_init传入,作为接收客户端请求体的后处理函数。这样每次从客户端读取完请求Body后,
都会回调Upstream的init函数。

注意Nginx与Squid的区别,Nginx会将请求体全部读取完后再进行后续处理。而Squid会边读边转发。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84
static
ngx_int_t


ngx_http_proxy_handler(ngx_http_request_t *r)


{


ngx_int_t   rc;


ngx_http_upstream_t*u;


ngx_http_proxy_ctx_t   *ctx;


ngx_http_proxy_loc_conf_t  *plcf;


if
(ngx_http_upstream_create(r) != NGX_OK) {


return
NGX_HTTP_INTERNAL_SERVER_ERROR;


}


ctx = ngx_pcalloc(r->pool,
sizeof
(ngx_http_proxy_ctx_t));


if
(ctx == NULL) {


return
NGX_ERROR;


}


ngx_http_set_ctx(r, ctx, ngx_http_proxy_module);


plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);


u = r->upstream;


if
(plcf->proxy_lengths == NULL) {


ctx->vars = plcf->vars;


u->schema = plcf->vars.schema;


#if (NGX_HTTP_SSL)


u->ssl = (plcf->upstream.ssl != NULL);


#endif


}
else
{


if
(ngx_http_proxy_eval(r, ctx, plcf) != NGX_OK) {


return
NGX_HTTP_INTERNAL_SERVER_ERROR;


}


}


u->output.tag = (ngx_buf_tag_t) &ngx_http_proxy_module;


u->conf = &plcf->upstream;


//以下就是HttpProxyModule注册的默认的回调函数


#if (NGX_HTTP_CACHE)


u->create_key = ngx_http_proxy_create_key;


#endif


u->create_request = ngx_http_proxy_create_request;


u->reinit_request = ngx_http_proxy_reinit_request;


u->process_header = ngx_http_proxy_process_status_line;


u->abort_request = ngx_http_proxy_abort_request;


u->finalize_request = ngx_http_proxy_finalize_request;


r->state = 0;


if
(plcf->redirects) {


u->rewrite_redirect = ngx_http_proxy_rewrite_redirect;


}


if
(plcf->cookie_domains || plcf->cookie_paths) {


u->rewrite_cookie = ngx_http_proxy_rewrite_cookie;


}


u->buffering = plcf->upstream.buffering;


u->pipe = ngx_pcalloc(r->pool,
sizeof
(ngx_event_pipe_t));


if
(u->pipe == NULL) {


return
NGX_HTTP_INTERNAL_SERVER_ERROR;


}


u->pipe->input_filter = ngx_http_proxy_copy_filter;


u->pipe->input_ctx = r;


u->input_filter_init = ngx_http_proxy_input_filter_init;


u->input_filter = ngx_http_proxy_non_buffered_copy_filter;


u->input_filter_ctx = r;


u->accel = 1;


// 在这里注册ngx_http_upstream_init回调函数,读取完请求体后就会触发


rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);


if
(rc >= NGX_HTTP_SPECIAL_RESPONSE) {


return
rc;


}


return
NGX_DONE;


}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: