select与epoll


select与epoll是操作系统提供的两种I/O多路复用的机制,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。select是比较早出现的技术,但是select同时处理的描述符有个数限制(默认1024)。为了弥补其缺点出现了poll,虽然poll没有个数限制,但是其实现机制与select类似,且随着文件描述符变多系统性能会下降。为了解决这些问题,后来就出现了epoll复用机制,也是现在比较常用的I/O复用机制。

如果写过最基础的TCP服务,那就应该清楚 accept 和 recv 函数是阻塞式的(默认),也就是说程序就卡在这个地方等待,直到有连接或者数据来到。单线程处理这种事情时,一旦有数据到来就会一直处理这个连接的数据,而没法接收新的连接。这种情况可以用多线程处理,但是服务器并发量大的时候,如果每个请求都新建一个线程的话,会占用很多系统资源。因此I/O复用机制得以广泛使用

如何同时高效监视多个socket?

服务端需要管理多个客户端连接,而recv只能监视单个socket,这种矛盾下,人们开始寻找监视多个socket的方法。epoll的要义是高效的监视多个socket。从历史发展角度看,必然先出现一种不太高效的方法,人们再加以改进。只有先理解了不太高效的方法,才能够理解epoll的本质。

假如能够预先传入一个socket列表,如果列表中的socket都没有数据,挂起进程,直到有一个socket收到数据,唤醒进程。这种方法很直接,也是select的设计思想。

select

select的实现机制是先定义一个含有一共1024比特的long型数组的结构fd_set,用来“存放”监听的文件描述符,首先使用宏FD_ZERO把这个集合清空。然后使用宏FD_SET把需要监听的文件描述符放在这个集合中,最后调用select函数来监听这些文件描述符,可以一直阻塞等待直到有可操作的描述符才返回,也可以设置一个超时时间。当调用select()函数的时候,内核会根据I/O状态修改与此描述符匹配的fd_set中的标志位。当select函数返回的时候,返回的是所有句柄列表,并没有告知哪个描述符准备好了。需要手动检查哪个描述符对应的标志位发生了变化,再对相应的描述符进行读写操作。

select API

int select (int __nfds, fd_set *__readfds, fd_set *__writefds, fd_set *__exceptfds, struct timeval *__timeout);
  • 参数1:一般使用最大文件描述符+1
  • 参数2:关注读状态的描述符集,一般都用的这个
  • 参数3:关注写状态的描述符集,不用设置为NULL
  • 参数4:异常状态描述符集,没用过,一般设置NULL
  • 参数5:设置阻塞超时时间,这个参数有3种可能
    1. 设置空指针则一直等待
    2. 等待timeval指定的固定时间
    3. timeval结构值为0,则每次调用都不等待

select缺点:

  1. 可以从函数参数列表上来看,select只能监听读、写、异常这三个事件
  2. selct监听的描述符是有最大值限制的,在Linux内核中是1024
  3. select的实现是每次将待检测的描述符放在位数组中,全部传给内核进行监听,内核监听之后会返回一个就绪描述符个数,并且修改了监听的事件值,以表示该事件就绪。内核再将修改后的数组传给用户空间。用户空间只能通过遍历所有描述符来处理就绪的描述符,之后再将描述符传给内核继续监听……很明显,这样在监听的描述符少的情况下并不影响效率,但是监听的描述符数量特别大的情况下,每次又只有少数描述符上有事件就绪,大量的换入换出会使得效率十分低下(每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大)

select使用示例

#include<stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>                        
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>    

void server() {

    // 创建socket连接
    int lfd = socket(AF_INET,SOCK_STREAM,0);
    struct sockaddr_in my_addr; 
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = AF_INET; // ipv4
    my_addr.sin_port   = htons(9090);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY); 
    // 绑定端口
    bind(lfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
    // 监听连接请求
    listen(lfd, 128);
    printf("listen client @port=%d...\n", 9090);
    int lastfd = lfd;
    // 定义文件描述符集
    fd_set read_fd_set, all_fd_set;
    // 服务socket描述符加入set集合中
    FD_ZERO(&all_fd_set);
    FD_SET(lfd, &all_fd_set);
    printf("准备进入while循环\n");
    while (1) {
        read_fd_set = all_fd_set;
        printf("阻塞中... lastfd=%d\n", lastfd);
        int nready = select(lastfd+1, &read_fd_set, NULL, NULL, NULL);
        switch (nready) {
            case 0 :
                printf("select time out ......\n");
                break;
            case -1 :
                perror("select error \n");
                break;
            default:
                // 监听到新的客户端连接
                if (FD_ISSET(lfd, &read_fd_set)) {
                    struct sockaddr_in client_addr;    
                    socklen_t cliaddr_len = sizeof(client_addr);
                    char cli_ip[INET_ADDRSTRLEN] = "";    
                    // 肯定有连接不会阻塞
                    int clientfd = accept(lfd, (struct sockaddr*)&client_addr, &cliaddr_len);
                    inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
                    printf("----------------------------------------------\n");
                    printf("client ip=%s,port=%d\n", cli_ip, ntohs(client_addr.sin_port));
                    // 将clientfd加入读集合
                    FD_SET(clientfd, &all_fd_set);    
                    lastfd = clientfd;
                    if(0 == --nready) {
                        continue;
                    }
                }
                int i;
                for (i = lfd + 1;i <= lastfd; i++) {
                    // 处理读事件
                    if (FD_ISSET(i, &read_fd_set)) {
                        char recv_buf[512] = "";
                        int rs = read(i, recv_buf, sizeof(recv_buf));
                        if (rs == 0 ) {
                            close(i);
                            FD_CLR(i, &all_fd_set);
                        } else {
                            printf("%s\n",recv_buf);
                            // 给每一个服务端写数据
                            int j;
                            for (j = lfd + 1;j <= lastfd; j++) {
                                if (j != i) {
                                    write(j, recv_buf, strlen(recv_buf));
                                }
                            }
                        }
                    }
                }
        }

    }
}

int main(){
    server();
    return 0;
}

参数2:关注读状态的描述符集,一般都用的这个

参数3:关注写状态的描述符集,不用设置为NULL参数4:异常状态描述符集,没用过,一般设置NULL参数5:设置阻塞超时时间,这个参数有3种可能。1. 设置空指针则一直等待,2. 等待timeval指定的固定时间,3. timeval结构值为0,则每次调用都不等待。

pollselect的改进版,本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的

epoll

epoll是在select出现N多年后才出现的,epoll可以理解为event pool,不同与select、poll的轮询机制,epoll采用的是事件驱动机制,每个fd上有注册有回调函数,当网卡接收到数据时会回调该函数,同时将该fd的引用放入rdlist就绪列表中。当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

epoll函数的接口定义

#include <sys/epoll.h>

// 数据结构
// 每一个epoll对象都有一个独立的eventpoll结构体
// 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
// epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
struct eventpoll {
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
};

// API
// 内核中间加一个 ep 对象,把所有需要监听的socket都放到ep对象中
int epoll_create(int size); 
// epoll_ctl 负责把 socket 增加、删除到内核红黑树
int epoll_ctl(int epfd,  // 创建的ep对象
              int op,    // 操作类型 新增、删除等
              int fd,    // 要操作的对象
              struct epoll_event *event  // 事件
             ); 
// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll执行流程

  • 调用epoll_create()创建一个ep对象,即红黑树的根节点,返回一个文件句柄
  • 调用epoll_ctl()向这个ep对象(红黑树)中添加、删除、修改感兴趣的事件
  • 调用epoll_wait()等待,当有事件发生时网卡驱动会调用fd上注册的函数并将该fd添加到rdlist中,解除阻塞

下面是 epoll 工作原理的一张汇总图,仅供参考

epoll 的触发模式

  1. 水平触发(LT,Level Triggered):epoll_wait() 会通知你某个描述符上有数据可读写,如果你不处理,下次调用的时候还会通知你,直到你处理为止。如果有大量不关心的文件描述符出现可读写状态,就会一直通知你,严重影响你检查关心的文件描述符的效率。
  2. 边缘触发(ET, Edge Triggered):与水平触发模式相反,调用epoll_wait()的时候会通知你哪个文件描述符可读写,如果你不处理或者没处理完下次也不通知你,只通知你这一次,爱咋咋地。直到第二次有数据可读写的时候再次通知你。这种效率比较高,但是不能保证数据的完整性,如果一次处理不完就不告诉你了。

epoll示例代码

#include<stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>                        
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>    
#include <sys/epoll.h>

void server() {

    // 创建socket连接
    int lfd = socket(AF_INET,SOCK_STREAM,0);
    struct sockaddr_in my_addr; 
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = AF_INET; // ipv4
    my_addr.sin_port   = htons(8088);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY); 
    // 绑定端口
    bind(lfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
    // 监听连接请求
    listen(lfd, 128);
    printf("listen client @port=%d...\n", 8088);
    int epct, i;
    struct epoll_event event;
    struct epoll_event events[100];
    memset(events, 0, 100 * sizeof(struct epoll_event));
    int epfd = epoll_create(1);
    event.data.fd = lfd;
    event.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event);
    while (1) {
        printf("阻塞中....\n");
        int nready = epoll_wait(epfd, events, 20, -1);
        int i;
        for (i = 0; i < nready; ++i) {
            // 监听到新的客户端连接
            if (events[i].data.fd == lfd) {
                struct sockaddr_in client_addr;    
                socklen_t cliaddr_len = sizeof(client_addr);
                char cli_ip[INET_ADDRSTRLEN] = "";    
                // 肯定有连接不会阻塞
                int clientfd = accept(lfd, (struct sockaddr*)&client_addr, &cliaddr_len);
                inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);

                event.data.fd = clientfd;
                event.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &event);

                printf("----------------------------------------------\n");
                printf("client ip=%s,port=%d\n", cli_ip, ntohs(client_addr.sin_port));
            } else {
                char recv_buf[512] = "";
                int rs = read(events[i].data.fd, recv_buf, sizeof(recv_buf));
                if (rs < 0) {
                    close(events[i].data.fd);
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &event);
                    continue;
                }
                printf("%s\n",recv_buf);
            }
        }


    }
}

int main(){
    server();
    return 0;
}


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
红黑树的翻转 红黑树的翻转
红黑树(R-B Tree),它是二叉树中,最经典也是最复杂的数据结构。也是一种常见的自平衡二分搜索树。说到自平衡就涉及到旋转,这是红黑树最难的地方之一。我在一开始接触时也是云里雾里,不知道为什么红黑树相比普通的二叉树要设置这么多条件,更别说
2022-06-06
Next 
Java的反射与动态编译 Java的反射与动态编译
JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法;这种动态获取的信息以及动态调用对象的方法的功能称为Java语言的反射机制。这种技术在一些开源框架中使用非常广泛,
2022-06-05
  TOC