select

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
  • nfds — the highest-numbered file descriptor plus 1. Note that it bounds the fd values being watched, not the number of fds.
  • readfds — the set of fds to watch for readability
  • writefds — the set of fds to watch for writability
  • exceptfds — the set of fds to watch for exceptional conditions (e.g. out-of-band data)
  • timeout — how long select waits for I/O

1. If timeout is NULL, select blocks indefinitely until some fd in the watched sets changes state (becomes readable, writable, etc.).
2. If timeout is 0 seconds and 0 microseconds, select is non-blocking: it returns immediately regardless of fd state.
3. If timeout is a positive value, select blocks for at most that long; it returns early if any watched fd becomes ready, otherwise it returns on timeout.
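
A minimal sketch of the three timeout modes (the fd being watched here — stdin — is only illustrative):

#include <stdio.h>
#include <sys/select.h>

/* Sketch: the three ways to pass timeout to select().
 * Assumes fd is a valid descriptor we want to watch for readability. */
int wait_readable(int fd)
{
    fd_set rfds;
    struct timeval tv;

    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);

    /* 1. Block forever:         select(fd + 1, &rfds, NULL, NULL, NULL); */
    /* 2. Poll without blocking: tv.tv_sec = 0; tv.tv_usec = 0;           */
    /* 3. Block up to 5 seconds (the variant used below):                 */
    tv.tv_sec = 5;
    tv.tv_usec = 0;

    int ret = select(fd + 1, &rfds, NULL, NULL, &tv);
    if (ret == -1)
        perror("select");            /* error */
    else if (ret == 0)
        printf("timeout\n");         /* no fd became ready */
    else if (FD_ISSET(fd, &rfds))
        printf("fd %d readable\n", fd);
    return ret;
}

int main(void)
{
    return wait_readable(0) < 0;     /* watch stdin as a demo */
}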

Return value:

  • >0 — the number of fds with a triggered event
  • 0 — timeout
  • -1 — error

Purpose:
select manages sets of fds and multiplexes readiness monitoring across all of them.

User-space usage of select

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MYPORT 1234    // the port users will be connecting to

#define BACKLOG 5      // how many pending connections queue will hold

#define BUF_SIZE 200

int fd_A[BACKLOG];     // accepted connection fds (0 = free slot)
int conn_amount;       // current connection amount

void showclient(void)
{
    int i;
    printf("client amount: %d\n", conn_amount);
    for (i = 0; i < BACKLOG; i++) {
        printf("[%d]:%d ", i, fd_A[i]);
    }
    printf("\n\n");
}

int main(void)
{
    int sock_fd, new_fd;             // listen on sock_fd, new connection on new_fd
    struct sockaddr_in server_addr;  // server address information
    struct sockaddr_in client_addr;  // connector's address information
    socklen_t sin_size;
    int yes = 1;
    char buf[BUF_SIZE];
    int ret;
    int i;

    if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }

    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(MYPORT);
    server_addr.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr.sin_zero, '\0', sizeof(server_addr.sin_zero));

    if (bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind");
        exit(1);
    }

    if (listen(sock_fd, BACKLOG) == -1) {
        perror("listen");
        exit(1);
    }

    printf("listen port %d\n", MYPORT);

    fd_set fdsr;
    int maxsock;
    struct timeval tv;

    conn_amount = 0;
    sin_size = sizeof(client_addr);
    maxsock = sock_fd;
    while (1) {
        // initialize file descriptor set
        FD_ZERO(&fdsr);
        FD_SET(sock_fd, &fdsr);

        // timeout setting (select may modify tv, so reset it every round)
        tv.tv_sec = 30;
        tv.tv_usec = 0;

        // add active connections to fd set
        for (i = 0; i < BACKLOG; i++) {
            if (fd_A[i] != 0) {
                FD_SET(fd_A[i], &fdsr);
            }
        }

        ret = select(maxsock + 1, &fdsr, NULL, NULL, &tv);
        if (ret < 0) {
            perror("select");
            break;
        } else if (ret == 0) {
            printf("timeout\n");
            continue;
        }

        // check every connection in the set
        for (i = 0; i < BACKLOG; i++) {
            if (fd_A[i] == 0 || !FD_ISSET(fd_A[i], &fdsr))
                continue;
            ret = recv(fd_A[i], buf, sizeof(buf) - 1, 0);
            if (ret <= 0) {          // client closed
                printf("client[%d] close\n", i);
                close(fd_A[i]);
                fd_A[i] = 0;
                conn_amount--;
            } else {                 // received data
                buf[ret] = '\0';     // always terminate before printing
                printf("client[%d] send:%s\n", i, buf);
            }
        }

        // check whether a new connection arrives
        if (FD_ISSET(sock_fd, &fdsr)) {
            new_fd = accept(sock_fd, (struct sockaddr *)&client_addr, &sin_size);
            if (new_fd <= 0) {
                perror("accept");
                continue;
            }

            // add to the first free slot in the fd array
            if (conn_amount < BACKLOG) {
                for (i = 0; i < BACKLOG; i++) {
                    if (fd_A[i] == 0) {
                        fd_A[i] = new_fd;
                        break;
                    }
                }
                conn_amount++;
                printf("new connection client[%d] %s:%d\n", i,
                       inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
                if (new_fd > maxsock)
                    maxsock = new_fd;
            }
            else {
                printf("max connections arrive, exit\n");
                send(new_fd, "bye", 4, 0);
                close(new_fd);
                break;
            }
        }
        showclient();
    }

    // close remaining connections
    for (i = 0; i < BACKLOG; i++) {
        if (fd_A[i] != 0) {
            close(fd_A[i]);
        }
    }

    exit(0);
}

How this code uses select:

1. select is used on the server side.
2. select watches the listening fd; when a client connects, the listening fd becomes readable and the server calls accept to complete the connection.
3. select also watches the fds of established connections; when a client sends data, select reports a read event and the server reads it out with recv.
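
To exercise the server, here is a minimal sketch of a matching client (the loopback address and the message are assumptions):

#include <stdio.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

/* Minimal test client: connects to the server above and sends one message. */
int main(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = { 0 };

    addr.sin_family = AF_INET;
    addr.sin_port = htons(1234);                      /* MYPORT of the server */
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);  /* assumed local server */

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }
    send(fd, "hello select", 12, 0);
    sleep(1);      /* give the server a chance to read before we close */
    close(fd);
    return 0;
}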

Analysis of the select implementation

The user-space select call enters the kernel through the sys_select system call.

The call stack looks like:
0xffffffff81213f80 : sys_select+0x0/0x110 [kernel]
0xffffffff81697189 : system_call_fastpath+0x16/0x1b [kernel]

The implementation lives in fs/select.c, in SYSCALL_DEFINE5(select,…

Overview of the select implementation

Analysis of sys_select

sys_select

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{
	struct timespec end_time, *to = NULL;
	struct timeval tv;
	int ret;

	/* Timeout handling: copy the user-space timeval into the kernel
	 * and normalize it into a struct timespec for the callees */
	if (tvp) {
		if (copy_from_user(&tv, tvp, sizeof(tv)))
			return -EFAULT;

		to = &end_time;
		if (poll_select_set_timeout(to,
				tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
				(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
			return -EINVAL;
	}

	/* The core implementation of select */
	ret = core_sys_select(n, inp, outp, exp, to);

	/* Copy the remaining time back into the user-space tvp */
	ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
	return ret;
}

Conclusions:

1. Copy the user-space timeout argument into the kernel.
2. Call core_sys_select.
3. Copy the time remaining when select returns back into the user-space timeout argument.
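
That copy-back of the remaining time is observable from user space. A small sketch (this is Linux-specific behavior — POSIX leaves the timeout's final value unspecified):

#include <stdio.h>
#include <sys/select.h>

/* On Linux, select() updates the timeout to the time *not* slept.
 * Sleeping on an empty fd set for ~2s should leave tv near zero. */
int main(void)
{
    struct timeval tv = { .tv_sec = 2, .tv_usec = 0 };

    select(0, NULL, NULL, NULL, &tv);  /* no fds: pure timed sleep */
    printf("remaining: %ld.%06ld s\n", (long)tv.tv_sec, (long)tv.tv_usec);
    return 0;
}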

core_sys_select

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
		    fd_set __user *exp, struct timespec *end_time)
{
	fd_set_bits fds;
	void *bits;
	int ret, max_fds;
	unsigned int size;
	struct fdtable *fdt;
	/* Allocate small arguments on the stack to save memory and be faster */
	long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

	/* If the user passed nfds < 0, return -EINVAL immediately */
	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;

	/* max_fds can increase, so grab it once to avoid race */
	rcu_read_lock();
	fdt = files_fdtable(current->files);
	max_fds = fdt->max_fds;
	rcu_read_unlock();
	if (n > max_fds)
		n = max_fds;

	/*
	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	 * since we used fdset we need to allocate memory in units of
	 * long-words.
	 */
	/* At one bit per fd, compute how many bytes the n fds occupy */
	size = FDS_BYTES(n);
	bits = stack_fds;
	/* Check whether the static on-stack buffer is large enough */
	if (size > sizeof(stack_fds) / 6) {
		/* Not enough space in on-stack array; must use kmalloc */
		ret = -ENOMEM;
		bits = kmalloc(6 * size, GFP_KERNEL);
		if (!bits)
			goto out_nofds;
	}
	/* fds points into the concrete storage, one bitmap per region */
	fds.in      = bits;
	fds.out     = bits +   size;
	fds.ex      = bits + 2*size;
	fds.res_in  = bits + 3*size;
	fds.res_out = bits + 4*size;
	fds.res_ex  = bits + 5*size;

	/* Copy the user-space inp, outp, exp sets into kernel space */
	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;

	/* Zero the result bitmaps; they will hold the returned state */
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);

	/* The core logic of select */
	ret = do_select(n, &fds, end_time);

	/* Error case */
	if (ret < 0)
		goto out;
	/* Timeout case */
	if (!ret) {
		ret = -ERESTARTNOHAND;
		if (signal_pending(current))
			goto out;
		ret = 0;
	}

	/* Copy the result sets back to user space */
	if (set_fd_set(n, inp, fds.res_in) ||
	    set_fd_set(n, outp, fds.res_out) ||
	    set_fd_set(n, exp, fds.res_ex))
		ret = -EFAULT;

out:
	/* Free the auxiliary memory */
	if (bits != stack_fds)
		kfree(bits);
out_nofds:
	return ret;
}

1. Validate nfds: if it is negative, return with an argument error; otherwise clamp it (it may not exceed the current process's max_fds).
2. Copy the user-space fd sets into the kernel.
3. Run do_select.
4. Copy the results detected by do_select back to user space.
5. Free the auxiliary memory used during the select computation.
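
A user-space sketch of the six-bitmap layout, to make the pointer arithmetic concrete (fds_bytes here is a reimplementation for illustration; the real macro is the kernel's FDS_BYTES):

#include <stdio.h>
#include <stdlib.h>

#define BITS_PER_LONG (8 * sizeof(long))

/* Bytes needed to hold n bits, rounded up to whole longs,
 * mirroring what the kernel's FDS_BYTES(n) computes. */
static size_t fds_bytes(int n)
{
    return ((n + BITS_PER_LONG - 1) / BITS_PER_LONG) * sizeof(long);
}

int main(void)
{
    int n = 100;                      /* watching fds 0..99 */
    size_t size = fds_bytes(n);       /* one bitmap's size in bytes */
    char *bits = calloc(6, size);     /* in/out/ex + res_in/res_out/res_ex */

    char *in     = bits;              /* same carving-up as core_sys_select */
    char *out    = bits + size;
    char *ex     = bits + 2 * size;
    char *res_in = bits + 3 * size;   /* result bitmaps follow the inputs */

    printf("n=%d -> %zu bytes per bitmap, %zu total\n", n, size, 6 * size);
    (void)in; (void)out; (void)ex; (void)res_in;
    free(bits);
    return 0;
}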

do_select

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i, timed_out = 0;
	unsigned long slack = 0;
	unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
	unsigned long busy_end = 0;

	rcu_read_lock();
	/* Validate the file behind each fd and find the largest fd */
	retval = max_select_fd(n, fds);
	rcu_read_unlock();

	if (retval < 0)
		return retval;
	n = retval;

	poll_initwait(&table);
	wait = &table.pt;
	/* A timeout of 0s 0ns sets timed_out to 1: don't block, return at once */
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		wait->_qproc = NULL;
		timed_out = 1;
	}

	/* Normal case: convert the timeout */
	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);

	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
		bool can_busy_loop = false;

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		/* Outer loop over all watched fds */
		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;

			/* One word's worth of fds (BITS_PER_LONG of them) with
			 * no state to check: skip to the next word */
			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {
				i += BITS_PER_LONG;
				continue;
			}
			/* This word contains fds that need checking */
			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				/* Past the largest fd to check: leave the loop */
				if (i >= n)
					break;

				/* Skip fds with no state to check */
				if (!(bit & all_bits))
					continue;

				f = fdget(i);
				if (f.file) {
					const struct file_operations *f_op;
					f_op = f.file->f_op;

					/* Set the event mask for this fd, poll handling */
					mask = DEFAULT_POLLMASK;
					if (f_op && f_op->poll) {
						/* Set the events the user wants probed */
						wait_key_set(wait, in, out,
							     bit, busy_flag);
						/* Get the fd's current event mask */
						mask = (*f_op->poll)(f.file, wait);
					}
					fdput(f);

					/* Readable */
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					/* Writable */
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					/* got something, stop busy polling */
					if (retval) {
						can_busy_loop = false;
						busy_flag = 0;

					/*
					 * only remember a returned
					 * POLL_BUSY_LOOP if we asked for it
					 */
					} else if (busy_flag & mask)
						can_busy_loop = true;

				}
			}
			/* Record the results for this word */
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;

			/* Preemption point: if another task needs to preempt the
			 * current one, scheduling happens right here. If an fd
			 * already checked is woken up in the meantime, the
			 * resulting schedule can also occur here. */
			cond_resched();
		}
		wait->_qproc = NULL;
		if (retval || timed_out || signal_pending(current))
			break;

		/* An error recorded in the wait table ends the outer loop
		 * and the whole call */
		if (table.error) {
			retval = table.error;
			break;
		}

		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) {
			if (!busy_end) {
				busy_end = busy_loop_end_time();
				continue;
			}
			if (!busy_loop_timeout(busy_end))
				continue;
		}
		busy_flag = 0;

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time);
			to = &expire;
		}

		/* The calling process goes to sleep here; on timeout,
		 * timed_out is set to 1 and the loop exits */
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;
	}

	poll_freewait(&table);
	return retval;
}

do_select is the core of select. It proceeds as follows:

1. Call poll_initwait to initialize the poll_wqueues object table, including its member poll_table.

2. If the user passed a non-NULL timeout set to zero, set the poll_table pointer wait (i.e. &table.pt) to NULL; with a NULL _qproc the process is never added to any wait queue.

3. OR together in, out, and exception to get all_bits, then iterate over the fds whose bit is 1 in all_bits. For each, look up the file pointer filp in the process's fd table, set wait's key (the OR of POLLIN_SET, POLLOUT_SET, and POLLEX_SET, depending on the user's input), and call filp->poll(filp, wait) to obtain the mask. Then check the mask to see whether the file already satisfies a condition; if so, set the corresponding bit in res_in/res_out/res_ex, execute retval++, and set wait to NULL.

4. After every BITS_PER_LONG files (32 or 64, depending on the width of a long), call cond_resched() once to voluntarily offer the CPU; this also lets a wake-up from an already-checked file take effect.

5. After iterating over all files, set wait to NULL and check whether any file satisfied a condition (retval != 0), whether the call timed out, or whether a signal is pending; if any of these holds, break out of the loop and go to step 7.

6. Otherwise call poll_schedule_timeout to put the process to sleep until the timeout expires (if no timeout was set, this amounts to a plain schedule() with no expiry). If the process resumes because of the timeout, pwq->triggered stays 0; if it was woken by a file's driver, pwq->triggered is set to 1.

7. Finally, the function calls poll_freewait to remove this process from every file's wait queue, frees the allocated poll_table_page objects, reclaims the memory, and returns retval.

8. res_in, res_out, and res_ex are copied back into the passed-in in, out, and exception sets, and ret is returned (this copy-back happens in core_sys_select).
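
The contract step 3 relies on is the standard poll method that every pollable driver implements: register the caller on a wait queue via poll_wait, then report the current readiness mask. A minimal sketch of such a method — the demo_dev structure and its fields are assumptions, not kernel code:

#include <linux/poll.h>
#include <linux/wait.h>

/* Hypothetical character device with one receive buffer. */
struct demo_dev {
	wait_queue_head_t wq;   /* pollers and readers sleep here */
	size_t avail;           /* bytes currently readable */
};

static unsigned int demo_poll(struct file *file, poll_table *wait)
{
	struct demo_dev *dev = file->private_data;
	unsigned int mask = 0;

	/* Hook the caller onto our wait queue. For select, this lands in
	 * __pollwait via table.pt->_qproc (or does nothing when _qproc is
	 * NULL, e.g. on later passes or with a zero timeout). */
	poll_wait(file, &dev->wq, wait);

	if (dev->avail > 0)
		mask |= POLLIN | POLLRDNORM;   /* readable right now */
	mask |= POLLOUT | POLLWRNORM;          /* always writable in this demo */

	return mask;
}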

How select goes to sleep

do_select contains the three key steps:

Step 1:
poll_initwait(&table);
wait = &table.pt;

Step 2:
if (f_op && f_op->poll) {
	wait_key_set(wait, in, out,
		     bit, busy_flag);
	/* for a socket, this calls sock_poll */
	mask = (*f_op->poll)(f.file, wait);
}

Step 3:
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
			   to, slack))

Step 1: initialize the table
struct poll_wqueues table;

void poll_initwait(struct poll_wqueues *pwq)
{
	init_poll_funcptr(&pwq->pt, __pollwait);
	pwq->polling_task = current;
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}
EXPORT_SYMBOL(poll_initwait);
  • Records the current process (current) in the table as pwq->polling_task
  • Records __pollwait in the table as pwq->pt->_qproc

Step 2: sock_poll is called, which for a TCP socket ultimately calls tcp_poll

/* No kernel lock held - perfect */
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
	unsigned int busy_flag = 0;
	struct socket *sock;

	/*
	 * We can't return errors to poll, so it's either yes or no.
	 */
	sock = file->private_data;

	if (sk_can_busy_loop(sock->sk)) {
		/* this socket can poll_ll so tell the system call */
		busy_flag = POLL_BUSY_LOOP;

		/* once, only if requested by syscall */
		if (wait && (wait->_key & POLL_BUSY_LOOP))
			sk_busy_loop(sock->sk, 1);
	}
	/* for a TCP socket, sock->ops->poll is tcp_poll */
	return busy_flag | sock->ops->poll(file, sock, wait);
}

/*
 * Wait for a TCP event.
 *
 * Note that we don't need to lock the socket, as the upper poll layers
 * take care of normal races (between the test and the event) and we don't
 * go look at any of the socket buffers directly.
 */
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask;
	struct sock *sk = sock->sk;
	const struct tcp_sock *tp = tcp_sk(sk);

	sock_rps_record_flow(sk);

	sock_poll_wait(file, sk_sleep(sk), wait);
	if (sk->sk_state == TCP_LISTEN)
		return inet_csk_listen_poll(sk);

	/* Socket is not locked. We are protected from async events
	 * by poll logic and correct handling of state changes
	 * made by other threads is impossible in any case.
	 */

	mask = 0;
	if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE)
		mask |= POLLHUP;
	if (sk->sk_shutdown & RCV_SHUTDOWN)
		mask |= POLLIN | POLLRDNORM | POLLRDHUP;

	/* Connected or passive Fast Open socket? */
	if (sk->sk_state != TCP_SYN_SENT &&
	    (sk->sk_state != TCP_SYN_RECV || tp->fastopen_rsk != NULL)) {
		int target = sock_rcvlowat(sk, 0, INT_MAX);

		if (tp->urg_seq == tp->copied_seq &&
		    !sock_flag(sk, SOCK_URGINLINE) &&
		    tp->urg_data)
			target++;

		/* Potential race condition. If read of tp below will
		 * escape above sk->sk_state, we can be illegally awaken
		 * in SYN_* states. */
		if (tp->rcv_nxt - tp->copied_seq >= target)
			mask |= POLLIN | POLLRDNORM;

		if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
			if (sk_stream_is_writeable(sk)) {
				mask |= POLLOUT | POLLWRNORM;
			} else {  /* send SIGIO later */
				set_bit(SOCK_ASYNC_NOSPACE,
					&sk->sk_socket->flags);
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

				/* Race breaker. If space is freed after
				 * wspace test but before the flags are set,
				 * IO signal will be lost.
				 */
				if (sk_stream_is_writeable(sk))
					mask |= POLLOUT | POLLWRNORM;
			}
		} else
			mask |= POLLOUT | POLLWRNORM;

		if (tp->urg_data & TCP_URG_VALID)
			mask |= POLLPRI;
	}
	/* This barrier is coupled with smp_wmb() in tcp_reset() */
	smp_rmb();
	if (sk->sk_err)
		mask |= POLLERR;

	return mask;
}

  • Collects the socket's event state and returns it as a mask
  • Calls sock_poll_wait, then poll_wait, which finally invokes _qproc, i.e. __pollwait
  • __pollwait
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
		       poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	add_wait_queue(wait_address, &entry->wait);
}
  • Allocates a poll_table_entry for each fd's file
  • Adds the fd's poll_table_entry to the file's wait queue
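
For reference, the glue that dispatches to _qproc is poll_wait; a sketch of its shape, paraphrased from include/linux/poll.h of kernels of this era (treat the exact text as approximate):

/* Sketch of poll_wait: if a queueing callback was registered
 * (select registers __pollwait via poll_initwait), invoke it so
 * the caller gets hooked onto the file's wait queue. */
static inline void poll_wait(struct file *filp,
			     wait_queue_head_t *wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);
}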

Step 3:
poll_schedule_timeout puts the process to sleep until it times out or is woken.
If the process resumes because of the timeout, pwq->triggered stays 0.
If it is woken by the file's driver, pwq->triggered has been set to 1.

int poll_schedule_timeout(struct poll_wqueues *pwq, int state,
			  ktime_t *expires, unsigned long slack)
{
	int rc = -EINTR;

	set_current_state(state);
	if (!pwq->triggered)
		rc = freezable_schedule_hrtimeout_range(expires, slack,
							HRTIMER_MODE_ABS);
	__set_current_state(TASK_RUNNING);

	/*
	 * Prepare for the next iteration.
	 *
	 * The following set_mb() serves two purposes. First, it's
	 * the counterpart rmb of the wmb in pollwake() such that data
	 * written before wake up is always visible after wake up.
	 * Second, the full barrier guarantees that triggered clearing
	 * doesn't pass event check of the next iteration. Note that
	 * this problem doesn't exist for the first iteration as
	 * add_wait_queue() has full barrier semantics.
	 */
	set_mb(pwq->triggered, 0);

	return rc;
}

How select is woken up

0xffffffff81213130 : pollwake+0x0/0x90 [kernel]
0xffffffff810ba628 : __wake_up_common+0x58/0x90 [kernel]
0xffffffff810bc4a4 : __wake_up_sync_key+0x44/0x60 [kernel]
0xffffffff8155825a : sock_def_readable+0x3a/0x70 [kernel]
0xffffffff815c8197 : tcp_data_queue+0x497/0xdd0 [kernel]
0xffffffff815cb4a7 : tcp_rcv_established+0x217/0x760 [kernel]
0xffffffff815d5f8a : tcp_v4_do_rcv+0x10a/0x340 [kernel]
0xffffffff815d76d9 : tcp_v4_rcv+0x799/0x9a0 [kernel]
0xffffffff815b1094 : ip_local_deliver_finish+0xb4/0x1f0 [kernel]
0xffffffff815b1379 : ip_local_deliver+0x59/0xd0 [kernel]
0xffffffff815b0d1a : ip_rcv_finish+0x8a/0x350 [kernel]
0xffffffff815b16a6 : ip_rcv+0x2b6/0x410 [kernel]
0xffffffff815700d2 : __netif_receive_skb_core+0x582/0x800 [kernel]
0xffffffff81570368 : __netif_receive_skb+0x18/0x60 [kernel]
0xffffffff815703f0 : netif_receive_skb_internal+0x40/0xc0 [kernel]
0xffffffff81571578 : napi_gro_receive+0xd8/0x130 [kernel]
0xffffffffa00472fc [e1000]

pollwake -> __pollwake -> default_wake_function -> try_to_wake_up

try_to_wake_up sets the process state to TASK_RUNNING and inserts the process into a CPU run queue, thereby waking the sleeping process.
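
Tying the two halves together: whoever owns the wait queue only has to call wake_up on it; the pollwake callback installed by __pollwait does the rest. A minimal sketch, reusing the hypothetical demo_dev driver from the poll-method sketch above:

#include <linux/wait.h>

/* Hypothetical interrupt/softirq path of the demo driver above:
 * after new data lands in the buffer, wake every waiter, including
 * select()ers whose poll_table_entry sits on this queue. Each
 * entry's pollwake callback sets pwq->triggered = 1 and wakes the
 * polling task, which then rescans its fds. */
static void demo_data_arrived(struct demo_dev *dev, size_t bytes)
{
	dev->avail += bytes;
	wake_up_interruptible(&dev->wq);
}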

The Linux select 1024 limit curse

__FD_SETSIZE defaults to 1024. fd_set is a fixed-size bitmap of __FD_SETSIZE bits, implemented as an array of unsigned long; each bit stands for one fd value.

Note that the limit is on the fd value, not the count: an fd whose value is, say, 1025 cannot be handled (and passing one to the FD_* macros easily corrupts the stack). It is not that 1024 network client handles fit; rather, the largest usable fd value is 1023. Subtract the three descriptors every process starts with (stdin, stdout, stderr) and at most 1021 remain; subtract the files the program itself opens, and in practice the usable number falls well below 1024.

In addition, ulimit also caps the number of handles each process may open.
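
A defensive pattern that follows from this — check the fd value before FD_SET, since the macro performs no bounds checking (a sketch; the helper name is made up):

#include <stdio.h>
#include <sys/select.h>

/* FD_SET does no bounds checking; an fd >= FD_SETSIZE writes past
 * the bitmap. Guard every insertion. */
static int fdset_add(fd_set *set, int fd, int *maxfd)
{
    if (fd < 0 || fd >= FD_SETSIZE) {
        fprintf(stderr, "fd %d out of range for select (FD_SETSIZE=%d)\n",
                fd, FD_SETSIZE);
        return -1;
    }
    FD_SET(fd, set);
    if (fd > *maxfd)
        *maxfd = fd;
    return 0;
}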

Why 1024?

The syscall's set arguments use the fd_set structure:

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{

fd_set is a typedef of __kernel_fd_set:

typedef __kernel_fd_set		fd_set;

fds_bits inside __kernel_fd_set can hold at most 1024 bits:

#define __FD_SETSIZE	1024

typedef struct {
	unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

How do I break through the 1024 limit?

You can change this macro and recompile the kernel, and there are other tricks, but none of that complexity is necessary: just use poll or epoll instead (see the poll sketch below).
You can also use multiple processes or threads, each calling select on its own subset of fds.
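
For comparison, a sketch of the same readable-wait written with poll, which takes an array of pollfd and therefore has no FD_SETSIZE ceiling (only illustrative; error handling is minimal):

#include <stdio.h>
#include <poll.h>

/* Wait up to 5 seconds for stdin to become readable. The pollfd
 * array can be heap-allocated and grown, so fd values above 1023
 * are fine, unlike with select. */
int main(void)
{
    struct pollfd pfd = { .fd = 0, .events = POLLIN };

    int ret = poll(&pfd, 1, 5000);   /* timeout in milliseconds */
    if (ret == -1)
        perror("poll");
    else if (ret == 0)
        printf("timeout\n");
    else if (pfd.revents & POLLIN)
        printf("stdin readable\n");
    return 0;
}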

Summary of select's drawbacks

select is inefficient: data is copied back and forth between user space and kernel space, and select incurs process context switches internally, so it is a poor fit for large-scale workloads.
The number of fds that can be watched at once is limited: 1024 on Linux.
Every call to select must scan all of the fds, with a scheduling point (cond_resched) after every word of BITS_PER_LONG fds.
With many fds, if a low-numbered fd is constantly readable, events on the higher-numbered fds may fail to be collected.
fd_set must be copied between user space and kernel space on every call, and the sleep/wake machinery must allocate a poll_table_entry for every fd.