I/O多路复用理解

这篇文章我们将详细说明I/O多路复用的内容

在了解I/O多路复用前,我们首先要了解

I/O的四种形式

  • 同步阻塞形式

  • 异步阻塞形式

    异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  • 同步非阻塞形式

    很多人会写阻塞的read/write 操作(因为网卡上还没数据),但是别忘了可以对fd设置O_NONBLOCK 标志位,这样就可以将同步操作变成非阻塞的了。

  • 异步非阻塞形式

I/O 多路复用

要想客户端和服务器能在网络中通信,那必须得使用 Socket 编程,它是进程间通信里比较特别的方式,特别之处在于它是可以跨主机间通信。

概念

只使用一个进程来维护多个请求
一个进程虽然任一时刻只能处理一个请求,但是处理每个请求的事件时,耗时控制在 1 毫秒以内,这样 1 秒内就可以处理上千个请求,把时间拉长来看,多个请求复用了一个进程,这就是多路复用,这种思想很类似一个 CPU 并发多个进程,所以也叫做时分多路复用

TCP协议下的Socket

TCP 协议的 Socket 程序的整个过程如下图:
socket全过程
有没有觉得读写 Socket 的方式,好像读写文件一样。

基于 Linux 一切皆文件的理念,在内核中 Socket 也是以「文件」的形式存在的,也是有对应的文件描述符。
关于文件描述符的详细请看这里→文件描述符/文件表项

select/poll

select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。

所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。

select使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个数是有限制的,在 Linux 系统中,由内核中的 FD_SETSIZE 限制, 默认最大值为 1024,只能监听 0~1023 的文件描述符。

poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了select的文件描述符个数限制,当然还会受到系统文件描述符限制。

但是 poll select 并没有太大的本质区别,都是使用「线性结构」存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。

epoll

1
2
3
4
5
6
//epoll_create 建立一个epoll对象
int epoll_create(int size);
//epoll_ctl 向epoll对象中添加socket套接字;
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// epoll_wait 返回已经准备就绪事件的连接
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

具体使用请看后面的echo服务器示例

从下图你可以看到 epoll 相关的接口作用:

1616141449-ykDyCB-file_1616141448576

epoll 通过两个方面,很好解决了 select/poll 的问题。

第一点,epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,把需要监控的 socket 通过 epoll_ctl() 函数加入内核中的红黑树里,红黑树是个高效的数据结构,增删查一般时间复杂度是 O(logn),通过对这棵黑红树进行操作,这样就不需要像 select/poll 每次操作时都传入整个 socket 集合,只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。

第二点,epoll 使用事件驱动的机制,所有添加到epoll中的事件都会与设备(如网卡)驱动程序**建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表**中, 当用户调用epoll_wait()函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。

epoll

epoll 支持两种事件触发模式,分别是边缘触发(edge-triggered,ET水平触发(level-triggered,LT

  • 水平触发:只要这个文件描述符还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作
  • 边缘触发:当文件描述符上有新的I/O事件到来时,内核才会通知进程

如果使用水平触发模式,当内核通知文件描述符可读写时,接下来还可以继续去检测它的状态,看它是否依然可读或可写。所以在收到通知后,没必要一次执行尽可能多的读写操作。

如果使用边缘触发模式,I/O 事件发生时只会通知一次,而且我们不知道到底能读写多少数据,所以在收到通知后应尽可能地读写数据,以免错失读写的机会。因此,我们会循环从文件描述符读写数据,那么如果文件描述符是阻塞的,那这个一直读或一直写势必会在最后一次阻塞。所以,边缘触发模式一般和非阻塞 I/O 搭配使用,程序会一直执行 I/O 操作,直到系统调用(如 read 和 write)返回错误,错误类型为 EAGAIN EWOULDBLOCK

触发方式

一般来说,边缘触发的效率比水平触发的效率要高,因为边缘触发可以减少 epoll_wait 的系统调用次数,系统调用也是有一定的开销的的,毕竟也存在上下文的切换。

select/poll 只有水平触发模式,epoll 默认的触发模式是水平触发,但是可以根据应用场景设置为边缘触发模式。

另外,使用 I/O 多路复用时,最好搭配非阻塞 I/O 一起使用,Linux 手册关于 select 的内容中有如下说明:

Under Linux, select() may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.

为什么部分IO设备不需要CPU(题外话)

字符设备, 比如键盘, 打印机等, 还是得通过CPU的。
块设备I/O模块增加DMA(Direct Memory Access 直接存储器)控制器. DMA控制器类似于一个小的CPU, 有自己的寄存器(记录主存地址和取到的字的count等). CPU可以发起一个DMA请求, 传入读写操作类型, 相关I/O设备地址, 内存的起始地址, 要操作的字数.然
后DMA就可以获取总线的控制权, 将一大块内存和外部I/O读入或写出.等操作完成后, 再通知CPU. 释放总线控制权。

echo服务器

我们通过一个回射服务器程序来展示epoll的标准用法。先简要介绍epoll的接口。主要由三个函数组成:

  • epoll_create创建新的epoll实例,一般使用最新的epoll_create1调用
  • epoll_ctl管理感兴趣的文件描述符和相应事件
  • epoll_wait返回就绪的文件描述符,然后对它们进行I/O操作

程序的主要流程如下:

  1. 创建一个监听套接字lfd,调用set_nonblocking函数将其设置为非阻塞
  2. 调用epoll_create1函数创建一个epoll实例,对应的文件描述符为epfd
  3. 将监听套接字lfd加入到epfd的事件列表event中,监听的事件为EPOLLIN
  4. 进入死循环,epoll_wait函数一直阻塞,直到有事件发生。事件信息保存在evlist
  5. 检查文件描述符及其事件:
    • 如果是发生在监听套接字lfd上的事件,则收到了一个客户请求,将返回的连接套接字cfd设置为边沿触发(EPOLLET)模式,加入到事件列表event
    • 如果是连接套接字cfd上发生的事件,对EPOLLIN事件,调用do_echo函数执行回射操作;对EPOLLERREPOLLHUP事件,则关闭描述符

全部代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#define PORT_NUM 40713
#define MAX_EVENTS 1024

void error(char *msg) {
perror(msg);
exit(EXIT_FAILURE);
}
// 将 fd 设置为非阻塞
void set_nonblocking(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL, NULL)) < 0) {
error("ERROR fcntl F_GETFL");
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
error("ERROR fcntl F_SETFL");
}
}

void do_echo(int fd) {
char c, buf[100];
int cnt = 0;
for (;;) {
int n = read(fd, &c, 1);
if (n < 0) {
if (errno == EAGAIN) {
break;
}
error("ERROR read from client");
} else if (n == 0) {
break;
}
buf[cnt++] = c;
if (c == '\n') {
if (write(fd, buf, cnt) < 0) {
error("ERROR write to client");
}
cnt = 0;
continue;
}
}
}

int main(int argc, char **argv) {
int lfd;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;

int epfd;
struct epoll_event event;
struct epoll_event *evlist;

server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT_NUM);

if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
error("ERROR socket");
}
set_nonblocking(lfd);
if (bind(lfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
error("ERROR bind");
}
if (listen(lfd, 16) < 0) {
error("ERROR listen");
}
//创建一个epoll实例,对应的文件描述符为epfd
epfd = epoll_create1(0);
if (epfd < 0) {
perror("ERROR epoll_create1");
}
//将监听套接字 lfd 加入到 epfd 的事件列表 event 中,监听的事件为`EPOLLIN`
event.events = EPOLLIN;
event.data.fd = lfd;
//对事件进行监听
if (epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event) < 0) {
error("ERROR epoll_ctl");
}
evlist = malloc(sizeof(struct epoll_event)*MAX_EVENTS);

for (;;) {
int cfd;
socklen_t len = sizeof(client_addr);
//epoll_wait函数一直阻塞,直到有事件发生。事件信息保存在evlist中
//-1表示如果取不到东西,就无限等待
//0代表立马返回
//它会把传入的时间转为绝对时间
int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
if (ready == -1) {
error("ERROR epoll_wait");
}

for (int i = 0; i < ready; i++) {
//如果是 套接字lfd 就获取
if (evlist[i].data.fd == lfd) {
cfd = accept(lfd, (struct sockaddr *)&client_addr, &len);
if (cfd < 0) {
perror("ERROR accept");
continue;
}
set_nonblocking(cfd);
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
event.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event);
//如果是可读事件发生,进行操作
} else if (evlist[i].events & EPOLLIN) {
cfd = evlist[i].data.fd;
do_echo(cfd);
} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
if (close(evlist[i].data.fd) == -1) {
error("ERROR close");
}
}
}
}

close(lfd);
exit(EXIT_SUCCESS);
}

参考资料

  1. 分享|再也不怕被问 I/O 多路复用 | 技术人求职记 - 力扣(LeetCode) (leetcode-cn.com)
  2. I/O多路复用之epoll | 辛未羊的网络日志 (panqiincs.me)
  3. epoll原理详解及epoll反应堆模型
  4. Linux的I/O多路复用机制 - Journey-C
您的支持将鼓励我继续创作!