summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRusty Russell <rusty@rustcorp.com.au>2013-12-07 17:40:47 +1030
committerRusty Russell <rusty@rustcorp.com.au>2013-12-07 17:41:44 +1030
commit12ab811533406e22b78ce8b227474f1056375ae4 (patch)
treeee4cd9b73c9b6c1272ecc349f388f99eade9a1e5
parent075120f34490f1b7c55a4ebdc70bbcff17e55a84 (diff)
io: io_always, and zero-length operations support.
A zero-length read should complete immediately, even if the fd isn't readable. Wire this up, and expose it for callers to use. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
-rw-r--r--ccan/io/backend.h4
-rw-r--r--ccan/io/io.c38
-rw-r--r--ccan/io/io.h23
-rw-r--r--ccan/io/poll.c37
-rw-r--r--ccan/io/test/run-18-errno.c2
-rw-r--r--ccan/io/test/run-19-always-DEBUG.c8
-rw-r--r--ccan/io/test/run-19-always.c133
7 files changed, 240 insertions, 5 deletions
diff --git a/ccan/io/backend.h b/ccan/io/backend.h
index 77d51dda..e2090ff1 100644
--- a/ccan/io/backend.h
+++ b/ccan/io/backend.h
@@ -3,6 +3,10 @@
#define CCAN_IO_BACKEND_H
#include <stdbool.h>
#include <ccan/timer/timer.h>
+#include <poll.h>
+
+/* A setting for actions to always run (eg. zero-length reads). */
+#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT)))
struct io_alloc {
void *(*alloc)(size_t size);
diff --git a/ccan/io/io.c b/ccan/io/io.c
index faf8b87b..734cb393 100644
--- a/ccan/io/io.c
+++ b/ccan/io/io.c
@@ -8,7 +8,6 @@
#include <errno.h>
#include <stdlib.h>
#include <assert.h>
-#include <poll.h>
#include <unistd.h>
#include <fcntl.h>
@@ -232,6 +231,26 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
return true;
}
+/* Always done: call the next thing. */
+static int do_always(int fd, struct io_plan *plan)
+{
+ return 1;
+}
+
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.io = do_always;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLALWAYS;
+
+ return plan;
+}
+
/* Returns true if we're finished. */
static int do_write(int fd, struct io_plan *plan)
{
@@ -252,6 +271,10 @@ struct io_plan io_write_(const void *data, size_t len,
struct io_plan plan;
assert(cb);
+
+ if (len == 0)
+ return io_always_(cb, arg);
+
plan.u1.const_vp = data;
plan.u2.s = len;
plan.io = do_write;
@@ -281,11 +304,16 @@ struct io_plan io_read_(void *data, size_t len,
struct io_plan plan;
assert(cb);
+
+ if (len == 0)
+ return io_always_(cb, arg);
+
plan.u1.cp = data;
plan.u2.s = len;
plan.io = do_read;
plan.next = cb;
plan.next_arg = arg;
+
plan.pollflag = POLLIN;
return plan;
@@ -309,6 +337,10 @@ struct io_plan io_read_partial_(void *data, size_t *len,
struct io_plan plan;
assert(cb);
+
+ if (*len == 0)
+ return io_always_(cb, arg);
+
plan.u1.cp = data;
plan.u2.vp = len;
plan.io = do_read_partial;
@@ -337,6 +369,10 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
struct io_plan plan;
assert(cb);
+
+ if (*len == 0)
+ return io_always_(cb, arg);
+
plan.u1.const_vp = data;
plan.u2.vp = len;
plan.io = do_write_partial;
diff --git a/ccan/io/io.h b/ccan/io/io.h
index 558a8769..bcdb11fd 100644
--- a/ccan/io/io.h
+++ b/ccan/io/io.h
@@ -291,6 +291,29 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
void *arg);
/**
+ * io_always - plan to immediately call next callback.
+ * @cb: function to call.
+ * @arg: @cb argument
+ *
+ * Sometimes it's neater to plan a callback rather than call it directly;
+ * for example, if you only need to read data for one path and not another.
+ *
+ * Example:
+ * static void start_conn_with_nothing(int fd)
+ * {
+ * // Silly example: close on next time around loop.
+ * io_new_conn(fd, io_always(io_close_cb, NULL));
+ * }
+ */
+#define io_always(cb, arg) \
+ io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
+
+/**
* io_connect - plan to connect to a listening socket.
* @fd: file descriptor.
* @addr: where to connect.
diff --git a/ccan/io/poll.c b/ccan/io/poll.c
index 18691e17..d7b9eb56 100644
--- a/ccan/io/poll.c
+++ b/ccan/io/poll.c
@@ -10,6 +10,7 @@
#include <errno.h>
static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static bool some_always = false;
static struct pollfd *pollfds = NULL;
static struct fd **fds = NULL;
static struct timers timeouts;
@@ -146,9 +147,9 @@ void backend_plan_changed(struct io_conn *conn)
if (pfd->events)
num_waiting--;
- pfd->events = conn->plan.pollflag;
+ pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
if (conn->duplex) {
- int mask = conn->duplex->plan.pollflag;
+ int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
/* You can't *both* read/write. */
assert(!mask || pfd->events != mask);
pfd->events |= mask;
@@ -161,15 +162,20 @@ void backend_plan_changed(struct io_conn *conn)
if (!conn->plan.next)
num_closing++;
+
+ if (conn->plan.pollflag == POLLALWAYS)
+ some_always = true;
}
bool add_conn(struct io_conn *c)
{
- if (!add_fd(&c->fd, c->plan.pollflag))
+ if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
return false;
/* Immediate close is allowed. */
if (!c->plan.next)
num_closing++;
+ if (c->plan.pollflag == POLLALWAYS)
+ some_always = true;
return true;
}
@@ -267,6 +273,26 @@ void backend_del_timeout(struct io_conn *conn)
conn->timeout->conn = NULL;
}
+static void handle_always(void)
+{
+ int i;
+
+ some_always = false;
+
+ for (i = 0; i < num_fds && !io_loop_return; i++) {
+ struct io_conn *c = (void *)fds[i];
+
+ if (fds[i]->listener)
+ continue;
+
+ if (c->plan.pollflag == POLLALWAYS)
+ io_ready(c);
+
+ if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
+ io_ready(c->duplex);
+ }
+}
+
/* This is the main loop. */
void *do_io_loop(struct io_conn **ready)
{
@@ -317,6 +343,11 @@ void *do_io_loop(struct io_conn **ready)
if (doing_debug() && some_timeouts)
continue;
+ if (some_always) {
+ handle_always();
+ continue;
+ }
+
if (num_fds == 0)
break;
diff --git a/ccan/io/test/run-18-errno.c b/ccan/io/test/run-18-errno.c
index 985a3229..222c0fb5 100644
--- a/ccan/io/test/run-18-errno.c
+++ b/ccan/io/test/run-18-errno.c
@@ -36,7 +36,7 @@ static void init_conn(int fd, int *state)
(*state)++;
close(fd);
errno = 0;
- io_set_finish(io_new_conn(fd, io_read(state, 0,
+ io_set_finish(io_new_conn(fd, io_read(state, 1,
io_close_cb, NULL)),
finish_EBADF, state);
}
diff --git a/ccan/io/test/run-19-always-DEBUG.c b/ccan/io/test/run-19-always-DEBUG.c
new file mode 100644
index 00000000..4decacd8
--- /dev/null
+++ b/ccan/io/test/run-19-always-DEBUG.c
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64019"
+#define main real_main
+int real_main(void);
+#include "run-19-always.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-19-always.c b/ccan/io/test/run-19-always.c
new file mode 100644
index 00000000..e6413fc9
--- /dev/null
+++ b/ccan/io/test/run-19-always.c
@@ -0,0 +1,133 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65019"
+#endif
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct data *d)
+{
+ return io_write(d->buf, d->bytes, io_close_cb, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ /* Empty read should run immediately... */
+ io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+ int fd, done, r;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+
+ for (done = 0; done < bytes; done += r) {
+ r = read(fd, buf, sizeof(buf));
+ if (r < 0)
+ exit(3);
+ done += r;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket(d->bytes, addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}