惊群探究

惊群

当多个进程或线程等待同一个 socket 事件时,一旦该事件发生,这些进程或线程会全部被唤醒,但最终只有一个能处理该事件。

发生位置

Linux 2.6 内核已经解决了 accept 的惊群问题,但 select/poll 和 epoll_wait 仍然存在该问题。

产生影响

一个连接来临时,多个子进程同时被唤醒,却只有一个子进程accept成功,其余都失败,重新休眠;产生了没有必要的唤醒和上下文切换,造成性能浪费。

惊群实例分析

总体结论:在 3.10.0-514.16.1.el7.x86_64 内核上,accept 的惊群问题已经由 Linux 内核解决,具体唤醒方式的实现参见 socket 信号处理博文;
select/poll 和 epoll 则没有在内核层面解决惊群问题。
之前一直觉得 epoll 和 select 使用非常频繁,为什么操作系统不直接解决惊群问题?原因应该是 epoll 和 select 不仅仅用于 socket 监听,有些使用场景确实需要唤醒多个进程。(Linux 4.5 起 epoll 增加了 EPOLLEXCLUSIVE 标志,由调用者在注册时显式选择独占唤醒。)

accept

实例背景:在 3.10.0-514.16.1.el7.x86_64 内核下,创建 socket 并完成绑定和监听后,fork 出多个进程在同一个监听 fd 上调用 accept;当一个客户端访问该服务端时,观察各进程获取连接的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <strings.h>
#define SERV_PORT 9999

/*
 * Child body for the accept() experiment: block in accept() on the listening
 * socket inherited from the parent and log every result, forever.
 *
 * listenfd: listening socket shared by all children.
 * id:       child number (1 or 2), only used in the log line.
 *
 * Never returns.
 */
static void accept_forever(int listenfd, int id)
{
	for (;;) {
		struct sockaddr_in cliaddr;
		/* The length argument is value-result, so reset it before every call. */
		socklen_t clilen = sizeof(cliaddr);
		int connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);

		/* %m (glibc extension) expands to strerror(errno). */
		printf("fork %d is [%d],error is %m\n", id, connfd);

		/* The demo only counts wake-ups; close the connection so repeated
		 * requests do not leak one descriptor each. */
		if (connfd >= 0)
			close(connfd);
	}
}

/*
 * accept() thundering-herd experiment.
 *
 * Creates one TCP listening socket on SERV_PORT, then forks two children that
 * both block in accept() on it. Connect once (e.g. with curl) and watch how
 * many children report a result. The parent just sleeps for 100 s.
 *
 * Returns EXIT_FAILURE if setting up the socket or forking fails.
 */
int main(int argc, char **argv)
{
	struct sockaddr_in servaddr;
	int listenfd;
	int id;

	(void) argc;
	(void) argv;

	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if (listenfd < 0) {
		perror("socket");
		return EXIT_FAILURE;
	}

	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(SERV_PORT);

	/* Without these checks a failed bind (e.g. port already in use) would
	 * silently leave the children waiting on the wrong port. */
	if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
		perror("bind");
		return EXIT_FAILURE;
	}
	if (listen(listenfd, 1000) < 0) {
		perror("listen");
		return EXIT_FAILURE;
	}

	for (id = 1; id <= 2; id++) {
		pid_t pid = fork();

		if (pid < 0) {
			perror("fork");
			return EXIT_FAILURE;
		}
		if (pid == 0)
			accept_forever(listenfd, id);	/* does not return */
	}

	sleep(100);
	return EXIT_SUCCESS;
}

窗口1:编译运行例子

1
2
3
[root@localhost demo]# gcc jq.c -o jq
[root@localhost demo]# ./jq
fork 1 is [4],error is Success

窗口2:访问9999端口

1
[root@localhost ~]# curl http://127.0.0.1:9999

结果:在创建、绑定和监听之后 fork 出多个进程同时 accept,有连接到来时只有一个进程被唤醒并拿到连接 fd,其它进程仍在休眠。在 Linux 内核 3.10.0-514.16.1.el7.x86_64 下,多进程 accept 不存在惊群现象。

select

实例背景:在 3.10.0-514.16.1.el7.x86_64 内核下,创建 socket 并完成绑定和监听后,fork 出多个进程用 select 监听同一个监听 fd;当一个客户端访问该服务端时,观察各进程的活动情况

jingqunselect.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <strings.h>
#define SERV_PORT 8888

/*
 * Child body for the select() experiment.
 *
 * Waits up to 30 s in select() for the listening socket to become readable.
 * If it does, logs the wake-up and then loops in (blocking) accept() forever,
 * so a child that woke up but lost the race stays parked in accept().
 * On select() error or timeout the child logs it and exits, instead of
 * falling through into the parent's part of main() (which would fork again).
 *
 * listenfd: listening socket inherited from the parent.
 * errmsg:   text printed after "[pid]" when select() fails.
 *
 * Never returns.
 */
static void select_then_accept(int listenfd, const char *errmsg)
{
	fd_set fdsr;
	struct timeval tv;
	int ret;

	FD_ZERO(&fdsr);
	FD_SET(listenfd, &fdsr);
	tv.tv_sec = 30;
	tv.tv_usec = 0;

	ret = select(listenfd + 1, &fdsr, NULL, NULL, &tv);
	if (ret < 0) {
		printf("[%d]%s", getpid(), errmsg);
		exit(EXIT_FAILURE);
	}
	if (ret == 0) {
		printf("time out\n");
		exit(EXIT_SUCCESS);
	}

	printf("[%d] rcv singal \n", getpid());
	for (;;) {
		struct sockaddr_in cliaddr;
		/* value-result argument: reset before every accept() */
		socklen_t clilen = sizeof(cliaddr);
		int connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);

		/* %m (glibc extension) expands to strerror(errno). */
		printf("[%d] [%d],error is %m\n", getpid(), connfd);
		if (connfd >= 0)
			close(connfd);	/* demo only: don't leak one fd per request */
	}
}

/*
 * select() thundering-herd experiment.
 *
 * Creates one TCP listening socket on SERV_PORT and forks two children that
 * both select() on it. Connect once and count how many children print
 * "rcv singal". The parent just sleeps for 100 s.
 *
 * Returns EXIT_FAILURE if setting up the socket or forking fails.
 */
int main(int argc, char **argv)
{
	struct sockaddr_in servaddr;
	int listenfd;
	pid_t childpid, childpid2;

	(void) argc;
	(void) argv;

	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if (listenfd < 0) {
		perror("socket");
		return EXIT_FAILURE;
	}

	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(SERV_PORT);

	if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
		perror("bind");
		return EXIT_FAILURE;
	}
	if (listen(listenfd, 1000) < 0) {
		perror("listen");
		return EXIT_FAILURE;
	}

	childpid = fork();
	if (childpid < 0) {
		perror("fork");
		return EXIT_FAILURE;
	}
	if (childpid == 0)
		select_then_accept(listenfd, "child err 1 \n");	/* does not return */

	childpid2 = fork();
	if (childpid2 < 0) {
		perror("fork");
		return EXIT_FAILURE;
	}
	if (childpid2 == 0)
		select_then_accept(listenfd, "child 2 err\n");	/* does not return */

	sleep(100);
	return EXIT_SUCCESS;
}

窗口1:编译运行上述代码

1
2
3
4
5
6
[root@localhost demo]# gcc jingqunselect.c -o jqselect
[root@localhost demo]#
[root@localhost demo]# ./jqselect
[23954] rcv singal
[23955]rcv singal
[23954] [4],error is Success

窗口2:[root@localhost ~]# curl http://127.0.0.1:8888
结论:有新连接到来时,两个监听进程都被唤醒(都打印了 rcv singal),但只有一个 accept 成功,也就是存在惊群问题。

epoll

实例背景:在 3.10.0-514.16.1.el7.x86_64 内核下,创建 socket 并完成绑定和监听后,fork 出 10 个子进程,通过同一个 epoll 实例监听该 fd;当一个客户端访问该服务端时,观察各进程的活动情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
[root@localhost demo]# cat epjq.c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#define PROCESS_NUM 10
/*
 * Parse a decimal TCP port number in the range 0-65535.
 *
 * Returns the port, or -1 if s is NULL, empty, contains trailing characters,
 * or is out of range.
 */
static long
parse_port (const char *s)
{
	char *end;
	long v;

	if (s == NULL || *s == '\0')
		return -1;
	errno = 0;
	v = strtol(s, &end, 10);
	if (errno != 0 || *end != '\0' || v < 0 || v > 65535)
		return -1;
	return v;
}

/*
 * Create an IPv4 TCP socket bound to INADDR_ANY:<port>.
 *
 * port: decimal port number as a string (0-65535).
 *
 * Returns the bound (not yet listening) socket descriptor, or -1 if the port
 * string is invalid or socket()/bind() fails. A diagnostic is printed to
 * stderr on failure, and the socket is closed if bind() fails, so callers can
 * simply test for -1.
 */
static int
create_and_bind (const char *port)
{
	struct sockaddr_in serveraddr;
	long portnum;
	int fd;

	portnum = parse_port(port);
	if (portnum < 0) {
		fprintf(stderr, "create_and_bind: invalid port\n");
		return -1;
	}

	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		perror("socket");
		return -1;
	}

	/* Zero the whole struct so sin_zero does not carry stack garbage. */
	memset(&serveraddr, 0, sizeof serveraddr);
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons((unsigned short) portnum);

	if (bind(fd, (struct sockaddr *) &serveraddr, sizeof serveraddr) == -1) {
		perror("bind");
		close(fd);
		return -1;
	}
	return fd;
}
/*
 * Put a descriptor into non-blocking mode by OR-ing O_NONBLOCK into its file
 * status flags (read with F_GETFL, written back with F_SETFL).
 *
 * Returns 0 on success. If either fcntl() call fails, prints
 * "fcntl: <reason>" to stderr and returns -1; F_SETFL is not attempted when
 * F_GETFL already failed.
 */
static int
make_socket_non_blocking (int sfd)
{
	int current = fcntl(sfd, F_GETFL, 0);

	if (current != -1 && fcntl(sfd, F_SETFL, current | O_NONBLOCK) != -1)
		return 0;

	perror("fcntl");
	return -1;
}

#define MAXEVENTS 64

/*
 * Worker run by every child: block in epoll_wait() on the epoll instance the
 * parent created before fork(), and accept() whatever the listening socket
 * reports.
 *
 * efd:    epoll instance shared by all children.
 * sfd:    non-blocking listening socket registered in efd.
 * events: buffer of MAXEVENTS entries for epoll_wait() results.
 *
 * Never returns.
 */
static void
worker_loop (int efd, int sfd, struct epoll_event *events)
{
	for (;;) {
		int n, i;

		n = epoll_wait(efd, events, MAXEVENTS, -1);
		printf("process %d return from epoll_wait!\n", getpid());
		/* Essential for the demo: holding off accept() for 2 s keeps the
		 * pending connection (and therefore the readiness of sfd) alive
		 * while the other waiters are woken, which makes the herd visible. */
		sleep(2);

		for (i = 0; i < n; i++) {
			if ((events[i].events & EPOLLERR) ||
			    (events[i].events & EPOLLHUP) ||
			    !(events[i].events & EPOLLIN)) {
				/* An error occurred on this fd, or it is not readable
				 * even though we were notified. */
				fprintf(stderr, "epoll error\n");
				close(events[i].data.fd);
				continue;
			}
			if (events[i].data.fd == sfd) {
				/* Connection(s) pending on the listening socket. Only one
				 * process wins each connection; since sfd is non-blocking
				 * the losers get -1 (EAGAIN/EWOULDBLOCK) instead of blocking. */
				struct sockaddr in_addr;
				socklen_t in_len = sizeof in_addr;
				int infd = accept(sfd, &in_addr, &in_len);

				if (infd == -1) {
					printf("process %d accept failed!\n", getpid());
					break;
				}
				printf("process %d accept successed!\n", getpid());
				/* The demo only measures wake-ups; drop the connection. */
				close(infd);
			}
		}
	}
}

/*
 * epoll thundering-herd experiment.
 *
 * Creates a non-blocking listening socket on port 8888, registers it
 * (level-triggered EPOLLIN) in a single epoll instance, then forks
 * PROCESS_NUM children that all wait on that same instance. The parent
 * reaps children until none are left. Any setup failure aborts.
 */
int
main (int argc, char *argv[])
{
	int sfd, s;
	int efd;
	int k;
	int status;
	struct epoll_event event;
	struct epoll_event *events;

	(void) argc;
	(void) argv;

	sfd = create_and_bind("8888");
	if (sfd == -1)
		abort ();

	s = make_socket_non_blocking (sfd);
	if (s == -1)
		abort ();

	s = listen(sfd, SOMAXCONN);
	if (s == -1) {
		perror ("listen");
		abort ();
	}

	/* The size argument is only a hint; it must merely be positive. */
	efd = epoll_create(MAXEVENTS);
	if (efd == -1) {
		perror("epoll_create");
		abort();
	}

	event.data.fd = sfd;
	/* Level-triggered on purpose; EPOLLIN | EPOLLET would be edge-triggered. */
	event.events = EPOLLIN;
	s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
	if (s == -1) {
		perror("epoll_ctl");
		abort();
	}

	/* Buffer where events are returned; each child gets its own copy after fork(). */
	events = calloc(MAXEVENTS, sizeof *events);
	if (events == NULL) {
		perror("calloc");
		abort();
	}

	for (k = 0; k < PROCESS_NUM; k++) {
		pid_t pid = fork();

		if (pid == -1) {
			perror("fork");
			break;	/* keep whatever children were already started */
		}
		if (pid == 0)
			worker_loop(efd, sfd, events);	/* never returns */
	}

	/* Reap every child, not just the first one that exits. */
	for (;;) {
		if (wait(&status) == -1) {
			if (errno == EINTR)
				continue;
			break;	/* ECHILD: no children left */
		}
	}

	free (events);
	close (efd);
	close (sfd);
	return EXIT_SUCCESS;
}

窗口1:编译运行 epoll demo
[root@localhost demo]# ./epjq
process 24197 return from epoll_wait!
process 24198 return from epoll_wait!
process 24196 return from epoll_wait!
process 24195 return from epoll_wait!
process 24194 return from epoll_wait!
process 24193 return from epoll_wait!
process 24192 return from epoll_wait!
process 24191 return from epoll_wait!
process 24190 return from epoll_wait!
process 24189 return from epoll_wait!
process 24193 accept successed!
process 24194 accept failed!
process 24197 accept failed!
process 24195 accept failed!
process 24192 accept failed!
process 24191 accept failed!
process 24196 accept failed!
process 24198 accept failed!
process 24189 accept failed!
process 24190 accept failed!

窗口2:[root@localhost ~]# curl http://127.0.0.1:8888

结论:epoll_wait 并没有解决惊群问题:所有等待的进程都被唤醒,经历一次上下文切换后,只有一个进程 accept 成功,其余进程 accept 失败并重新进入睡眠。

How to solve?

多进程需要从同一个端口获取连接。为了高性能,我们通常不直接阻塞在 accept 上,而是根据具体场景选用 epoll/poll/select 等多路复用机制,但这类机制会带来惊群问题(具体见上述实验)。
目前标准解决方案有两种:

1.锁机制(见后文详细说明)
2.复用端口

Linux kernel 3.9 增加了 SO_REUSEPORT socket 选项,该选项允许多个服务端 socket 绑定同一个端口,内核通过哈希机制把客户端连接分配到具体的进程;这一切都由内核完成。

实例背景:3.10.0-514.16.1.el7.x86_64内核,fork多个进程,通过设置SO_REUSEPORT标记,多进程一起监听端口8888,具体实验demo程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#define PROCESS_NUM 10
/*
 * Parse a decimal TCP port number in the range 0-65535.
 *
 * Returns the port, or -1 if s is NULL, empty, contains trailing characters,
 * or is out of range.
 */
static long
parse_port (const char *s)
{
	char *end;
	long v;

	if (s == NULL || *s == '\0')
		return -1;
	errno = 0;
	v = strtol(s, &end, 10);
	if (errno != 0 || *end != '\0' || v < 0 || v > 65535)
		return -1;
	return v;
}

/*
 * Create an IPv4 TCP socket with SO_REUSEPORT enabled and bind it to
 * INADDR_ANY:<port>. SO_REUSEPORT must be set before bind() so that every
 * worker process can bind its own socket to the same port.
 *
 * port: decimal port number as a string (0-65535).
 *
 * Returns the bound (not yet listening) socket descriptor, or -1 if the port
 * string is invalid or socket()/setsockopt()/bind() fails. A diagnostic is
 * printed to stderr on failure, and the socket is closed if it was created.
 */
static int
create_and_bind (const char *port)
{
	struct sockaddr_in serveraddr;
	long portnum;
	int optval = 1;
	int fd;

	portnum = parse_port(port);
	if (portnum < 0) {
		fprintf(stderr, "create_and_bind: invalid port\n");
		return -1;
	}

	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		perror("socket");
		return -1;
	}

	/* Without this option the second worker's bind() fails with EADDRINUSE. */
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) == -1) {
		perror("setsockopt(SO_REUSEPORT)");
		close(fd);
		return -1;
	}

	/* Zero the whole struct so sin_zero does not carry stack garbage. */
	memset(&serveraddr, 0, sizeof serveraddr);
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons((unsigned short) portnum);

	if (bind(fd, (struct sockaddr *) &serveraddr, sizeof serveraddr) == -1) {
		perror("bind");
		close(fd);
		return -1;
	}
	return fd;
}
/*
 * Switch sfd to non-blocking mode: fetch the file status flags, add
 * O_NONBLOCK, and store them back.
 *
 * Returns 0 on success, or -1 after printing "fcntl: <reason>" to stderr if
 * either fcntl() call fails.
 */
static int
make_socket_non_blocking (int sfd)
{
	int status_flags;

	status_flags = fcntl(sfd, F_GETFL, 0);
	if (status_flags == -1)
		goto fail;

	if (fcntl(sfd, F_SETFL, status_flags | O_NONBLOCK) == -1)
		goto fail;

	return 0;

fail:
	perror("fcntl");
	return -1;
}

#define MAXEVENTS 64


/*
 * Body of one SO_REUSEPORT worker process.
 *
 * Unlike the shared-epoll demo, every worker creates its OWN listening socket
 * on port 8888 (with SO_REUSEPORT set by create_and_bind()) and its OWN epoll
 * instance. The kernel distributes incoming connections among these
 * listeners, so a connection wakes only the worker that owns it.
 *
 * Aborts if any setup step fails; otherwise never returns.
 */
static void socket_proc(void)
{
	int sfd, s;
	int efd;
	struct epoll_event event;
	struct epoll_event *events;

	sfd = create_and_bind("8888");
	if (sfd == -1)
		abort ();

	s = make_socket_non_blocking (sfd);
	if (s == -1)
		abort ();

	s = listen(sfd, SOMAXCONN);
	if (s == -1) {
		perror ("listen");
		abort ();
	}

	/* The size argument is only a hint; it must merely be positive. */
	efd = epoll_create(MAXEVENTS);
	if (efd == -1) {
		perror("epoll_create");
		abort();
	}

	event.data.fd = sfd;
	/* Level-triggered on purpose; EPOLLIN | EPOLLET would be edge-triggered. */
	event.events = EPOLLIN;
	s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
	if (s == -1) {
		perror("epoll_ctl");
		abort();
	}

	/* Buffer where events are returned */
	events = calloc(MAXEVENTS, sizeof *events);
	if (events == NULL) {
		perror("calloc");
		abort();
	}

	/* The event loop. No artificial sleep() is needed here: each worker
	 * waits on its own listener, so there is no shared readiness to race on. */
	for (;;) {
		int n, i;

		n = epoll_wait(efd, events, MAXEVENTS, -1);
		printf("process %d return from epoll_wait!\n", getpid());

		for (i = 0; i < n; i++) {
			if ((events[i].events & EPOLLERR) ||
			    (events[i].events & EPOLLHUP) ||
			    !(events[i].events & EPOLLIN)) {
				/* An error occurred on this fd, or it is not readable
				 * even though we were notified. */
				fprintf(stderr, "epoll error\n");
				close(events[i].data.fd);
				continue;
			}
			if (events[i].data.fd == sfd) {
				/* One or more connections are pending on this worker's
				 * own listening socket. */
				struct sockaddr in_addr;
				socklen_t in_len = sizeof in_addr;
				int infd = accept(sfd, &in_addr, &in_len);

				if (infd == -1) {
					printf("process %d accept failed!\n", getpid());
					break;
				}
				printf("process %d accept successed!\n", getpid());
				/* The demo only measures wake-ups; drop the connection. */
				close(infd);
			}
		}
	}
}


int
main (int argc, char *argv[])
{
	/*
	 * SO_REUSEPORT experiment: fork PROCESS_NUM workers, each of which sets up
	 * its own listening socket on port 8888 inside socket_proc(). The parent
	 * then reaps children until none are left.
	 */
	int k;
	int status;

	(void) argc;
	(void) argv;

	for (k = 0; k < PROCESS_NUM; k++) {
		pid_t pid = fork();

		if (pid == -1) {
			perror("fork");
			break;	/* keep whatever workers were already started */
		}
		if (pid == 0) {
			socket_proc();
			/* socket_proc() loops forever today; if it ever returns, the
			 * child must not continue this loop and fork workers of its own. */
			exit(EXIT_SUCCESS);
		}
	}

	/* Reap every child, not just the first one that exits. */
	for (;;) {
		if (wait(&status) == -1) {
			if (errno == EINTR)
				continue;
			break;	/* ECHILD: no children left */
		}
	}
	return EXIT_SUCCESS;
}

编译运行:

1
2
3
4
5
6
7
[root@localhost demo]#
[root@localhost demo]# gcc epjqreuseport.c -o e.out
[root@localhost demo]# ./e.out
process 31071 return from epoll_wait!
process 31071 accept successed!
process 31075 return from epoll_wait!
process 31075 accept successed!

总结:SO_REUSEPORT 允许多个进程各自 bind 同一个端口,内核按一定机制(基于连接的哈希)把新连接分配给不同的进程,只有被选中的进程会被唤醒。

nginx 采用 epoll 模型,它是怎么解决惊群问题的?

如果进程并没有处于过载状态,那么就会去争用锁,当然,实际上是争用监听套接口的监控权.

  • 争锁成功就会把所有监听套接口加入到自身的事件监控机制里(如果原本不在)
  • 争锁失败就会把监听套接口从自身的事件监控机制里删除(如果原本在)

争抢成功的进程调用 epoll_ctl 把所有要监听的端口加入本进程的 epoll 中,然后通过 epoll_wait 及时获取客户端的新建 TCP 连接事件;获取到事件后,该进程调用 accept 正式建立连接,然后释放锁。锁被释放后,所有进程可以再次共同争抢锁。

也就是说,由于锁的存在,同一时间只有一个进程拥有监听端口的监控权(即把监听端口放入自己的 epoll 中并调用 epoll_wait 监控新建连接事件)。这种机制保证了不会有多个进程同时拥有监听套接口的监控权,从而避免了惊群问题。