Unix网络篇(4)与epoll类似的kqueue模型——Unix-like系统

poll解决了fd上限问题,也将“入参”和“出参”区分了开来。但是,对于大规模并发系统poll的全拷贝操作,极大的影响了性能。为了解决该问题kqueue模型被引进来了。对于epoll模型的介绍请参考《Linux网络篇:独有的I/O多路复用模型epoll(适用于大规模fd)》

说明: 本系列是Unix网络篇。所涉及到的内容适合于Unix-like(包括Mac OS X)。epoll模型几乎与kqueue模型是等价的,非要说区别,那就是kqueue模型是Unix-like系统的,而epollLinux系统所独有的。epoll它是在Linux 2.6之后被引入的。

1 服务端

kqueue模型是为了解决poll全拷贝带来的性能瓶颈问题。

1.1 poll全拷贝

我们再回顾一下前几篇文章selectpoll所讲的内容。这两种模型,都是监听一个fd数组,然后通过循环来判断到底是哪些fd被触发了。这里有两个问题:

其一,需要将全部“入参”拷贝到“出参”也就是————全拷贝;

其二,对于“出参”需要使用循环来判断这些fd哪些被触发了。

1.2 kqueue事件监听

对于上百万的fdpoll全拷贝带来的性能问题是极大的。不管是拷贝,还是遍历,都很大程度上影响了系统吞吐量。为了解决该问题,kqueue采用事件监听模式,它只关心活着的fd,也就是“出参”全部是被触发了的fd

这样了,避免了大规模拷贝fd,也避免了对fd是否被触发的判断。

1.3 kqueue接口的使用

一般说来,三个操作就可以搞定kqueue接口(注意区分kqueuekevent),首先看一下kevent函数的声明:

int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout)

kevent是真正干活的函数,它就是对poll的改进,这些参数也很好理解:

  1. int kq, 它是一个kqueue的句柄,由int kq = kqueue();返回;
  2. struct kevent *changelist, (入参)注册监听的事件列表,可以一次注册一系列监听事件;
  3. int nchanges, (入参)用来说明上一个参数的个数,如果*changelist中包含N个事件,那这里就填N
  4. struct kevent *eventlist,(出参)当有信号到来时,会从这里抛出来;
  5. int nevents,(出参)说明上一个参数的大小,也就是要监听的信号个数;
  6. const struct timespec *timeout, 设置超时。它是纳秒级别的。

1.3.1 注册监听事件

注册事件时,请将出参设置为空。同时,最好将timeout全部这是为0,也就是无阻塞。

int kevent(int kq, const struct kevent *changelist, int nchanges, NULL, 0, const struct timespec *timeout)

举个例子:

EV_SET(&ev, listenfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kq = kqueue();
ts.tv_sec = ts.tv_nsec = 0;
//1、注册所关注的事件
kevent(kq, &ev, 1, NULL, 0, &ts); //&ts=0,非阻塞

这里又涉及到一个宏EV_SET,它既是用来初始化struct kevent ev;结构体的。EV_SET中,除了第一个参数,后面六个参数分别对应以下结构体中的六个成员变量。

struct kevent {
uintptr_t ident; /* identifier for this event */
int16_t filter; /* filter for event */
uint16_t flags; /* general flags */
uint32_t fflags; /* filter-specific flags */
intptr_t data; /* filter-specific data */
void *udata; /* opaque user data identifier */
};

1.3.2 开始监听

这时候,应该将入参置空,只要接受出参即可。一般可将超时时间设置为NULL,也就是阻塞等待。

int kevent(int kq, NULL, 0, struct kevent *eventlist, int nevents, const struct timespec *timeout)

举个例子:

//2、确定是否有所关注的时间发生
nev = kevent(kq, NULL, 0, kev, nevents, NULL); //timeout=NULL表示阻塞

这里需要特别注意返回值nev,它是事件发生的个数。同时,kevent函数,会将被触发的事件拷贝到“出参”struct kevent kev[OPEN_MAX]中。(相对poll来说)这是一个很大的改进,kqueue模型它不会拷贝未被触发的事件。

从而,避免了大规模的拷贝,以及后续的判断fd是否被触发的操作。kqueue模型,返回的都是被触发了的。

1.3.3 删除监听事件

恰与注册监听事件极为相似。只需要将flags位设置为EV_DELETE即可(原来是EV_ADD)。 举个例子:

EV_SET(&ev, sockfd, 0, EV_DELETE, 0, 0, NULL); //DELETE    
kevent(kq, &ev, 1, NULL, 0, &ts);

1.4 以下是kqueue_server.c源代码

#include <sys/socket.h> 
#include <netinet/in.h> //sockaddr_in
#include <unistd.h> //fork,read,write
#include <stdlib.h> //exit(0)
#include <errno.h>
#include <string.h> //bzero
#include <sys/event.h> //kevent
#include <limits.h> //OPEN_MAX
#include <stdio.h>
#include <sys/stat.h> //fstat

const int SERV_PORT = 9527;
const int LISTENQ = 5;
const int MAXBUFSIZE = 1024;

int main(int argc, char* argv[]) {
int kq, i, nev, n, nevents, listenfd, connfd, sockfd;
char buf[MAXBUFSIZE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
struct kevent ev, kev[OPEN_MAX];//#define OPEN_MAX 10240 (Mac OS X)
struct timespec ts;
struct stat st;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

if ( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0 )
exit_msg("bind error");

listen(listenfd, LISTENQ);

EV_SET(&ev, listenfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kq = kqueue();
ts.tv_sec = ts.tv_nsec = 0;
//1、注册所关注的事件
kevent(kq, &ev, 1, NULL, 0, &ts); //&ts=0,非阻塞
nevents = 1;
for ( ; ; ) {
//2、确定是否有所关注的事件发生
nev = kevent(kq, NULL, 0, kev, nevents, NULL); //timeout=NULL表示阻塞
for (i = 0; i < nev; ++i) {
if (kev[i].ident == listenfd && nevents < OPEN_MAX) {
clilen = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
EV_SET(&ev, connfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &ev, 1, NULL, 0, &ts);
++nevents;
}
else {
sockfd = kev[i].ident;
if ( (n = read(sockfd, buf, MAXBUFSIZE)) < 0 ) {
if (errno == ECONNRESET) {
close(sockfd);
EV_SET(&ev, sockfd, 0, EV_DELETE, 0, 0, NULL); //DELETE
kevent(kq, &ev, 1, NULL, 0, &ts);
--nevents;
}
else {
exit_msg("read error");
}
}
else if (n == 0) {
close(sockfd);
EV_SET(&ev, sockfd, 0, EV_DELETE, 0, 0, NULL); //DELETE
kevent(kq, &ev, 1, NULL, 0, &ts);
--nevents;
}
else {
writen(sockfd, buf, n);//DIY: writen = write + while(见《Unix网络篇(1)一个典型的TCP Socket通信例子》)
}
}
}
}

return 0;
}

2 客户端

服务端将需要讲的内容基本上讲完了,客户端使用的是apue书上的例子,str_cli函数实现了kqueue接口。由于客户端将终端关联起来了,所以做了一些额外的操作。

以下是客户端kqueue_client.c源代码:

#include <sys/socket.h> 
#include <netinet/in.h> //sockaddr_in
#include <arpa/inet.h> //inet_pon
#include <unistd.h> //fork,read,write
#include <stdlib.h> //exit(0)
#include <errno.h>
#include <sys/types.h>
#include <sys/event.h> //kqueue
#include <sys/time.h>
#include <string.h> //bzero
#include <stdio.h>
#include <sys/stat.h> //fstat

const int SERV_PORT = 9527;
const int LISTENQ = 5;
const int MAXBUFSIZE = 1024;

void str_cli(FILE* fp, int sockfd) {
int kq, i, n, nev, stdineof=0, isfile;
char buf[MAXBUFSIZE];
struct kevent kev[2];
struct timespec ts;
struct stat st;

isfile = ((fstat(fileno(fp), &st) == 0) && (st.st_mode & S_IFMT) == S_IFREG);

EV_SET(&kev[0], fileno(fp), EVFILT_READ, EV_ADD, 0, 0, NULL);
EV_SET(&kev[1], sockfd, EVFILT_READ, EV_ADD, 0, 0, NULL);

kq = kqueue();
ts.tv_sec = ts.tv_nsec = 0;
//1、注册所关注的事件
kevent(kq, kev, 2, NULL, 0, &ts); //&ts=0,非阻塞

for ( ; ; ) {
//2、确定是否有所关注的事件发生
nev = kevent(kq, NULL, 0, kev/*out*/, 2, NULL); //timeout=NULL表示阻塞
for (i = 0; i < nev; ++i) {
if (kev[i].ident == sockfd) {
if ( (n = read(sockfd, buf, MAXBUFSIZE)) == 0 ) {
if (stdineof == 1)
return;
else
exit_msg("str_cli:server terminated prematurely");
}
write(fileno(stdout), buf, n); //stdout使用write即可,不必writen
}
if (kev[i].ident == fileno(fp)) {
n = read(fileno(fp), buf, MAXBUFSIZE);
if (n > 0)
writen(sockfd, buf, n);
if (n == 0 || (isfile && n == kev[i].data)) {
stdineof = 1;
shutdown(sockfd, SHUT_WR);
//3、注册kevent,DELETE事件
kev[i].flags = EV_DELETE;
kevent(kq, &kev[i], 1, NULL, 0, &ts);//(&kev[i], 1)合起来,表示的也就是key[i]本身
}
}
}
}
}

int main(int argc, char* argv[]) {
int sockfd;
struct sockaddr_in servaddr;

if (argc != 2)
exit_msg("argv != 2");

sockfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

if (connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
exit_msg("conn err");//警告: connect请勿捕获EINTR

str_cli(stdin, sockfd);

return 0;
}

References: [1] W.Richard Stevens,《Unix网络编程 卷1:套接字联网API(第3版)》