一. 简介 针对异步IO的简介。翻译自libevent官网
二. 阻塞IO 大多数初级程序员都是从使用阻塞IO开始的。阻塞IO意味着,当你进行调用时,他只有在彻底完成调用、或者经过了一段超时时间而被网络协议栈放弃执行后,才会将控制权再次交给用户。例如在建立TCP连接时,执行connect()
,操作系统将向对端发送一个SYN包;在收到对端的SYN ACK或者等待超时之前,操作系统不会将控制权交还给应用程序。
这里是一个简易的客户端程序,他将向www.zrfyun.top
建立一个连接,发送一个简单的HTTP请求,并把服务器的响应输出到stdout。
下载代码
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

/*
 * Minimal blocking HTTP client.
 *
 * Resolves www.zrfyun.top, connects to port 80, sends a fixed HTTP/1.0 GET
 * request and copies everything the server sends back to stdout.  Every
 * network call used here blocks until it completes or fails.
 *
 * Returns 0 on success and 1 on any failure.
 */
int main(int c, char **v)
{
    const char query[] =
        "GET / HTTP/1.0\r\n"
        "Host: www.zrfyun.top\r\n"
        "\r\n";
    const char hostname[] = "www.zrfyun.top";
    struct sockaddr_in sin;
    struct hostent *h;
    const char *cp;
    int fd;
    ssize_t n_written, remaining;
    char buf[1024];

    (void)c;
    (void)v;

    /* Look up the IP address for the hostname.  Watch out; this isn't
     * threadsafe on most platforms. */
    h = gethostbyname(hostname);
    if (!h) {
        fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
        return 1;
    }
    if (h->h_addrtype != AF_INET) {
        fprintf(stderr, "No ipv6 support, sorry.");
        return 1;
    }

    /* Allocate a new socket */
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }

    /* Connect to the remote host.  The address is zeroed first so no field
     * is left indeterminate, and the resolved address is copied bytewise
     * rather than through a cast that could be misaligned. */
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    memcpy(&sin.sin_addr, h->h_addr_list[0], sizeof(sin.sin_addr));
    if (connect(fd, (struct sockaddr *)&sin, sizeof(sin))) {
        perror("connect");
        close(fd);
        return 1;
    }

    /* Write the query.  send() may accept fewer bytes than requested, so
     * keep going until the whole request has been handed to the kernel. */
    cp = query;
    remaining = strlen(query);
    while (remaining) {
        n_written = send(fd, cp, remaining, 0);
        if (n_written <= 0) {
            perror("send");
            close(fd);      /* do not leak the socket on the error path */
            return 1;
        }
        remaining -= n_written;
        cp += n_written;
    }

    /* Get an answer back: read until the peer closes the connection. */
    while (1) {
        ssize_t result = recv(fd, buf, sizeof(buf), 0);
        if (result == 0) {
            break;
        } else if (result < 0) {
            perror("recv");
            close(fd);
            return 1;
        }
        fwrite(buf, 1, result, stdout);
    }

    close(fd);
    return 0;
}
以上的所有的网络相关的函数都是阻塞的:
gethostbyname()
: 成功或失败地对www.zrfyun.top
进行解析后才会进行返回
connect()
: 只有TCP连接建立成功或者超时后才会返回
recv()
: 只有确实收到消息或者连接被关闭后才会返回
send()
: 只有把用户空间缓冲区中的所有数据刷新到内核的写缓冲区中后才会返回
在此时,如果我们并不指望程序同时执行其他任务,阻塞IO并没有造成什么困扰。但假设我们想写一个同时处理多条连接的程序(假设我们希望同时处理两条连接,并且不知道哪条连接首先有输入)。
我们不能这样写:
/* This won't work: each recv() blocks until that particular socket has data. */
char buf[1024];
int i, n;

while (i_still_want_to_read()) {
    for (i = 0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n > 0) {
            /* Got some bytes from this connection. */
            handle_input(fd[i], buf, n);
        } else if (n < 0) {
            /* recv() failed; pass errno along to the error handler. */
            handle_error(fd[i], errno);
        } else {
            /* n == 0: the peer closed the connection. */
            handle_close(fd[i]);
        }
    }
}
因为如果fd[2]首先收到数据,我们的代码在从fd[0]和fd[1]中读取到数据之前,是无法对fd[2]进行读取的。
有人使用多线程或多进程来解决这个问题。一个简单的处理就是一个线程(进程)只处理一条连接。由于每条连接都有自己的进程,一个针对某条连接的阻塞IO并不会对其他连接的阻塞IO造成影响。
下面的测试程序,它是一个简单的服务器,侦听端口40713上的TCP连接,一次从其输入读取一行数据,并在每行到达时写出每行的ROT13替换。它使用Unix fork()
调用为每个传入连接创建一个新进程。
下载代码
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_LINE 16384

/* Apply the ROT13 substitution to one character; non-letters are returned
 * unchanged. */
char rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

/*
 * Serve one connection: read a byte at a time, collect the ROT13 of each
 * byte, and send the collected line back whenever a newline arrives.
 * Returns when the peer closes the connection or an I/O error occurs.
 */
void child(int fd)
{
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;

    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);
        if (result == 0) {
            break;
        } else if (result == -1) {
            perror("read");
            break;
        }

        /* We do this test to keep the user from overflowing the buffer. */
        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            /* send() may write only part of the buffer; loop until the
             * whole line is out or the connection fails. */
            size_t sent = 0;
            while (sent < outbuf_used) {
                result = send(fd, outbuf + sent, outbuf_used - sent, 0);
                if (result <= 0) {
                    perror("send");
                    return;
                }
                sent += (size_t)result;
            }
            outbuf_used = 0;
        }
    }
}

/*
 * Listen on TCP port 40713 and fork one child process per accepted
 * connection.  Returns only if the listening socket cannot be set up.
 */
void run(void)
{
    int listener;
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return;
    }

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    /* Nobody waits for the children; ignoring SIGCHLD lets the system
     * reap them instead of leaving zombies behind. */
    signal(SIGCHLD, SIG_IGN);

    if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(listener);
        return;
    }

    if (listen(listener, 16) < 0) {
        perror("listen");
        close(listener);
        return;
    }

    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        pid_t pid;
        int fd = accept(listener, (struct sockaddr *)&ss, &slen);

        if (fd < 0) {
            perror("accept");
            continue;
        }

        pid = fork();
        if (pid == 0) {
            /* Child: it has no use for the listening socket. */
            close(listener);
            child(fd);
            exit(0);
        }
        if (pid < 0)
            perror("fork");

        /* Parent (or failed fork): drop our copy of the connection.
         * Otherwise the descriptor leaks and the client never sees EOF
         * after the child exits. */
        close(fd);
    }
}

int main(int c, char **v)
{
    (void)c;
    (void)v;

    run();
    return 0;
}
我们是否有更好的办法来解决同时处理多条连接
的问题呢?多线程多进程在某些平台上是很消耗资源的。实际中,我们更愿意使用线程池而非直接创建一个线程。但是线程数量无法随意扩充。如果需要同时处理成千上万的连接,多线程也无能为力。
三. 非阻塞IO 至此,我们需要让非阻塞IO登台了。在Unix中,可以对socket使用如下操作使得这个文件描述符变为非阻塞状态:
1 2 3 #include <fcntl.h> int fcntl (int fd, F_SETFL, O_NONBLOCK) ;
一旦使得fd变为非阻塞,那么针对这个fd的任何网络系统调用,不论是否完成、都将立即返回,与此同时还会返回一个error code,以供我们判断系统调用返回时的状态。
所以刚刚的例子似乎可以天真地被写为:
/* This will work, but the performance will be very poor: the loop spins
 * even when no socket has data, and makes one system call per socket on
 * every pass. */
int i, n;
char buf[1024];

/* Make every socket non-blocking so recv() returns immediately. */
for (i = 0; i < n_sockets; ++i)
    fcntl(fd[i], F_SETFL, O_NONBLOCK);

while (i_still_want_to_read()) {
    for (i = 0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n == 0) {
            handle_close(fd[i]);
        } else if (n < 0) {
            /* EAGAIN only means there was no data for us to read right
             * now; anything else is a real error. */
            if (errno != EAGAIN)
                handle_error(fd[i], errno);
        } else {
            handle_input(fd[i], buf, n);
        }
    }
}
以上的代码可以被运行,但是性能很差。有两点原因:
当所有连接都没有数据可读时,循环依旧一直在运行,浪费CPU。
如果处理多条连接,无论是否有数据到达,我们需要为每条连接都进行系统调用。
所以我们需要让内核做以下动作:只有当我们关注的socket有读写事件发生时,再把控制权交还用户,同时告诉用户是哪个socket是准备就绪的。
最古老的解决方案是select()
。他拥有读、写、异常三个fd集合。它会等待,直到其中一个集合中的套接字准备就绪,然后将集合更改为仅包含可供使用的套接字。
这是一个例子:
/* Poll a set of sockets with select() and read from whichever are ready. */
fd_set readset;
int i, n;
char buf[1024];

while (i_still_want_to_read()) {
    int maxfd = -1;
    FD_ZERO(&readset);

    /* Add all of the interesting fds to readset, tracking the largest. */
    for (i = 0; i < n_sockets; ++i) {
        if (fd[i] > maxfd)
            maxfd = fd[i];
        FD_SET(fd[i], &readset);
    }

    /* Wait until one or more fds are ready to read. */
    if (select(maxfd + 1, &readset, NULL, NULL, NULL) < 0) {
        if (errno == EINTR)
            continue;   /* interrupted by a signal: rebuild the set and retry */
        break;          /* readset cannot be trusted after a failure */
    }

    /* Process all of the fds that are still set in readset. */
    for (i = 0; i < n_sockets; ++i) {
        if (FD_ISSET(fd[i], &readset)) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n == 0) {
                handle_close(fd[i]);
            } else if (n < 0) {
                /* EAGAIN: no data was available after all. */
                if (errno != EAGAIN)
                    handle_error(fd[i], errno);
            } else {
                handle_input(fd[i], buf, n);
            }
        }
    }
}
以下是重构的ROT13服务器,不再使用多进程处理多连接,使用select()
:
下载代码
#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/select.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

/* Apply the ROT13 substitution to one character; non-letters are returned
 * unchanged. */
char rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

/* Per-connection state, indexed by file descriptor in run(). */
struct fd_state {
    char buffer[MAX_LINE];  /* ROT13-transformed bytes awaiting output */
    size_t buffer_used;     /* number of bytes stored in buffer */

    int writing;            /* nonzero while output is pending */
    size_t n_written;       /* bytes of buffer already sent */
    size_t write_upto;      /* send bytes up to (not including) this offset */
};

/* Allocate a zeroed connection state.  Returns NULL if malloc fails. */
struct fd_state *alloc_fd_state(void)
{
    struct fd_state *state = malloc(sizeof(*state));
    if (!state)
        return NULL;
    state->buffer_used = 0;
    state->n_written = 0;
    state->writing = 0;
    state->write_upto = 0;
    return state;
}

/* Release a state obtained from alloc_fd_state(). */
void free_fd_state(struct fd_state *state)
{
    free(state);
}

/* Put fd into non-blocking mode while keeping its other status flags. */
void make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Drain everything currently readable from fd into state->buffer (ROT13
 * applied), marking the state as writable each time a newline is seen.
 *
 * Returns 0 if the connection should stay open, 1 if the peer closed it,
 * and -1 on a receive error other than EAGAIN.
 */
int do_read(int fd, struct fd_state *state)
{
    char buf[1024];
    int i;
    ssize_t result;

    while (1) {
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i = 0; i < result; ++i) {
            /* Bytes that do not fit in the buffer are dropped. */
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        return 1;
    } else if (result < 0) {
        if (errno == EAGAIN)
            return 0;
        return -1;
    }

    return 0;
}

/*
 * Send the pending part of state->buffer (n_written .. write_upto).
 *
 * Returns 0 when everything pending was sent or the socket would block,
 * and -1 on a send error other than EAGAIN.
 */
int do_write(int fd, struct fd_state *state)
{
    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN)
                return 0;
            return -1;
        }
        assert(result != 0);

        state->n_written += result;
    }

    /* Everything buffered has been sent: start over at the beginning. */
    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 0;

    state->writing = 0;

    return 0;
}

/*
 * Run the ROT13 server on TCP port 40713, multiplexing every connection
 * in a single process with select().  Returns only on a setup or select()
 * failure.
 */
void run(void)
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    for (i = 0; i < FD_SETSIZE; ++i)
        state[i] = NULL;

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return;
    }
    make_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16) < 0) {
        perror("listen");
        return;
    }

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) {
        maxfd = listener;

        /* select() overwrites the sets, so rebuild them on every pass. */
        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i = 0; i < FD_SETSIZE; ++i) {
            if (state[i]) {
                if (i > maxfd)
                    maxfd = i;
                FD_SET(i, &readset);
                if (state[i]->writing) {
                    FD_SET(i, &writeset);
                }
            }
        }

        if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
            perror("select");
            return;
        }

        if (FD_ISSET(listener, &readset)) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr *)&ss, &slen);
            if (fd < 0) {
                perror("accept");
            } else if (fd >= FD_SETSIZE) {
                /* state[] and the fd_sets only have room for descriptors
                 * 0 .. FD_SETSIZE-1; fd == FD_SETSIZE would overflow. */
                close(fd);
            } else {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                if (!state[fd]) {
                    /* Out of memory: refuse this connection, keep serving. */
                    perror("malloc");
                    close(fd);
                }
            }
        }

        for (i = 0; i < maxfd + 1; ++i) {
            int r = 0;
            if (i == listener)
                continue;

            if (FD_ISSET(i, &readset)) {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = do_write(i, state[i]);
            }
            if (r) {
                /* Peer closed or an error occurred: tear the connection down. */
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

int main(int c, char **v)
{
    (void)c;
    (void)v;

    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}
但是这并没有结束,由于生成和读取select()
位数组所花费的时间与为select()
提供的最大fd成正比,因此当套接字数量较多时,select()
调用的开销会急剧增加。
四. libevent 不同的操作系统提供一些select()
的代替方法。例如epoll()
,kqueue()
等。
如果想编写一个可移植的高性能异步应用程序,将需要一个抽象来包装所有这些接口,并提供其中最有效的一个。这就是Libevent
的底层API所做的事情。
下载代码
include <netinet/in.h> #include <sys/socket.h> #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read (evutil_socket_t fd, short events, void *arg) ;void do_write (evutil_socket_t fd, short events, void *arg) ;char rot13_char (char c) { if ((c >= 'a' && c <= 'm' ) || (c >= 'A' && c <= 'M' )) return c + 13 ; else if ((c >= 'n' && c <= 'z' ) || (c >= 'N' && c <= 'Z' )) return c - 13 ; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event ; struct event *write_event ; }; struct fd_state *alloc_fd_state (struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc (sizeof (struct fd_state)); if (!state) return NULL ; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free (state); return NULL ; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free (state); return NULL ; } state->buffer_used = state->n_written = state->write_upto = 0 ; assert(state->write_event); return state; } void free_fd_state (struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free (state); } void do_read (evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024 ]; int i; ssize_t result; while (1 ) { assert(state->write_event); result = recv(fd, buf, sizeof (buf), 0 ); if (result <= 0 ) break ; for (i=0 ; i < result; ++i) { if (state->buffer_used < sizeof (state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n' ) { assert(state->write_event); event_add(state->write_event, NULL ); state->write_upto = state->buffer_used; } } } if (result == 0 ) { free_fd_state(state); } else if (result < 0 ) { if (errno == 
EAGAIN) return ; perror("recv" ); free_fd_state(state); } } void do_write (evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0 ); if (result < 0 ) { if (errno == EAGAIN) return ; free_fd_state(state); return ; } assert(result != 0 ); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1 ; event_del(state->write_event); } void do_accept (evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss ; socklen_t slen = sizeof (ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0 ) { perror("accept" ); } else if (fd > FD_SETSIZE) { close(fd); } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); assert(state->write_event); event_add(state->read_event, NULL ); } } void run (void ) { evutil_socket_t listener; struct sockaddr_in sin ; struct event_base *base ; struct event *listener_event ; base = event_base_new(); if (!base) return ; sin .sin_family = AF_INET; sin .sin_addr.s_addr = 0 ; sin .sin_port = htons(40713 ); listener = socket(AF_INET, SOCK_STREAM, 0 ); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1 ; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)); } #endif if (bind(listener, (struct sockaddr*)&sin , sizeof (sin )) < 0 ) { perror("bind" ); return ; } if (listen(listener, 16 )<0 ) { perror("listen" ); return ; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void *)base); event_add(listener_event, NULL ); event_base_dispatch(base); } int main (int c, char **v) { setvbuf(stdout , NULL , _IONBF, 0 ); run(); return 0 ; }
你可能已经注意到,随着代码变得更加高效,它也变得更加复杂。bufferevents
接口解决了这个问题:它使程序编写起来更加简单。下面是最后一版ROT13服务器,这次使用bufferevents API
实现。
下载代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 #include <netinet/in.h> #include <sys/socket.h> #include <fcntl.h> #include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read (evutil_socket_t fd, short events, void *arg) ;void do_write (evutil_socket_t fd, short events, void *arg) ;char rot13_char (char c) { if ((c >= 'a' && c <= 'm' ) || (c >= 'A' && c <= 'M' )) return c + 13 ; else if ((c >= 'n' && c <= 'z' ) || (c >= 'N' && c <= 'Z' )) return c - 13 ; else return c; } void readcb (struct bufferevent *bev, void *ctx) { struct evbuffer *input , *output ; char *line; size_t n; int i; input = bufferevent_get_input(bev); output = bufferevent_get_output(bev); while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { for (i = 0 ; i < n; ++i) line[i] = rot13_char(line[i]); evbuffer_add(output, line, n); evbuffer_add(output, "\n" , 1 ); free (line); } if (evbuffer_get_length(input) >= MAX_LINE) { char buf[1024 ]; while (evbuffer_get_length(input)) { int n = evbuffer_remove(input, buf, sizeof (buf)); for (i = 0 ; i < n; ++i) buf[i] = rot13_char(buf[i]); evbuffer_add(output, buf, n); } evbuffer_add(output, "\n" , 1 ); } } void errorcb (struct bufferevent *bev, short error, void *ctx) { if (error & BEV_EVENT_EOF) { } else if (error & BEV_EVENT_ERROR) { } else if (error & BEV_EVENT_TIMEOUT) { } bufferevent_free(bev); } void do_accept 
(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss ; socklen_t slen = sizeof (ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0 ) { perror("accept" ); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL , errorcb, NULL ); bufferevent_setwatermark(bev, EV_READ, 0 , MAX_LINE); bufferevent_enable(bev, EV_READ|EV_WRITE); } } void run (void ) { evutil_socket_t listener; struct sockaddr_in sin ; struct event_base *base ; struct event *listener_event ; base = event_base_new(); if (!base) return ; sin .sin_family = AF_INET; sin .sin_addr.s_addr = 0 ; sin .sin_port = htons(40713 ); listener = socket(AF_INET, SOCK_STREAM, 0 ); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1 ; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)); } #endif if (bind(listener, (struct sockaddr*)&sin , sizeof (sin )) < 0 ) { perror("bind" ); return ; } if (listen(listener, 16 )<0 ) { perror("listen" ); return ; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void *)base); event_add(listener_event, NULL ); event_base_dispatch(base); } int main (int c, char **v) { setvbuf(stdout , NULL , _IONBF, 0 ); run(); return 0 ; }