epool基本用法

epool基本概念

是什么?

改进的pool,一种I/O多路复用技术,可管理大批量文件描述符。

工作原理?

内核中,一切皆文件,epoll向内核注册一个文件系统,用于存储要管理的文件描述符号。调用epoll_create时,会在虚拟文件系统中创建一个file节点服务epool同时也会创建就绪事件list链表。
操作系统启动后,会开辟出自己的高速cache,socket问价描述符会以红黑树存入cache,方便查找、插入、删除。
epool_ctl,把socket放到epool文件系统里file对应的红黑树,也会注册一个回调函数,文件描述符有信号后,会调用该组册函数,内核把网卡数据copy到内核中把socket插入就绪列表中。
epoll_wait调用时候,看一眼就绪列表,所以效率很高。监控百万描述符,但是准备就绪fd却很少。

适用场景?

非常适用大量并发连接中只有少量活跃连接情况,且在该情况下CPU适用率很低。

可能缺点?

所有socket基本都是活跃的,比如在一个高速的LAN环境,使用epool可能会比select/pool效率低

分为LT和ET

LT和ET作用在epool_wait过程中,LT模式下,只要一个文件描述符没有处理完,后续再次调用epool_wait时也会返回。实现过程为,内核会把socket事件插入就绪链表,epool_wait调用会被把就绪的文件描述符拷入用户态,清空就绪链表,如果是ET则额外检测如果存在没有处理文件描述符,则将再次放入就绪列表中。

epool例子

epoll_create函数

用途:创建一个epool事件管理并返回描述符号

1
2
#include <sys/epoll.h>
int epoll_create(int size);

参数:size 最大fd数
返回值:epool使用的文件描述符

  • -1 失败
  • >= 0 成功

epoll_ctl函数

用途:控制epoll事件,添加修改删除事件

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数:

epfd:epoll_create的返回值
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除
fd:要在epool事件管理上加入删除或者修改的文件描述符
event:event.data.fd 要处理的文件描述符
event:event.events = EPOLLIN|EPOLLET;
EPOLLIN :表示对应的文件描述符可以读;
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:边缘触发;

返回值:成功返回0,失败返回-1
When successful, epoll_ctl() returns zero. When an error occurs, epoll_ctl() returns -1 and errno is set appropriately

epool_wait函数

用途:返回IO事件就绪的fd

1
2
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

参数:
epfd:epoll_create的返回值
events:取出内核结果的事件数组
maxevents:要处理的事件数
timeout:等待IO发生超时值
-1 阻塞直到有事件
0 非阻塞
>0: 阻塞时间,单位毫秒

epoool函数实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include  <unistd.h>
#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <sys/epoll.h> /* epoll function */
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */

#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

#define MAXEPOLLSIZE 10000
#define MAXLINE 10240
int handle(int connfd);
int setnonblocking(int sockfd)
{
if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {
return -1;
}
return 0;
}

int main(int argc, char **argv)
{
int servPort = 8080;
int listenq = 1024;

int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0;
struct sockaddr_in servaddr, cliaddr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
char buf[MAXLINE];

/* 设置每个进程允许打开的最大文件数 */
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
{
perror("setrlimit error");
return -1;
}


bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl (INADDR_ANY);
servaddr.sin_port = htons (servPort);

listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("can't create socket file");
return -1;
}

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

if (setnonblocking(listenfd) < 0) {
perror("setnonblock error");
}

if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1)
{
perror("bind error");
return -1;
}
if (listen(listenfd, listenq) == -1)
{
perror("listen error");
return -1;
}
/* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */
kdpfd = epoll_create(MAXEPOLLSIZE);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listenfd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd);
return -1;
}
curfds = 1;

printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq);

for (;;) {
/* 等待有事件发生 */
nfds = epoll_wait(kdpfd, events, curfds, -1);
if (nfds == -1)
{
perror("epoll_wait");
continue;
}
/* 处理所有事件 */
for (n = 0; n < nfds; ++n)
{
if (events[n].data.fd == listenfd)
{
connfd = accept(listenfd, (struct sockaddr *)&cliaddr,&socklen);
if (connfd < 0)
{
perror("accept error");
continue;
}

sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
printf("%d:%s", ++acceptCount, buf);

if (curfds >= MAXEPOLLSIZE) {
fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE);
close(connfd);
continue;
}
if (setnonblocking(connfd) < 0) {
perror("setnonblocking error");
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno));
return -1;
}
curfds++;
continue;
}
// 处理客户端请求
if (handle(events[n].data.fd) < 0) {
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;


}
}
}
close(listenfd);
return 0;
}
int handle(int connfd) {
int nread;
char buf[MAXLINE];
nread = read(connfd, buf, MAXLINE);//读取客户端socket流

if (nread == 0) {
printf("client close the connection\n");
close(connfd);
return -1;
}
if (nread < 0) {
perror("read error");
close(connfd);
return -1;
}
printf("recv:%s\n",buf);
write(connfd, buf, nread);//响应客户端
printf("send:%s\n",buf);
return 0;
}