IO多路复用技术总结

IO多路复用技术总结

来源:微信公众号「编程学习基地」

目录
  • IO 多路复用概述
  • 网络IO模型
  • IO多路复用
  • select
    • select调用
    • timeval结构体
    • select置位
    • fd_set结构体
  • select使用
    • Server
    • client
  • 简易聊天室select版本
    • server
    • client
  • poll调用
    • struct pollfd
    • nfds
    • timeout
    • 返回值
  • poll使用
  • epoll调用
    • epoll_create
    • epoll_ctl
    • struct epoll_event
      • Epoll Events:
    • epoll_wait
  • 基于epoll的简易http服务
    • Build
    • Usage

IO 多路复用概述

I/O 多路复用技术是为了解决进程线程阻塞到某个 I/O 系统调用而出现的技术,使进程不阻塞于某个特定的 I/O 系统调用。

在IO多路复用技术描述前,先讲解下同步,异步,阻塞,非阻塞的概念。

网络IO模型

linux网络IO中涉及到的模型如下:

(1)阻塞式IO

(2)非阻塞式IO

(3)IO多路复用

(4)信号驱动IO

(5)异步IO

今天不谈信号驱动IO,略过..

同步/异步

在学习IO模型的时候,我们必须明确一个概念,处理 IO 的时候,阻塞和非阻塞都是同步 IO。

只有使用了特殊的 API 才是异步 IO,例如Linux网络中的AIO。

再看下POSIX对同步和异步这两个术语的定义:

  • 同步IO操作:导致请求进程阻塞,直到I/O操作完成;
  • 异步IO操作:不导致请求进程阻塞;

通俗的理解下同步和异步

  • 同步:当执行系统调用read时,需要用户等待内核完成从内核缓冲区到用户缓冲区的数据拷贝。

  • 异步:当执行异步IO操作例如aio_read时,用户不需要等待,只需要接收内核完成操作的通知,由内核来完成数据的读取。

阻塞/非阻塞

在知晓阻塞和非阻塞都是同步 IO后,阻塞和非阻塞就很好理解了

阻塞IO:由系统调用read,导致线程一直等待数据返回。

非阻塞IO:系统调用read后立即返回一个状态,当数据达到内核缓冲区之前都是非阻塞的,即返回一个系统调用状态。

ps:闪客的动图做的非常的形象,上述gif动图来源「低并发编程」

IO多路复用

IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;

select

select 是操作系统提供的系统调用函数,select()用来等待文件描述词(普通文件、终端、伪终端、管道、FIFO、套接字及其他类型的字符型)状态的改变。是一个轮循函数,循环询问文件节点,可设置超时时间,超时时间到了就跳过代码继续往下执行。

通过select,我们可以把一个文件描述符的数组发给操作系统, 让操作系统去遍历,确定哪个文件描述符可以读写, 然后告诉我们去处理:

头文件

#include <sys/select.h>#include <sys/time.h>#include <sys/types.h>#include <unistd.h>

select调用

拥塞函数,拥塞等待文件描述符事件的到来

int select(int maxfdp, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);

参数说明:

maxfdp:被监听的文件描述符的最大值,它比所有文件描述符集合中的文件描述符的最大值大1,因为文件描述符是从0开始计数的;

readfds、writefds、exceptset:分别指向可读、可写和异常等事件对应的描述符集合。

timeout:用于设置select函数的超时时间,即告诉内核select等待多长时间之后就放弃等待。timeout == NULL 表示等待无限长的时间,timeout == 0,select立即返回

timeval结构体

struct timeval{          long tv_sec;   /*秒 */    long tv_usec;  /*微秒 */   };

select置位

int FD_ZERO(int fd, fd_set *fdset);   //一个 fd_set类型变量的所有位都设为 0int FD_CLR(int fd, fd_set *fdset);  //清除某个位时可以使用int FD_SET(int fd, fd_set *fd_set);   //设置变量的某个位置int FD_ISSET(int fd, fd_set *fdset); //测试某个位是否被置位

当声明了一个文件描述符集后,必须用FD_ZERO将所有位置零

调用 select函数,拥塞等待文件描述符事件的到来 ;如果超过设定的时间,则不再等待,继续往下执行

select返回后,用FD_ISSET测试给定位是否置位:

if(FD_ISSET(fd, &rset)   {     ...     //do something  }

fd_set结构体

fd_set其实这是一个数组的宏定义,实际上是一long类型的数组,每一个数组元素都能与一打开的文件句柄(socket、文件、管道、设备等)建立联系,建立联系的工作程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪个句柄可读。

select使用

整个 select 的流程图如下:

Demo1:select示例

Server

#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/wait.h>#include <unistd.h>#include <arpa/inet.h>#include <sys/time.h>#include <sys/types.h>#define MAXBUF 1024#define LISTEN_NUM 2int main(int argc, char **argv){    int default_port = 8000;    int optch = 0;    while ((optch = getopt(argc, argv, "s:p:")) != -1)    {        switch (optch)        {        case 'p':            default_port = atoi(optarg);            printf("port: %s\n", optarg);            break;        case '?':            printf("Unknown option: %c\n", (char)optopt);            break;        default:            break;        }    }    int sockfd, new_fd;    socklen_t len;    struct sockaddr_in my_addr, their_addr;    char buf[MAXBUF + 1];    fd_set rfds;            // select    struct timeval tv;      //超时时间    int retval, maxfd = -1; // select返回值 select监听句柄的最大数量        if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)    {        perror("socket");        exit(EXIT_FAILURE);    }    bzero(&my_addr, sizeof(my_addr));    my_addr.sin_family = PF_INET;    my_addr.sin_port = htons(default_port);    my_addr.sin_addr.s_addr = INADDR_ANY;    if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1)    {        perror("bind");        exit(EXIT_FAILURE);    }    if (listen(sockfd, LISTEN_NUM) == -1)    {        perror("listen");        exit(EXIT_FAILURE);    }    /*数据处理*/    while (1)    {        printf("\n----wait for new connect port:%d\n",default_port);        len = sizeof(struct sockaddr);        if ((new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &len)) == -1)        {            perror("accept");            exit(errno);        }        else            printf("server: got connection from %s, port %d, socket %d\n",                   inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);        while (1)        {            FD_ZERO(&rfds);            FD_SET(0, &rfds);            FD_SET(new_fd, &rfds);            maxfd = new_fd;            tv.tv_sec = 1;            tv.tv_usec = 0;            retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);            if (retval == -1)            {                perror("select");                exit(EXIT_FAILURE);            }            else if (retval == 0)            {                continue;            }            else            {                /*标准输入*/                if (FD_ISSET(0, &rfds))                {                    bzero(buf, MAXBUF + 1);                    fgets(buf, MAXBUF, stdin);                    if (!strncasecmp(buf, "quit", 4))                    {                        printf("i will quit!\n");                        break;                    }                    len = send(new_fd, buf, strlen(buf) - 1, 0);                    if (len > 0)                        printf("send successful,%d byte send..\n", len);                    else                    {                        printf("send failure!");                        break;                    }                }                if (FD_ISSET(new_fd, &rfds))                {                    bzero(buf, MAXBUF + 1);                    len = recv(new_fd, buf, MAXBUF, 0);                    if (len > 0)                        printf("recv success :'%s', %d byte recv..\n", buf, len);                    else                    {                        if (len < 0)                            printf("recv failure\n");                        else                        {                            printf("the client close ,quit\n");                            break;                        }                    }                }            }        }        close(new_fd);        printf("need othe connecdt (no->quit)");        fflush(stdout);        bzero(buf, MAXBUF + 1);        fgets(buf, MAXBUF, stdin);        if (!strncasecmp(buf, "no", 2))        {            printf("quit!\n");            break;        }    }    close(sockfd);    return 0;}

makefile:

TARGET=serverSRC = $(wildcard *.cpp *.c)OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))DEFS =CFLAGS = -gCC =g++LIBS =  -lpthread$(TARGET):$(OBJ)$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS).PHONY:clean:rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ makeg++ -g  -o server select.c -lpthreadubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./server----wait for new connect port:8000

client

#include <stdio.h>#include <string.h>#include <errno.h>#include <sys/socket.h>#include <resolv.h>#include <stdlib.h>#include <netinet/in.h>#include <arpa/inet.h>#include <unistd.h>#include <sys/time.h>#include <sys/types.h>#define MAXBUF 1024int main(int argc, char **argv){    int sockfd, len;    struct sockaddr_in dest;    char buffer[MAXBUF + 1];    fd_set rfds;    struct timeval tv;    int retval, maxfd = -1;    int optch,ret = -1;    const char*server_addr;    int default_port = 8000;    /*判断是否为合法输入 必须传入一个参数:服务器Ip*/    if(argc<3)    {        printf("usage:tcpcli <IPaddress>");        return 0;    }    while((optch = getopt(argc, argv, "s:p:")) != -1){switch (optch){        case 's':            server_addr = optarg;            break;        case 'p':            default_port = atoi(optarg);            printf("port: %s\n", optarg);            break;        case '?':            printf("Unknown option: %c\n",(char)optopt);                break;        default:            break;}}        if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)     {        perror("Socket");        exit(EXIT_FAILURE);    }    bzero(&dest, sizeof(dest));    dest.sin_family = AF_INET;    dest.sin_port = htons(default_port);    if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0)     {        perror(server_addr);        exit(EXIT_FAILURE);    }    if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0)     {        perror("Connect ");        exit(EXIT_FAILURE);    }    printf("\nget ready message chat:\n");    while (1) {        FD_ZERO(&rfds);        FD_SET(0, &rfds);        FD_SET(sockfd, &rfds);        maxfd = sockfd;        tv.tv_sec = 1;        tv.tv_usec = 0;        retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);        if (retval == -1) {            printf("select %s", strerror(errno));            break;        } else if (retval == 0)            continue;else{            if (FD_ISSET(sockfd, &rfds)) {                bzero(buffer, MAXBUF + 1);                len = recv(sockfd, buffer, MAXBUF, 0);                if (len > 0)                    printf ("recv message:'%s', %d byte recv..\n",buffer, len);                else {                    if (len < 0)                        printf ("message recv failure\n");                    else{                        printf("server close ,quit\n");                    break;}                }            }            if (FD_ISSET(0, &rfds)){                bzero(buffer, MAXBUF + 1);                fgets(buffer, MAXBUF, stdin);                if (!strncasecmp(buffer, "quit", 4)) {                    printf("i will quit\n");                    break;                }                len = send(sockfd, buffer, strlen(buffer) - 1, 0);                if (len < 0) {                    printf ("message send failure");                    break;                } else                    printf                        ("send success,%d byte send..\n",len);            }        }    }    close(sockfd);    return 0;} 
TARGET=serverSRC = $(wildcard *.cpp *.c)OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))DEFS =CFLAGS = -gCC =g++LIBS =  -lpthread$(TARGET):$(OBJ)$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS).PHONY:clean:rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ makeg++ -g  -o client client.c -lpthreadubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0get ready message chat:

简易聊天室select版本

server

#include<stdio.h>#include<sys/types.h>#include<stdlib.h>#include<sys/socket.h>#include<sys/select.h>#include<netinet/in.h>#include<arpa/inet.h>#include<unistd.h>#include<string.h>#define _BACKLOG_ 5 //监听队列里允许等待的最大值#define MAX_CONNECT 20int fds[MAX_CONNECT];        //用来存放需要处理的IO事件int listen_sock = -1;int creat_sock(int port){    int sock = socket(AF_INET,SOCK_STREAM,0);    if(sock < 0){        perror("creat_sock error");        exit(1);    }    struct sockaddr_in local;    local.sin_family = AF_INET;    local.sin_port = htons(port);    local.sin_addr.s_addr = INADDR_ANY; //inet_addr(0.0.0.0)        // 设置允许socket立即重用    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&sock, sizeof(sock));      if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){        perror("bind");        exit(2);     }    if(listen(sock,_BACKLOG_) < 0 ){        perror("listen");        exit(4);     }    return sock;}int accept_sock(){    struct sockaddr_in client;    socklen_t len = sizeof(client);    int accept_sock = accept(listen_sock, (struct sockaddr *)&client, &len);    if (accept_sock < 0)    {        perror("accept");        exit(5);    }    printf("connect by a client, ip:%s port:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));    size_t i = 0;    for (; i < MAX_CONNECT; ++i) //将新接受的描述符存入集合中    {        if (fds[i] == -1)        {            fds[i] = accept_sock;            break;        }    }    if (i == MAX_CONNECT)    {        printf("accept is upper limit..\n");        close(accept_sock);    }}int groupChat(int sockFd,void* pBuf,int iSize){    for(int index=0;index<MAX_CONNECT;index++){        if(fds[index] == sockFd || fds[index] == listen_sock)        {            continue;        }        if(fds[index]!=-1){            printf("write fd:%d..socketFd:%d\n",fds[index],sockFd);            write(fds[index],pBuf,iSize);        }    }}int handle_read(int* socketFd){    int socket = *socketFd;    char buf[1024];    memset(buf, '\0', sizeof(buf));    ssize_t size = read(socket, buf, sizeof(buf) - 1);    if (size < 0)    {        perror("read");        exit(6);    }    else if (size == 0)    {        printf("client close..\n");        close(socket);        *socketFd = -1;    }    else    {        printf("client say: %s\n", buf);        groupChat(socket, buf, size);    }}int main(int argc,char* argv[]){    int default_port = 8000;    int optch = 0;    while ((optch = getopt(argc, argv, "s:p:")) != -1)    {        switch (optch)        {        case 'p':            default_port = atoi(optarg);            printf("port: %s\n", optarg);            break;        case '?':            printf("Unknown option: %c\n", (char)optopt);            break;        default:            break;        }    }    listen_sock = creat_sock(default_port);    size_t fds_num = sizeof(fds)/sizeof(fds[0]);    size_t i = 0;    for(;i < fds_num;++i)    {        fds[i] = -1;    }        int max_fd = listen_sock;fds[0] = listen_sock;    fd_set rset;    while(1){        FD_ZERO(&rset);        FD_SET(listen_sock,&rset);        struct timeval timeout = {10 , 0};        size_t i = 0;        for(;i < fds_num;++i)        {            if(fds[i] > 0 ){                FD_SET(fds[i] ,&rset);                if(max_fd < fds[i]){                    max_fd = fds[i];                }            }        }        switch(select(max_fd+1,&rset,NULL,NULL,&timeout))        {            case -1:                perror("select");                break;            case 0:                printf("time out..\n");break;            default:            {                size_t i = 0;                for(;i < fds_num;++i)                {//连接请求//当为 listen_socket 事件就绪的时候,就表明有新的连接请求                    if(FD_ISSET(fds[i],&rset) && fds[i] == listen_sock)                    {                        accept_sock();                    }                    //普通请求                    else if(FD_ISSET(fds[i],&rset) && (fds[i] > 0))                    {                        handle_read(&fds[i]);                    }                    else{}                }            }            break;        }    }    return 0;}

makfeile

TARGET=serverSRC = $(wildcard *.cpp *.c)OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))DEFS =CFLAGS = -gCC =g++LIBS =  -lpthread$(TARGET):$(OBJ)$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS).PHONY:clean:rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ makeg++ -g  -o server select.c -lpthreadubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./serverconnect by a client, ip:127.0.0.1 port:42964

client

#include <stdio.h>#include <string.h>#include <errno.h>#include <sys/socket.h>#include <resolv.h>#include <stdlib.h>#include <netinet/in.h>#include <arpa/inet.h>#include <unistd.h>#include <sys/time.h>#include <sys/types.h>#define MAXBUF 1024#define MAXNAME 64int main(int argc, char **argv){    int sockfd, len;    struct sockaddr_in dest;    char buffer[MAXBUF + 1];    fd_set rfds;    struct timeval tv;    int retval, maxfd = -1;    int optch,ret = -1;    const char*server_addr;    int default_port = 8000;    char* clientName = "佚名";    /*判断是否为合法输入 必须传入一个参数:服务器Ip*/    if(argc<3)    {        printf("usage:tcpcli <IPaddress>");        return 0;    }    while((optch = getopt(argc, argv, "s:p:n:")) != -1){switch (optch){        case 's':            server_addr = optarg;            break;        case 'p':            default_port = atoi(optarg);            printf("port: %s\n", optarg);            break;        case 'n':            clientName = optarg;            printf("client Name: %s\n", optarg);            break;        case '?':            printf("Unknown option: %c\n",(char)optopt);                break;        default:            break;}}        if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)     {        perror("Socket");        exit(EXIT_FAILURE);    }    bzero(&dest, sizeof(dest));    dest.sin_family = AF_INET;    dest.sin_port = htons(default_port);    if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0)     {        perror(server_addr);        exit(EXIT_FAILURE);    }    if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0)     {        perror("Connect ");        exit(EXIT_FAILURE);    }    printf("get ready message chat:\n");    while (1) {        FD_ZERO(&rfds);        FD_SET(0, &rfds);        FD_SET(sockfd, &rfds);        maxfd = sockfd;        tv.tv_sec = 1;        tv.tv_usec = 0;        retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);        if (retval == -1) {            printf("select %s", strerror(errno));            break;        } else if (retval == 0)            continue;else{            if (FD_ISSET(sockfd, &rfds)) {                bzero(buffer, MAXBUF + 1);                len = recv(sockfd, buffer, MAXBUF, 0);                if (len > 0)                    printf ("recv byte %d, %s\n",len, buffer);                else {                    if (len < 0)                        printf ("message recv failure\n");                    else{                        printf("server close ,quit\n");                    break;}                }            }            if (FD_ISSET(0, &rfds)){                char name_msg[MAXNAME + MAXBUF];                bzero(buffer, MAXBUF + 1);                fgets(buffer, MAXBUF, stdin);                if (!strncasecmp(buffer, "quit", 4)) {                    printf("i will quit\n");                    break;                }                sprintf(name_msg, "[%s]: %s", clientName, buffer);                len = send(sockfd, name_msg, strlen(name_msg) - 1, 0);                if (len < 0) {                    printf ("message send failure");                    break;                } else                    printf("send success,%d byte send..\n",len);            }        }    }    close(sockfd);    return 0;} 

makefile

TARGET=serverSRC = $(wildcard *.cpp *.c)OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))DEFS =CFLAGS = -gCC =g++LIBS =  -lpthread$(TARGET):$(OBJ)$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS).PHONY:clean:rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0 -n 梦凡client Name: 梦凡get ready message chat:

poll调用

Poll就是监控文件是否可读的一种机制,作用与select一样。

#include <poll.h>int poll(struct pollfd fds[], nfds_t nfds, int timeout)

参数说明

struct pollfd

fds:是一个struct pollfd结构类型的数组,列出了我们需要poll()检查的文件描述符

typedef struct pollfd {        int fd;           /* 需要被检测或选择的文件描述符*/        short events;     /* 对文件描述符fd上感兴趣的事件 */        short revents;    /* 文件描述符fd上当前实际发生的事件*/} pollfd_t;

​events:想要监听的事件

​revents:实际上发生的事件

POLLINPOLLOUTPOLLPRIPOLLRDHUBPOLLHUPPOLLERR

nfds

指定了fds中元素的个数,nfds_t为无符号整形

timeout

决定阻塞行为,一般如下:

  • -1:一直阻塞到fds数组中有一个达到就绪态或者捕获到一个信号

  • 0:不会阻塞,立即返回

  • >0:阻塞时间

返回值

  • >0:数组fds中准备好读、写或出错状态的那些socket描述符的总数量;

  • ==0:数组fds中没有任何socket描述符准备好读、写,或出错;此时poll超时

  • -1: poll函数调用失败

poll使用

#include <stdio.h>#include <poll.h>#include <string.h>int main(){int timeout = 0;   char buf[1024];struct pollfd fd_poll[1];   //设置只有一个事件while(1){fd_poll[0].fd = 0;      fd_poll[0].events = POLLIN;fd_poll[0].revents = 0;   memset(buf, '\0', sizeof(buf));switch( poll(fd_poll, 1, -1) ){case 0:perror("timeout!");break;case -1:perror("poll");break;default:{if( fd_poll[0].revents & POLLIN ){gets(buf);printf("buf : %s\n",buf);}}break;}}return 0;}

makefile

tcp_poll:tcp_poll.c    gcc -o $@ $^.PHONY:cleanclean:    rm -f tcp_poll

epoll调用

epoll没有对描述符数目的限制,它所支持的文件描述符上限是整个系统最大可以打开的文件数目,例如,在1GB内存的机器上,这个限制大概为10万左右。

epoll只有 epoll_createepoll_ctlepoll_wait 这三个系统调用。

第一步,创建一个 epoll 句柄

第二步,向内核添加、修改或删除要监控的文件描述符。

第三步,发起了 select() 调用

其定义如下:

#include <sys/epoll.h>int epoll_create(int size);int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll_create

#include <sys/epoll.h>int epoll_create(int size);

调用epoll_create方法创建一个epoll的句柄,使用完epoll后使用close函数进行关闭

epoll_ctl

#include <sys/epoll.h>int epoll_ctl(int epfd//第一个参数epfd:epoll_create函数的返回值。, int op//第二个参数events:表示动作类型。有三个宏来表示, int fd//第三个参数fd:需要监听的fd。, struct epoll_event *event);//第四个参数event:告诉内核需要监听什么事件。

op:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;

  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

  • EPOLL_CTL_DEL:从 epfd 中删除一个 fd。

fd:需要注册监视对象文件描述符

struct epoll_event

// 感兴趣的事件和被触发的事件struct epoll_event {    __uint32_t events; // Epoll events    epoll_data_t data; // User data variable};// 保存触发事件的某个文件描述符相关的数据typedef union epoll_data {    void *ptr;    int fd;    __uint32_t u32;    __uint64_t u64;} epoll_data_t;
Epoll Events:

EPOLLIN:表示对应的文件描述符可读(包括对端Socket);
EPOLLOUT:表示对应的文件描述符可写;
EPOLLPRI:表示对应的文件描述符有紧急数据可读(带外数据);
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered),这是相对于水平触发(Level Triggered)而言的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket,需要再次添加

例如:

struct epoll_event ep_ev;int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len);ep_ev.events = EPOLLIN | EPOLLET;ep_ev.data.fd = accept_sock;epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)

epoll_wait

收集在epoll监控的事件中已经发生的事件

#include <sys/epoll.h>int epoll_wait(int epfd//第一个参数epfd:epoll_create函数的返回值。, struct epoll_event *events, int maxevents, int timeout);//超时时间(毫秒)

第一个参数epfd:epoll_create函数的返回值。

第二个参数events:是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据赋值到这个event数组中,不会去帮助我们在用户态分配内存)

第三个参数maxevents:maxevents告诉内核这个events数组有多大,这个maxevents的值不能大于创建epoll_create时的size。

第四个参数:是超时时间(毫秒),如果函数调用成功,则返回对应IO上已准备好的文件描述符数目,如果返回0则表示已经超时。

基于epoll的简易http服务器

基于epoll的简单回显服务器

#include<stdio.h>#include<unistd.h>#include<sys/types.h>#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include<sys/epoll.h>#include<fcntl.h>#include<stdlib.h>#include<string.h>int listen_sock = -1;int epoll_fd = -1;//设置非阻塞int set_noblock(int sock){    int opts = fcntl(sock,F_GETFL);    return fcntl(sock,F_SETFL,opts | O_NONBLOCK);}int creat_socket(int port){    int sock = socket(AF_INET,SOCK_STREAM,0);    if(sock < 0){        perror("socket");        exit(2);    }    //调用setsockopt使当server先断开时避免进入 TIME_WAIT 状态,\        将其属性设定为SO_REUSEADDR,使其地址信息可被重用    int opt = 1;    if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){        perror("setsockopt");        exit(3);    }    struct sockaddr_in local;    local.sin_family = AF_INET;    local.sin_port = htons(port);    local.sin_addr.s_addr = INADDR_ANY; //inet_addr("0.0.0.0");    if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0 ){        perror("bind");        exit(4);    }    if(listen(sock,5) < 0){        perror("listen");        exit(5);    }    printf("listen port %d..\n",port);    return sock;}int accept_socket(){    struct sockaddr_in remote;    socklen_t len = sizeof(remote);    int accept_sock = accept(listen_sock, (struct sockaddr *)&remote, &len);    if (accept_sock < 0)    {        perror("accept");        return -1;    }    printf("accept a client..[ip]: %s,[port]: %d\n", inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));    //将新的事件添加到epoll集合中    struct epoll_event ep_ev;    ep_ev.events = EPOLLIN | EPOLLET; // edge边沿触发,只触发一次    ep_ev.data.fd = accept_sock;    set_noblock(accept_sock);    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_sock, &ep_ev) < 0)    {        perror("epoll_ctl");        close(accept_sock);        return -1;    }    return 0;}int handle_request(int socketFd){    //申请空间同时存文件描述符和缓冲区地址    char buf[102400];    memset(buf, '\0', sizeof(buf));    ssize_t _s = recv(socketFd, buf, sizeof(buf) - 1, 0);    if (_s < 0)    {        perror("recv");        return -1;    }    else if (_s == 0)    {        printf("remote close..\n");        //远端关闭了,进行善后        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);        close(socketFd);    }    else    {        //读取成功,输出数据        printf("client# %s", buf);        fflush(stdout);        //将事件改写为关心事件,进行回写        struct epoll_event ep_ev;        ep_ev.data.fd = socketFd;        ep_ev.events = EPOLLOUT | EPOLLET;        //在epoll实例中更改同一个事件,触发socket可写事件        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socketFd, &ep_ev);    }    return 0;}int handle_response(int socketFd){    const char *msg = "HTTP/1.1 200 OK \r\n\r\n<h1> hi girl </h1>\r\n";    send(socketFd, msg, strlen(msg), 0);    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);    close(socketFd);}int main(int argc,char *argv[]){    int default_port = 8000;    int optch = 0;    while ((optch = getopt(argc, argv, "s:p:")) != -1)    {        switch (optch)        {        case 'p':            default_port = atoi(optarg);            printf("port: %s\n", optarg);            break;        case '?':            printf("Unknown option: %c\n", (char)optopt);            break;        default:            break;        }    }    listen_sock = creat_socket(default_port);    epoll_fd = epoll_create(256);    if(epoll_fd < 0){        perror("epoll creat");        exit(6);    }    struct epoll_event ep_ev;    ep_ev.events = EPOLLIN;         //数据的读取    ep_ev.data.fd = listen_sock;    //添加关心的事件    if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev) < 0){        perror("epoll_ctl");        exit(7);    }    struct epoll_event ready_ev[128];   //申请空间来放就绪的事件。    int maxnum = 128;    int timeout = -1;                   //设置超时时间,若为-1,则永久阻塞等待。    int ret = 0;        int done = 0;    while(!done){        switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout)){            case -1:                perror("epoll_wait");                break;            case 0:                printf("time out...\n");                break;            default://至少有一个事件就绪            {                int i = 0;                for(;i < ret;++i){                    //判断是否为监听套接字,是的话 accept                    int fd = ready_ev[i].data.fd;                     if((fd == listen_sock) && (ready_ev[i].events & EPOLLIN)){                        accept_socket();                    }                    else{//普通IO                        if(ready_ev[i].events & EPOLLIN){                            handle_request(fd);                        }else if(ready_ev[i].events & EPOLLOUT){                            handle_response(fd);                        }                    }                }            }                break;        }    }    close(listen_sock);    return 0;}

makefile

TARGET=serverSRC = $(wildcard *.cpp *.c)OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))DEFS =CFLAGS = -gCC =g++LIBS =  -lpthread$(TARGET):$(OBJ)$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS).PHONY:clean:rm -rf *.o $(TARGET)

Build

make

Usage

ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/epoll$ ./serverlisten port 8000

浏览器输入:http://服务器ip:8000/例如,http://49.234.35.128:8000/

免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部