一. 简介

针对异步IO的简介。翻译自libevent官网

二. 阻塞IO

大多数初级程序员都是从使用阻塞IO开始的。阻塞IO意味着,当你进行调用时,他只有在彻底完成调用、或者经过了一段超时时间而被网络协议栈放弃执行后,才会将控制权再次交给用户。例如在建立TCP连接时,执行connect(),操作系统将向对端发送一个SYN包,如果操作系统没有收到SYN ACK,或者没有达到超时时间,操作系统不会将控制权交换给应用程序。

这里是一个简易的客户端程序,他将向www.zrfyun.top建立一个连接,发送一个简单的HTTP请求,并把服务器的响应输出到stdout。

下载代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For gethostbyname */
#include <netdb.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>

int main(int c, char **v)
{
const char query[] =
"GET / HTTP/1.0\r\n"
"Host: www.zrfyun.top\r\n"
"\r\n";
const char hostname[] = "www.zrfyun.top";
struct sockaddr_in sin;
struct hostent *h;
const char *cp;
int fd;
ssize_t n_written, remaining;
char buf[1024];

/* Look up the IP address for the hostname. Watch out; this isn't
threadsafe on most platforms. */
h = gethostbyname(hostname);
if (!h) {
fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
return 1;
}
if (h->h_addrtype != AF_INET) {
fprintf(stderr, "No ipv6 support, sorry.");
return 1;
}

/* Allocate a new socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket");
return 1;
}

/* Connect to the remote host. */
sin.sin_family = AF_INET;
sin.sin_port = htons(80);
sin.sin_addr = *(struct in_addr*)h->h_addr;
if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
perror("connect");
close(fd);
return 1;
}

/* Write the query. */
/* XXX Can send succeed partially? */
cp = query;
remaining = strlen(query);
while (remaining) {
n_written = send(fd, cp, remaining, 0);
if (n_written <= 0) {
perror("send");
return 1;
}
remaining -= n_written;
cp += n_written;
}

/* Get an answer back. */
while (1) {
ssize_t result = recv(fd, buf, sizeof(buf), 0);
if (result == 0) {
break;
} else if (result < 0) {
perror("recv");
close(fd);
return 1;
}
fwrite(buf, 1, result, stdout);
}

close(fd);
return 0;
}

以上的所有的网络相关的函数都是阻塞的:

  • gethostbyname()
    成功或失败地对www.zrfyun.top进行解析后才会进行返回
  • connect()
    只有建立TCP或者超时后才会返回
  • recv()
    只有确实受到消息或者连接被关闭后才会返回
  • send()
    只有把用户空间缓冲区中的所有数据刷新到内核的写缓冲区中后才会返回

在此时,如果我们并不指望同时一间程序执行其他任务,阻塞IO并没有造成什么困扰。但假设我们想写一个同时处理多条连接的程序(假设我们希望同时处理两条连接,并且不知道那条连接首先有输入)。

我们不能这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
char buf[1024];
int i, n;
while (i_still_want_to_read()) {
for (i=0; i<n_sockets; ++i) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n==0)
handle_close(fd[i]);
else if (n<0)
handle_error(fd[i], errno);
else
handle_input(fd[i], buf, n);
}
}

因为如果fd[2]首先收到数据,我们的代码在没有从fd[0]和fd[1]中哦如果数据后,是无法对fd[2]进行读取的。

有人使用多线程或多进程来解决这个问题。一个简单的处理就是一个线程(进程)只处理一条连接。由于每条连接都有自己的进程,一个针对某条连接的阻塞IO并不会对其他连接的阻塞IO造成影响。

下面的测试程序,它是一个简单的服务器,侦听端口40713上的TCP连接,一次从其输入读取一行数据,并在每行到达时写出每行的ROT13替换。它使用Unix fork()调用为每个传入连接创建一个新进程。

下载代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_LINE 16384

char
rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}

void
child(int fd)
{
char outbuf[MAX_LINE+1];
size_t outbuf_used = 0;
ssize_t result;

while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
if (result == 0) {
break;
} else if (result == -1) {
perror("read");
break;
}

/* We do this test to keep the user from overflowing the buffer. */
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}

if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}

void
run(void)
{
int listener;
struct sockaddr_in sin;

sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);

listener = socket(AF_INET, SOCK_STREAM, 0);

#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif

if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}

if (listen(listener, 16)<0) {
perror("listen");
return;
}



while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) {
perror("accept");
} else {
if (fork() == 0) {
child(fd);
exit(0);
}
}
}
}

int
main(int c, char **v)
{
run();
return 0;
}

我们是否有更好的办法来解决同时处理多条连接的问题呢?多线程多进程在某些平台上是很消耗资源的。实际中,我们更愿意使用线程池而非直接创建一个线程。但是线程数量无法所以扩充。如果需要同时处理几千万的连接,多线程也无能为力。

三. 非阻塞IO

至此,我们需要让非阻塞IO登台了。在UNix中,可以对socket使用如下操作使得这个文件描述符变为非阻塞状态:

1
2
3
#include <fcntl.h>

int fcntl(int fd, F_SETFL, O_NONBLOCK);

一旦使得fd变为非阻塞,那么针对这个fd的任何网络系统调用,不论是否完成、都将立即返回,与此同时还会返回一个error code,以供我们判断系统调用返回时的状态。

所以刚刚的例子似乎可以天真地被写为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int i, n;
char buf[1024];
for (i=0; i < n_sockets; ++i)
fcntl(fd[i], F_SETFL, O_NONBLOCK);

while (i_still_want_to_read()) {
for (i=0; i < n_sockets; ++i) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n == 0) {
handle_close(fd[i]);
} else if (n < 0) {
if (errno == EAGAIN)
; /* The kernel didn't have any data for us to read. */
else
handle_error(fd[i], errno);仅包含可供使用的套接字
} else {
handle_input(fd[i], buf, n);
}
}
}

以上的代码可以被运行,但是性能很差。有两点原因:

  • 当所有连接都没有数据可读时,循环依旧一直在运行,浪费CPU。
  • 如果处理多条连接,无论是否有数据到达,我们需要为每条连接都进行系统调用。

所以我们需要让内核做以下动作:只有当我们关注的socket有读写事件发生时,再把控制权交还用户,同时告诉用户是哪个socket是准备就绪的。

最古老的解决方案是select()。他拥有读、写、异常三个fd集合。它会等待,直到其中一个集合中的套接字准备就绪,然后将集合更改为仅包含可供使用的套接字。

这是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fd_set readset;
int i, n;
char buf[1024];

while (i_still_want_to_read()) {
int maxfd = -1;
FD_ZERO(&readset);

/* Add all of the interesting fds to readset */
for (i=0; i < n_sockets; ++i) {
if (fd[i]>maxfd) maxfd = fd[i];
FD_SET(fd[i], &readset);
}

/* Wait until one or more fds are ready to read */
select(maxfd+1, &readset, NULL, NULL, NULL);

/* Process all of the fds that are still set in readset */
for (i=0; i < n_sockets; ++i) {
if (FD_ISSET(fd[i], &readset)) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n == 0) {
handle_close(fd[i]);
} else if (n < 0) {
if (errno == EAGAIN)
; /* The kernel didn't have any data for us to read. */
else
handle_error(fd[i], errno);
} else {
handle_input(fd[i], buf, n);
}
}
}
}

以下是重构的ROT13服务器,不再使用多进程处理多连接,使用select():

下载代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>
/* for select */
#include <sys/select.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

char
rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}

struct fd_state {
char buffer[MAX_LINE];
size_t buffer_used;

int writing;
size_t n_written;
size_t write_upto;
};

struct fd_state *
alloc_fd_state(void)
{
struct fd_state *state = malloc(sizeof(struct fd_state));
if (!state)
return NULL;
state->buffer_used = state->n_written = state->writing =
state->write_upto = 0;
return state;
}

void
free_fd_state(struct fd_state *state)
{
free(state);
}

void
make_nonblocking(int fd)
{
fcntl(fd, F_SETFL, O_NONBLOCK);
}

int
do_read(int fd, struct fd_state *state)
{
char buf[1024];
int i;
ssize_t result;
while (1) {
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;

for (i=0; i < result; ++i) {
if (state->buffer_used < sizeof(state->buffer))
state->buffer[state->buffer_used++] = rot13_char(buf[i]);
if (buf[i] == '\n') {
state->writing = 1;
state->write_upto = state->buffer_used;
}
}
}

if (result == 0) {
return 1;
} else if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}

return 0;
}

int
do_write(int fd, struct fd_state *state)
{
while (state->n_written < state->write_upto) {
ssize_t result = send(fd, state->buffer + state->n_written,
state->write_upto - state->n_written, 0);
if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}
assert(result != 0);

state->n_written += result;
}

if (state->n_written == state->buffer_used)
state->n_written = state->write_upto = state->buffer_used = 0;

state->writing = 0;

return 0;
}

void
run(void)
{
int listener;
struct fd_state *state[FD_SETSIZE];
struct sockaddr_in sin;
int i, maxfd;
fd_set readset, writeset, exset;

sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);

for (i = 0; i < FD_SETSIZE; ++i)
state[i] = NULL;

listener = socket(AF_INET, SOCK_STREAM, 0);
make_nonblocking(listener);

#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif

if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}

if (listen(listener, 16)<0) {
perror("listen");
return;
}

FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);

while (1) {
maxfd = listener;

FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);

FD_SET(listener, &readset);

for (i=0; i < FD_SETSIZE; ++i) {
if (state[i]) {
if (i > maxfd)
maxfd = i;
FD_SET(i, &readset);
if (state[i]->writing) {
FD_SET(i, &writeset);
}
}
}

if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) {
perror("select");
return;
}

if (FD_ISSET(listener, &readset)) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) {
perror("accept");
} else if (fd > FD_SETSIZE) {
close(fd);
} else {
make_nonblocking(fd);
state[fd] = alloc_fd_state();
assert(state[fd]);/*XXX*/
}
}

for (i=0; i < maxfd+1; ++i) {
int r = 0;
if (i == listener)
continue;

if (FD_ISSET(i, &readset)) {
r = do_read(i, state[i]);
}
if (r == 0 && FD_ISSET(i, &writeset)) {
r = do_write(i, state[i]);
}
if (r) {
free_fd_state(state[i]);
state[i] = NULL;
close(i);
}
}
}
}

int
main(int c, char **v)
{
setvbuf(stdout, NULL, _IONBF, 0);

run();
return 0;
}

但是这并没有结束,由于生成和读取select()位数组所花费的时间与为select()提供的最大fd成正比,因此当套接字数量较多时,select()调用会急剧增加。

四. libevent

不同的操作系统提供一些select()的代替方法。例如epoll(),kqueue()等。

如果想编写一个可移植的高性能异步应用程序,将需要一个抽象来包装所有这些接口,并提供其中最有效的一个。这就是Libevent的底层API所做的事情。

下载代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}

struct fd_state {
char buffer[MAX_LINE];
size_t buffer_used;

size_t n_written;
size_t write_upto;

struct event *read_event;
struct event *write_event;
};

struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
struct fd_state *state = malloc(sizeof(struct fd_state));
if (!state)
return NULL;
state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
if (!state->read_event) {
free(state);
return NULL;
}
state->write_event =
event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);

if (!state->write_event) {
event_free(state->read_event);
free(state);
return NULL;
}

state->buffer_used = state->n_written = state->write_upto = 0;

assert(state->write_event);
return state;
}

void
free_fd_state(struct fd_state *state)
{
event_free(state->read_event);
event_free(state->write_event);
free(state);
}

void
do_read(evutil_socket_t fd, short events, void *arg)
{
struct fd_state *state = arg;
char buf[1024];
int i;
ssize_t result;
while (1) {
assert(state->write_event);
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;

for (i=0; i < result; ++i) {
if (state->buffer_used < sizeof(state->buffer))
state->buffer[state->buffer_used++] = rot13_char(buf[i]);
if (buf[i] == '\n') {
assert(state->write_event);
event_add(state->write_event, NULL);
state->write_upto = state->buffer_used;
}
}
}

if (result == 0) {
free_fd_state(state);
} else if (result < 0) {
if (errno == EAGAIN) // XXXX use evutil macro
return;
perror("recv");
free_fd_state(state);
}
}

void
do_write(evutil_socket_t fd, short events, void *arg)
{
struct fd_state *state = arg;

while (state->n_written < state->write_upto) {
ssize_t result = send(fd, state->buffer + state->n_written,
state->write_upto - state->n_written, 0);
if (result < 0) {
if (errno == EAGAIN) // XXX use evutil macro
return;
free_fd_state(state);
return;
}
assert(result != 0);

state->n_written += result;
}

if (state->n_written == state->buffer_used)
state->n_written = state->write_upto = state->buffer_used = 1;

event_del(state->write_event);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = arg;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) { // XXXX eagain??
perror("accept");
} else if (fd > FD_SETSIZE) {
close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
} else {
struct fd_state *state;
evutil_make_socket_nonblocking(fd);
state = alloc_fd_state(base, fd);
assert(state); /*XXX err*/
assert(state->write_event);
event_add(state->read_event, NULL);
}
}

void
run(void)
{
evutil_socket_t listener;
struct sockaddr_in sin;
struct event_base *base;
struct event *listener_event;

base = event_base_new();
if (!base)
return; /*XXXerr*/

sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);

listener = socket(AF_INET, SOCK_STREAM, 0);
evutil_make_socket_nonblocking(listener);

#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif

if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}

if (listen(listener, 16)<0) {
perror("listen");
return;
}

listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
/*XXX check it */
event_add(listener_event, NULL);

event_base_dispatch(base);
}

int
main(int c, char **v)
{
setvbuf(stdout, NULL, _IONBF, 0);

run();
return 0;
}

可能已经注意到,随着代码变得更加高效,它也变得更加复杂。
bufferevents接口解决了这两个问题:它使程序编写起来更加简单,这是我们最后一次使用bufferevents API的ROT13服务器。

下载代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}

void
readcb(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input, *output;
char *line;
size_t n;
int i;
input = bufferevent_get_input(bev);
output = bufferevent_get_output(bev);

while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
for (i = 0; i < n; ++i)
line[i] = rot13_char(line[i]);
evbuffer_add(output, line, n);
evbuffer_add(output, "\n", 1);
free(line);
}

if (evbuffer_get_length(input) >= MAX_LINE) {
/* Too long; just process what there is and go on so that the buffer
* doesn't grow infinitely long. */
char buf[1024];
while (evbuffer_get_length(input)) {
int n = evbuffer_remove(input, buf, sizeof(buf));
for (i = 0; i < n; ++i)
buf[i] = rot13_char(buf[i]);
evbuffer_add(output, buf, n);
}
evbuffer_add(output, "\n", 1);
}
}

void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
if (error & BEV_EVENT_EOF) {
/* connection has been closed, do any clean up here */
/* ... */
} else if (error & BEV_EVENT_ERROR) {
/* check errno to see what error occurred */
/* ... */
} else if (error & BEV_EVENT_TIMEOUT) {
/* must be a timeout event handle, handle it */
/* ... */
}
bufferevent_free(bev);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = arg;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) {
perror("accept");
} else if (fd > FD_SETSIZE) {
close(fd);
} else {
struct bufferevent *bev;
evutil_make_socket_nonblocking(fd);
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
}

void
run(void)
{
evutil_socket_t listener;
struct sockaddr_in sin;
struct event_base *base;
struct event *listener_event;

base = event_base_new();
if (!base)
return; /*XXXerr*/

sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);

listener = socket(AF_INET, SOCK_STREAM, 0);
evutil_make_socket_nonblocking(listener);

#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif

if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}

if (listen(listener, 16)<0) {
perror("listen");
return;
}

listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
/*XXX check it */
event_add(listener_event, NULL);

event_base_dispatch(base);
}

int
main(int c, char **v)
{
setvbuf(stdout, NULL, _IONBF, 0);

run();
return 0;
}