I/O多路复用select、poll和epoll的区别与使用方法

非阻塞I/O的应用程序经常使用poll、selec和epoll系统调用, select本质是通过设置或检查存放fd(文件描述符)标志位的数据结构来进行下一步的处理,会阻塞到有一个或多个I/O就绪;epoll支持一个进程打开最大数目的socket描述符,IO效率不随fd数目增加而线性下降,使用mmap减少复制开销,加速内核与用户空间的消息传递,支持边缘触发!

使用非阻塞I/O的应用程序经常使用poll、selec和epoll系统调用。

select、poll、epoll它们的职责不是做IO,而是帮助调用者寻找当前就绪的设备;linux中设备都被抽象成文件,所以设备在系统调用参数中的表示就是文件描述符(fd),fd其实就是一个整数(例如,标准输入stdin、标准输出stdout、标准出错stderr分别对应的fd是0、1、2)

1. select

概述

select本质是通过设置或检查存放fd(文件描述符)标志位的数据结构来进行下一步的处理。会阻塞,直到有一个或多个I/O就绪

readfds列出的文件描述符被监视是否有数据可供读取。(可读)

writefds列出的文件描述符被监视是否有写入操作完成。(可写)

exceptfds列出的文件描述符被监视是否发生异常,或无法控制的数据是否可用。(仅仅用于socket)

这三类set为NULL时,select()不监视其对应的该类事件。

select()成功返回时,每组set都被修改以使它只包含准备好的I/O描述符。

缺点

(a)单个进程可监视的fd数量被限制;

(b)需要维护一个用来存放大量fd的数据结构,这样会使用户空间和内核空间在传递该结构时复制开销大;

(c)对fd进行扫描是线性的,fd剧增后,IO效率较低,因为每次调用都对fd进行线性扫描遍历,所以随着fd的增加会造成遍历速度慢的性能问题;

(d)内核需要将消息传递用户空间,需要内核拷贝动作;

(e)最大支持1024个fd

2. poll

概述

和select基本一样,除了poll没有使用低效的三个基于位的文件描述符set,而是采用了一个单独的结构体pollfd数组,由fds指针指向这个组

特点

(a)它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历。如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或主动超时,被唤醒后它又要再次遍历fd;

(b)没有最大连接数的限制,原因是它是基于链表来存储的;

(c)大量的fd的数组被整体复制于用户态和内核地址空间;

(d)对fd的扫描是线性的;

(e)水平触发:如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd

3. epoll

概述

epoll的接口非常简单,一共就三个函数:

(1). int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。

当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

(2). int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,

第一个参数是epoll_create()的返回值,

第二个参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD:注册新的fd到epfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

events可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

(3). int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

具体使用步骤

  • 首先通过create_epoll(int maxfds)来创建一个epoll的句柄,其中maxfds为epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,之后的所有操作将通过这个句柄来进行操作

  • 之后在你的网络主循环里面,每一帧的调用epoll_wait(int epfd, epoll_event events, int max events, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写了。基本的语法为:

1
nfds = epoll_wait(epfd, events, maxevents, -1);

其中epfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait这个函数操作成功之后,epoll_events里面将储存所有的读写事件。max_events是当前需要监听的所有socket句柄数。最后一个timeout是 epoll_wait的超时,为0的时候表示马上返回,为-1的时候表示一直等下去,直到有事件返回,为任意正整数的时候表示等这么长的时间,如果一直没有事件,则返回。一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率。

epoll_wait范围之后应该是一个循环,遍利所有的事件。

特点

(a) 支持一个进程打开最大数目的socket描述符(FD)。所支持的FD上限是最大可以打开文件的数组,在1GB机器上,大约为10万左右;

(b) IO效率不随fd数目增加而线性下降;(select/poll每次调用都会线性扫描全部的集合;epoll中只有活跃的socket才会主动调用callback函数,其他idle状态的socket则不会)

(c) 使用mmap减少复制开销,加速内核与用户空间的消息传递;(epoll是通过内核和用户空间共享同一块内存实现的)

(d) 支持边缘触发,只告诉进程中哪些fd刚刚变为就绪态,并且只通知一次。(epoll使用事件的就绪通知方式,通过epoll_ctl函数注册fd。一旦该fd就绪,内核就会采用类似callback的回调机制激活该fd,epoll_wait便可以收到通知。)

这是epoll用法比较全面的一个例子的伪代码,几乎所有的epoll程序都使用下面的框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
while (1) {
nfds = epoll_wait(epfd, events, 20, 500);
for (i = 0; i< nfds; i++) {//被内核IO事件唤醒的fd
if (event[i].data.fd == listenfd) { //有新的连接
connfd = accept(listenfd, (sockaddr *) &clientaddr, &clilen); //accept这个连接
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl = (epfd, EPOLL_CTL_ADD, connfd, &ev); //将新的fd添加到epoll的监听队列中
} else if (event[i].events & EPOLLIN) { //接受到数据,读socket
n = read(sockfd, line, MAXLINE); //读数据
ev.data.ptr = md; //md为自定义的数据,添加数据
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
} else if (event[i].events & EPOLLOUT) { //有数据要发送,写socket
struct myepoll_data *md = (myepoll_data *)events[i].data.ptr;//取数据
sockfd = md->fd;
send(sockfd, md->ptr, strlen((char *)md->ptr), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改标识符,等待下一个循环时接受数据
} else {
//其他处理
}
}
}

select的用法举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <sys/types.h>

#define MAXBUF 1024

/*********************************************************************
* filename: select-server.c
* 演示网络异步通讯、select用法,这是服务器端程序
* ./server 7838 1
*********************************************************************/


int main(int argc, char **argv)
{

int sockfd, new_fd;
socklen_t len;
struct sockaddr_in my_addr, their_addr;

unsigned int myport, lisnum;
char buf[MAXBUF + 1];

fd_set rfds;
struct timeval tv;

int retval, maxfd = -1;

if (argv[1])
myport = atoi(argv[1]);
else
myport = 7838;

if (argv[2])
lisnum = atoi(argv[2]);
else
lisnum = 2;

// create an new socket
if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}

bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(myport);

if (argv[3])
my_addr.sin_addr.s_addr = inet_addr(argv[3]);
else
my_addr.sin_addr.s_addr = INADDR_ANY;

if (bind(sockfd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
{
perror("bind");
exit(1);
}

if (listen(sockfd, lisnum) == -1)
{
perror("listen");
exit(1);
}

while (1)
{
printf("\n----等待新的连接到来开始新一轮聊天……\n");

len = sizeof(struct sockaddr);

if ((new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &len)) == -1)
{
perror("accept");
exit(errno);
}
else
{
printf("server: got connection from %s, port %d, socket %d\n", inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);
}

// 开始处理每个新连接上的数据收发
printf("\n准备就绪,可以开始聊天了……直接输入消息回车即可发信息给对方\n");

while (1)
{
// 把集合清空
FD_ZERO(&rfds);

// 把标准输入(stdin)句柄0加入到集合中
FD_SET(0, &rfds);

// 把当前连接(socket)句柄new_fd加入到集合中
FD_SET(new_fd, &rfds);

maxfd = 0;
if (new_fd > maxfd)
{
maxfd = new_fd;
}

// 设置最大等待时间
tv.tv_sec = 5;
tv.tv_usec = 0;

// 开始等待
retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);

if (retval == -1)
{
printf("将退出,select出错! %s", strerror(errno));
break;
}
else if (retval == 0)
{
printf("没有任何消息到来,用户也没有按键,继续等待……\n");
continue;
}
else
{
// 判断当前IO是否是stdin
if (FD_ISSET(0, &rfds)) // 用户按键了,则读取用户输入的内容发送出去
{
bzero(buf, MAXBUF + 1);
fgets(buf, MAXBUF, stdin);
if (!strncasecmp(buf, "quit", 4))
{
printf("自己请求终止聊天!\n");
break;
}
len = send(new_fd, buf, strlen(buf) - 1, 0);
if (len > 0)
printf("消息:%s\t发送成功,共发送了%d个字节!\n", buf, len);
else
{
printf("消息'%s'发送失败!错误代码是%d,错误信息是'%s'\n", buf, errno, strerror(errno));
break;
}
}

// 判断当前IO是否是来自socket
if (FD_ISSET(new_fd, &rfds)) // 当前连接的socket上有消息到来则接收对方发过来的消息并显示
{
bzero(buf, MAXBUF + 1);
// 接收客户端的消息
len = recv(new_fd, buf, MAXBUF, 0);
if (len > 0)
{
printf("接收消息成功:'%s',共%d个字节的数据\n", buf, len);
}
else
{
if (len < 0)
printf("消息接收失败!错误代码是%d,错误信息是'%s'\n", errno, strerror(errno));
else
printf("对方退出了,聊天终止\n");
break;
}
}
}
}

close(new_fd);
// 处理每个新连接上的数据收发结束

printf("还要和其它连接聊天吗?(no->退出)");
fflush(stdout);

bzero(buf, MAXBUF + 1);
fgets(buf, MAXBUF, stdin);
if (!strncasecmp(buf, "no", 2))
{
printf("终止聊天!\n");
break;
}
}

close(sockfd);
return 0;
}

epoll的用法举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/epoll.h>

#define MAXBUF 1024
#define MAXEPOLLSIZE 10000

/*
* 设置句柄为非阻塞方式
*/

int setnonblocking(int sockfd)
{

if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1)
{
return -1;
}
return 0;
}

/*********************************************************************
* filename: epoll-server.c
* 演示epoll接受海量socket并进行处理响应的方法
*********************************************************************/


int main(int argc, char **argv)
{

int listenfd, connfd, epfd, sockfd, nfds, n, curfds;
socklen_t len;

struct sockaddr_in my_addr, their_addr;
unsigned int myport, lisnum;
char buf[MAXBUF + 1];

// 声明epoll_event结构体的变量,ev用于注册事件,events数组用于回传要处理的事件
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];

if (argv[1])
myport = atoi(argv[1]);
else
myport = 7838;

if (argv[2])
lisnum = atoi(argv[2]);
else
lisnum = 2;

// 开启 socket 监听
if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
} else
printf("socket 创建成功!\n");

// 把socket设置为非阻塞方式
setnonblocking(listenfd);

bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(myport);
if (argv[3])
my_addr.sin_addr.s_addr = inet_addr(argv[3]);
else
my_addr.sin_addr.s_addr = INADDR_ANY;

if (bind(listenfd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
{
perror("bind");
exit(1);
} else
printf("IP 地址和端口绑定成功\n");

if (listen(listenfd, lisnum) == -1)
{
perror("listen");
exit(1);
} else
printf("开启服务成功!\n");

// 创建 epoll句柄,把监听 socket 加入到 epoll 集合里 */
epfd = epoll_create(MAXEPOLLSIZE); /*epoll专用的文件描述符*/
len = sizeof(struct sockaddr_in);
ev.events = EPOLLIN|EPOLLET;
ev.data.fd = listenfd;

// 将listenfd注册到epoll事件
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd);
return -1;
} else
printf("监听 socket 加入 epoll 成功!\n");
curfds = 1;

while (1)
{
// 等待有事件发生
nfds = epoll_wait(epfd, events, curfds, -1);

if (nfds == -1)
{
perror("epoll_wait");
break;
}

// 处理所有事件
for (n = 0; n < nfds; ++n)
{
// 如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接
if (events[n].data.fd == listenfd)
{
len = sizeof(struct sockaddr);
connfd = accept(listenfd, (struct sockaddr *) &their_addr, &len);
if (connfd < 0)
{
perror("accept");
continue;
}
else
printf("有连接来自于: %s:%d, 分配的 socket 为:%d\n", inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), connfd);

setnonblocking(connfd);

// 设置用于注册的 读操作 事件
ev.events = EPOLLIN | EPOLLET;
// 设置用于读操作的文件描述符
ev.data.fd = connfd;

//注册ev
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
curfds ++;
}
else if (events[n].events & EPOLLIN) // 如果是已经连接的用户,并且收到数据,那么进行读入
{
printf("EPOLLIN\n");
if ((sockfd = events[n].data.fd) < 0)
continue;
int len;
bzero(buf, MAXBUF + 1);

/* 接收客户端的消息 */
/*len = read(sockfd, buf, MAXBUF);*/

len = recv(sockfd, buf, MAXBUF, 0);
if (len > 0)
printf("%d接收消息成功:'%s',共%d个字节的数据\n", sockfd, buf, len);
else
{
if (len < 0)
{
printf("消息接收失败!错误代码是%d,错误信息是'%s'\n", errno, strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &ev);
curfds--;
continue;
}
}

// 设置用于写操作的文件描述符
ev.data.fd = sockfd;
// 设置用于注册的写操作事件
ev.events = EPOLLOUT | EPOLLET;

/*修改sockfd上要处理的事件为EPOLLOUT*/
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改标识符,等待下一个循环时发送数据,异步处理的精髓!!!!! ?????
}
else if (events[n].events & EPOLLOUT) // 如果有数据发送
{
printf("EPOLLOUT\n");
sockfd = events[n].data.fd;

bzero(buf, MAXBUF + 1);
strcpy(buf, "Server already processes!");

send(sockfd, buf, strlen(buf), 0);

// 设置用于读操作的文件描述符
ev.data.fd = sockfd;
// 设置用于注册的读操作事件
ev.events = EPOLLIN | EPOLLET;

// 修改sockfd上要处理的事件为EPOLIN
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
}
}

close(listenfd);
return 0;
}