diff options
author | Kent Overstreet <koverstreet@google.com> | 2013-06-28 13:13:17 -0700 |
---|---|---|
committer | Kent Overstreet <koverstreet@google.com> | 2013-06-28 13:13:17 -0700 |
commit | 244386f05c82db9889cadd7493f3b91256b14041 (patch) | |
tree | abd6cee625e39e70e676f71fae34154498c62e57 | |
parent | a58144dfc93061d32e91d8396603109bee0bf269 (diff) |
other aio test code
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | Makefile | 20 | ||||
-rw-r--r-- | aio-multithread-test.c | 141 | ||||
-rw-r--r-- | aio-thread-throughput.c | 431 | ||||
-rw-r--r-- | table.c | 75 | ||||
-rw-r--r-- | util.h | 7 |
6 files changed, 672 insertions, 4 deletions
@@ -1,4 +1,6 @@
 aio-cancel
 aio-cancel-old
 .*
+cscope.*
+tags
 *.o
@@ -1,8 +1,20 @@
-CFLAGS=-g -O2
-LDLIBS=-laio
+CFLAGS := -g -O2 -Wall -Werror -D_FILE_OFFSET_BITS=64 -I.
+
+# The tests use pthreads and sqrt()/pow() as well as libaio.
+LDLIBS := -laio -lpthread -lm
+
+all : aio-cancel aio-multithread-test aio-thread-throughput
+
+# aio-thread-throughput also needs the table helpers from table.c.
+aio-thread-throughput : aio-thread-throughput.o table.o
+
+OBJS := $(patsubst %.c,%.o,$(wildcard *.c))
+DEP_FILES := $(wildcard *.d)
+
+ifneq ($(DEP_FILES),)
+  -include $(DEP_FILES)
+endif
+
+%.o %.d: %.c Makefile
+	gcc $(CFLAGS) -MD -MP -MF $*.d -c $< -o $*.o
 
-all : aio-cancel
 .PHONY : clean
 clean :
-	-rm aio-cancel
+	-rm aio-cancel aio-multithread-test aio-thread-throughput $(OBJS) $(DEP_FILES)
diff --git a/aio-multithread-test.c b/aio-multithread-test.c
new file mode 100644
index 0000000..e0cae24
--- /dev/null
+++ b/aio-multithread-test.c
@@ -0,0 +1,141 @@
+#define _GNU_SOURCE
+#define _LARGEFILE_SOURCE
+#define _FILE_OFFSET_BITS 64
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <linux/fs.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <libaio.h>
+#include <pthread.h>
+
+#define NR_IOCBS	256	/* size of the per-thread iocb arrays */
+#define NR_SUBMIT	16	/* iocbs prepared and submitted per batch */
+#define NR_COMPLETIONS	1
+#define NR_THREADS	4
+
+uint64_t nr_blocks = 1024 * 1024 * 4;
+int fd;
+io_context_t ioctx;
+
+/*
+ * Return the size of the file or block device behind @fd in 4096-byte
+ * blocks.  Exits the process if the size cannot be determined.
+ */
+uint64_t getblocks(int fd)
+{
+	uint64_t ret;
+	struct stat statbuf;
+
+	if (fstat(fd, &statbuf)) {
+		perror("stat error");
+		exit(EXIT_FAILURE);
+	}
+
+	/* st_size is in bytes; convert to 512-byte sectors */
+	ret = statbuf.st_size / 512;
+
+	if (S_ISBLK(statbuf.st_mode)) {
+		/* BLKGETSIZE stores an unsigned long, not a uint64_t */
+		unsigned long sectors;
+
+		if (ioctl(fd, BLKGETSIZE, &sectors)) {
+			perror("ioctl error");
+			exit(EXIT_FAILURE);
+		}
+		ret = sectors;
+	}
+
+	/* eight 512-byte sectors per 4096-byte block */
+	return ret / 8;
+}
+
+/*
+ * Submitter thread: forever builds a batch of NR_SUBMIT 4096-byte reads
+ * at pseudo-random block offsets of the global fd and submits the batch
+ * on the shared ioctx.  Never returns.
+ */
+static void *iothread(void *p)
+{
+	char __attribute__((aligned(4096))) buf[4096];
+	unsigned seed = 0;
+
+	while (1) {
+		struct iocb iocb[NR_IOCBS];
+		struct iocb *iocbp[NR_IOCBS];
+		unsigned i;
+		int ret;
+
+		memset(iocb, 0, sizeof(iocb));
+
+		for (i = 0; i < NR_SUBMIT; i++) {
+			uint64_t offset = rand_r(&seed);
+
+			iocb[i].aio_lio_opcode = IO_CMD_PREAD;
+			iocb[i].aio_fildes = fd;
+
+			iocb[i].u.c.buf = buf;
+			iocb[i].u.c.nbytes = 4096;
+			iocb[i].u.c.offset = (offset % nr_blocks) * 4096;
+			//printf("offset %llu\n", iocb[i].u.c.offset);
+
+			iocbp[i] = &iocb[i];
+		}
+
+		/*
+		 * Only the first NR_SUBMIT entries of iocbp are set up, so
+		 * that is all io_submit() may be told about.
+		 */
+		ret = io_submit(ioctx, NR_SUBMIT, iocbp);
+		if (ret < 0 && ret != -EAGAIN)
+			printf("io_submit() error %i\n", ret);
+		//else
+		//	printf("submitted %i iocbs\n", ret);
+
+		//io_destroy(ioctx);
+		//exit(EXIT_SUCCESS);
+	}
+
+	return NULL;
+}
+
+/*
+ * Open the file/device named by argv[1] with O_DIRECT, create one aio
+ * context, start NR_THREADS submitter threads and then reap completions
+ * forever, reporting any event whose result is negative.
+ */
+int main(int argc, char **argv)
+{
+	pthread_t threads[NR_THREADS];
+	unsigned i;
+	int ret;
+
+	memset(threads, 0, sizeof(threads));
+
+	if (argc != 2) {
+		printf("Specify a file/device to test against\n");
+		exit(EXIT_FAILURE);
+	}
+
+	fd = open(argv[1], O_RDONLY|O_DIRECT);
+	if (fd < 0) {
+		perror("Open error");
+		exit(EXIT_FAILURE);
+	}
+
+	//nr_blocks = getblocks(fd);
+
+	/* try for the largest context first, then fall back */
+	ret = io_setup(INT_MAX, &ioctx);
+	if (ret)
+		ret = io_setup(16384, &ioctx);
+	if (ret) {
+		/* libaio returns -errno and does not set errno */
+		fprintf(stderr, "Error creating io context: %s\n",
+			strerror(-ret));
+		exit(EXIT_FAILURE);
+	}
+
+	for (i = 0; i < NR_THREADS; i++)
+		if (pthread_create(&threads[i], NULL, iothread, NULL)) {
+			printf("pthread_create() error\n");
+			exit(EXIT_FAILURE);
+		}
+
+	while (1) {
+		struct timespec timeout;
+		struct io_event events[NR_COMPLETIONS];
+		int nr;
+		int j;
+
+		timeout.tv_sec = 0;
+		timeout.tv_nsec = 10000;
+
+		nr = io_getevents(ioctx, 1, NR_COMPLETIONS, events, &timeout);
+		if (nr < 0) {
+			printf("io_getevents error\n");
+			continue;
+		}
+
+		//printf("got %i completions\n", nr);
+
+		for (j = 0; j < nr; j++) {
+			int res = events[j].res;
+
+			if (res < 0)
+				printf("io_event err %i\n", res);
+		}
+	}
+
+	exit(EXIT_SUCCESS);
+}
diff --git a/aio-thread-throughput.c b/aio-thread-throughput.c
new file mode 100644
index 0000000..4b0eb04
--- /dev/null
+++ b/aio-thread-throughput.c
@@ -0,0 +1,431 @@
+#define _GNU_SOURCE /* for sched_setaffinity(2) */
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <limits.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <errno.h>
+#include <sched.h>
+#include <libaio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <math.h>
+
+#include "util.h"
+#include "table.h"
+
+/*
+ * This test pins groups of threads to cpus and then spins calling
+ * io_submit() and io_getevents().  All the threads share an io
+ * context.  The test is meant to measure the cost of having threads
+ * share a context.
+ *
+ * At the end of a test a table is printed which indicates the
+ * throughput for each configuration of groups of submit and get
+ * threads.
+ *
+ *             0               1               nr              many
+ *    0   1541033 271331  2725218 41935   891714 1928     882900 1233
+ *    1   2762664 8037    1588041 378061  897900 44954    897947 10117
+ *   nr   1768911 1511    1820524 10584   1004088 2124    784318 308
+ * many   1855700 328715  1879278 6628    1302796 1936    707631 1348
+ *
+ * 0: a single thread ran on the first cpu
+ * 1: a single thread ran on the second cpu
+ * nr: a thread ran on each available cpu
+ * many: 4 threads started per cpu, but no binding at all
+ *
+ * The columns indicate the number of get threads and the rows indicate
+ * the number of submit threads.  The completion rate is sampled each
+ * second and the mean and stddev of the samples are printed for each
+ * configuration.
+ *
+ * For example, when io_submit() ran on cpu 0 and io_getevents() ran
+ * pinned to every cpu there were 891714 completions/s with a stddev of
+ * 1928.
+ */
+
+/* State shared by every thread of one submit/get configuration. */
+struct shared {
+	pthread_mutex_t mutex;		/* protects nr_to_submit */
+	pthread_cond_t cond;		/* signalled when nr_to_submit grows */
+	unsigned long nr_to_submit;	/* iocbs submitters may still issue */
+	io_context_t aio_ctx;		/* set to NULL to tell threads to stop */
+	unsigned long aio_nr;		/* io_setup() size, also batch limit */
+};
+
+/* Per-thread bookkeeping; the array of these ends at the first !live. */
+struct per_thread {
+	pthread_t pthread;
+	struct shared *shared;
+
+	unsigned long id;
+	int cpu;			/* cpu to pin to, or -1 for no binding */
+	unsigned int submit:1,		/* runs submit_loop() if set */
+		     live:1;
+	unsigned long count;		/* completions reaped by this thread */
+	int fd;
+};
+
+/*
+ * Submitter body.  Waits until nr_to_submit is non-zero, claims up to
+ * aio_nr of it and issues that many 1-byte reads of thd->fd, repeating
+ * until shared->aio_ctx is cleared.  Exits the process on allocation
+ * or submission failure.
+ */
+static void submit_loop(struct per_thread *thd, struct shared *shared)
+{
+	struct iocb *iocbs;
+	struct iocb **ios;
+	char junk;
+	long nr = 0;
+	long i;
+	int ret;
+
+	iocbs = calloc(shared->aio_nr, sizeof(struct iocb));
+	ios = calloc(shared->aio_nr, sizeof(struct iocb *));
+	if (!iocbs || !ios) {
+		fprintf(stderr, "error: couldn't allocate %lu iocbs\n",
+			shared->aio_nr);
+		exit(1);
+	}
+
+	while (shared->aio_ctx) {
+		if (nr == 0) {
+			pthread_mutex_lock(&shared->mutex);
+			while (shared->aio_ctx && shared->nr_to_submit == 0) {
+				pthread_cond_wait(&shared->cond,
+						  &shared->mutex);
+			}
+			nr = min(shared->aio_nr, shared->nr_to_submit);
+			shared->nr_to_submit -= nr;
+			pthread_mutex_unlock(&shared->mutex);
+			if (!shared->aio_ctx)
+				break;
+		}
+
+		/* every iocb is the same read into the same junk byte */
+		for (i = 0; i < nr; i++) {
+			io_prep_pread(&iocbs[i], thd->fd, &junk, 1, 0);
+			ios[i] = &iocbs[i];
+		}
+
+		ret = io_submit(shared->aio_ctx, nr, ios);
+		if (!shared->aio_ctx)
+			break;
+		if (ret <= 0) {
+			/* libaio returns -errno and does not set errno */
+			printf("nr %ld ret %d ctx %p\n", nr, ret,
+			       (void *)shared->aio_ctx);
+			fprintf(stderr, "io_submit failed: %s\n",
+				strerror(-ret));
+			exit(1);
+		}
+
+		/* a partial submit leaves the remainder for the next pass */
+		nr -= ret;
+	}
+
+	free(iocbs);
+	free(ios);
+}
+
+/*
+ * Reaper body.  Blocks in io_getevents() for at least one completion,
+ * adds the number reaped to thd->count and hands the same number back
+ * to the submitters through nr_to_submit, repeating until
+ * shared->aio_ctx is cleared.  Exits the process on failure.
+ */
+static void get_loop(struct per_thread *thd, struct shared *shared)
+{
+	struct io_event *events;
+	int ret;
+
+	events = calloc(shared->aio_nr, sizeof(struct io_event));
+	if (!events) {
+		fprintf(stderr, "error: couldn't allocate %lu events\n",
+			shared->aio_nr);
+		exit(1);
+	}
+
+	while (shared->aio_ctx) {
+		ret = io_getevents(shared->aio_ctx, 1, shared->aio_nr, events,
+				   NULL);
+		if (!shared->aio_ctx)
+			break;
+		if (ret <= 0) {
+			/* libaio returns -errno and does not set errno */
+			fprintf(stderr, "io_getevents failed: %s\n",
+				strerror(-ret));
+			exit(1);
+		}
+
+		thd->count += ret;
+
+		pthread_mutex_lock(&shared->mutex);
+		shared->nr_to_submit += ret;
+		pthread_cond_signal(&shared->cond);
+		pthread_mutex_unlock(&shared->mutex);
+	}
+
+	free(events);
+}
+
+/*
+ * Give each thread their own file and inode which won't
+ * generate io to reduce
+ * overhead of the buffered reads.
+ */
+static int open_file(unsigned long id)
+{
+	char path[PATH_MAX];
+
+	snprintf(path, sizeof(path),
+		 "/dev/shm/aio-thread-throughput-file-%lu", id);
+	return open(path, O_CREAT|O_RDONLY, 0700);
+}
+
+/*
+ * Thread entry point.  Opens the thread's private file, pins the
+ * thread to thd->cpu when that is >= 0, then runs either submit_loop()
+ * or get_loop() until told to stop.
+ */
+static void *thread_func(void *arg)
+{
+	struct per_thread *thd = arg;
+	struct shared *shared = thd->shared;
+
+	thd->fd = open_file(thd->id);
+	if (thd->fd < 0) {
+		perror("open /dev/shm/aio-thread-throughput-file");
+		exit(1);
+	}
+
+	/* would need dynamic cpu sets > 1024 cpus */
+	if (thd->cpu >= 0) {
+		cpu_set_t want;
+		cpu_set_t set;
+		int ret;
+
+		CPU_ZERO(&want);
+		CPU_SET(thd->cpu, &want);
+
+		/*
+		 * Use pthread_self(): thd->pthread is filled in by the
+		 * creator and may not have been stored yet when this runs.
+		 * The affinity calls return an error number, not errno.
+		 */
+		ret = pthread_setaffinity_np(pthread_self(), sizeof(want),
+					     &want);
+		if (!ret)
+			ret = pthread_getaffinity_np(pthread_self(),
+						     sizeof(set), &set);
+		if (ret) {
+			fprintf(stderr, "pthread affinity failed: %s\n",
+				strerror(ret));
+			exit(1);
+		}
+
+		if (!CPU_EQUAL(&want, &set)) {
+			printf("couldn't set cpu set\n");
+			exit(1);
+		}
+	}
+
+	if (thd->submit) {
+		submit_loop(thd, shared);
+	} else {
+		get_loop(thd, shared);
+	}
+
+	close(thd->fd);
+
+	return NULL;
+}
+
+/*
+ * Fill @cpus with the ids of the cpus this process may run on, at most
+ * @max of them.  Returns how many were stored, or 0 if the affinity
+ * mask could not be read.
+ */
+static int get_cpus(unsigned int *cpus, unsigned int max)
+{
+	unsigned int i;
+	unsigned int nr;
+	cpu_set_t cpuset;
+
+	if (sched_getaffinity(getpid(), sizeof(cpuset), &cpuset))
+		return 0;
+
+	for (i = 0, nr = 0; i < CPU_SETSIZE && nr < max; i++) {
+		if (CPU_ISSET(i, &cpuset))
+			cpus[nr++] = i;
+	}
+
+	return nr;
+}
+
+struct samples {
+	unsigned long i;
+	unsigned long max;
+	uint64_t count[];
+};
+
+/* Arithmetic mean of @count samples; NAN when there are none. */
+static double u64mean(uint64_t *samples, unsigned long count)
+{
+	unsigned long i;
+	uint64_t sum;
+
+	if (count == 0)
+		return NAN;
+
+	for (i = 0, sum = 0; i < count; i++)
+		sum += samples[i];
+
+	/* divide as double so the fractional part is not thrown away */
+	return (double)sum / count;
+}
+
+/*
+ * Sample standard deviation (n - 1 denominator) of @count samples;
+ * NAN when fewer than two samples are given.
+ */
+static double sample_stddev(uint64_t *samples, unsigned long count)
+{
+	unsigned long i;
+	double mean;
+	double sum;
+
+	if (count < 2)
+		return NAN;
+
+	mean = u64mean(samples, count);
+
+	/* accumulate in double: an integer sum truncates every term */
+	for (i = 0, sum = 0; i < count; i++)
+		sum += pow((double)samples[i] - mean, 2.0);
+
+	return sqrt(sum / (count - 1));
+}
+
+/*
+ * Run every combination of the four submit modes and four get modes
+ * for 'iter' seconds each, sampling the completion count once a
+ * second, and finally print a table of mean and stddev per
+ * combination.
+ */
+int main(int argc, char **argv)
+{
+	static char *names[] = {"0", "1", "nr", "many"};
+	unsigned int cpus[CPU_SETSIZE];
+	struct shared shared;
+	struct per_thread *threads;
+	struct per_thread *thd;
+	io_context_t ctx;
+	int submit_mode;
+	int get_mode;
+	int cpu_ind;
+	int mode;
+	unsigned int i;
+	uint64_t total;
+	int is_submit;
+	int nr_cpus;
+	int ret;
+	int nr;
+	unsigned long iter = 30;
+	uint64_t *samples;
+	struct table *table;
+
+	/* figure out which cpus are possible */
+	nr_cpus = get_cpus(cpus, CPU_SETSIZE);
+	if (nr_cpus == 0) {
+		printf("couldn't find nr cpus\n");
+		exit(1);
+	}
+	if (nr_cpus == 1) {
+		printf("need more than one cpu\n");
+		exit(1);
+	}
+
+	shared.aio_nr = 256;
+	shared.aio_ctx = NULL;
+
+	/*
+	 * Worst case is "many" for both roles: 4 threads per cpu, twice.
+	 * The extra zeroed entry terminates the thd->live walks below.
+	 */
+	threads = calloc((nr_cpus * 4 * 2) + 1, sizeof(struct per_thread));
+	if (!threads) {
+		printf("error: couldn't allocate threads\n");
+		exit(1);
+	}
+
+	samples = calloc(iter, sizeof(samples[0]));
+	if (!samples) {
+		printf("error: couldn't allocate %lu samples\n", iter);
+		exit(1);
+	}
+
+	table = table_alloc(9, 5);
+	if (!table) {
+		printf("error: couldn't allocate table\n");
+		exit(1);
+	}
+
+	/* row labels go down column 0, column labels along row 0 */
+	for (i = 0; i < 4; i++) {
+		table_set_cell(table, 0, 1 + i, "%s", names[i]);
+		table_set_cell(table, (i + 1) * 2, 0, "%s", names[i]);
+	}
+
+	printf("will run on %d cpus: ", nr_cpus);
+	for (i = 0; i < nr_cpus; i++)
+		printf("%u%c", cpus[i], i == nr_cpus - 1 ? '\n' : ',');
+
+	printf("each set of threads will run for %lu seconds\n", iter);
+
+	for (submit_mode = 0; submit_mode < 4; submit_mode++) {
+		for (get_mode = 0; get_mode < 4; get_mode++) {
+
+			/* get a new context for each mode */
+			ret = io_setup(shared.aio_nr, &shared.aio_ctx);
+			if (ret) {
+				printf("io_setup: %s\n", strerror(-ret));
+				exit(1);
+			}
+			shared.nr_to_submit = 0;
+
+			pthread_mutex_init(&shared.mutex, NULL);
+			pthread_cond_init(&shared.cond, NULL);
+
+			thd = threads;
+
+			for (is_submit = 0; is_submit < 2; is_submit++) {
+
+				if (is_submit)
+					mode = submit_mode;
+				else
+					mode = get_mode;
+
+				if (mode == 0 || mode == 1) {
+					cpu_ind = mode;
+					nr = 1;
+				} else if (mode == 2) {
+					cpu_ind = 0;
+					nr = nr_cpus;
+				} else {
+					cpu_ind = -1;
+					nr = nr_cpus * 4;
+				}
+
+				/* fire off threads */
+				for (i = 0; i < nr; i++, thd++) {
+					thd->shared = &shared;
+					thd->submit = is_submit;
+					thd->live = 1;
+					thd->id = thd - threads;
+					if (cpu_ind >= 0) {
+						thd->cpu = cpus[cpu_ind++];
+					} else {
+						thd->cpu = -1;
+					}
+
+					ret = pthread_create(&thd->pthread,
+							     NULL,
+							     thread_func, thd);
+					if (ret) {
+						printf("pthread_create: %s\n",
+						       strerror(ret));
+						exit(1);
+					}
+				}
+			}
+
+			/* tell everyone to get going */
+			pthread_mutex_lock(&shared.mutex);
+			shared.nr_to_submit = shared.aio_nr;
+			pthread_cond_signal(&shared.cond);
+			pthread_mutex_unlock(&shared.mutex);
+
+			printf("get: %s submit: %s\n",
+			       names[get_mode], names[submit_mode]);
+
+			/* threads should stabilize after a quarter second */
+			usleep(250000);
+			for (thd = threads; thd->live; thd++)
+				thd->count = 0;
+
+			/* run for a bit and grab numbers */
+			for (i = 0; i < iter; i++) {
+				total = 0;
+				sleep(1);
+				for (thd = threads; thd->live; thd++) {
+					total += thd->count;
+					thd->count = 0;
+				}
+				samples[i] = total;
+				printf("%u: completions: %"PRIu64" mean: %f stddev: %f\n",
+				       i, total, u64mean(samples, i + 1),
+				       sample_stddev(samples, i + 1));
+			}
+
+			table_set_cell(table, 1 + (get_mode * 2),
+				       submit_mode + 1, "%lu",
+				       (unsigned long)u64mean(samples, iter));
+
+			table_set_cell(table, 1 + (get_mode * 2) + 1,
+				       submit_mode + 1, "%lu",
+				       (unsigned long)sample_stddev(samples, iter));
+
+			/* tell threads to stop */
+			pthread_mutex_lock(&shared.mutex);
+			ctx = shared.aio_ctx;
+			shared.aio_ctx = NULL;
+			ret = io_destroy(ctx);
+			if (ret) {
+				/* libaio returns -errno, errno is not set */
+				fprintf(stderr, "io_destroy: %s\n",
+					strerror(-ret));
+				exit(1);
+			}
+			pthread_cond_broadcast(&shared.cond);
+			pthread_mutex_unlock(&shared.mutex);
+
+			/* and drain 'em */
+			for (thd = threads; thd->live; thd++) {
+				pthread_join(thd->pthread, NULL);
+				thd->live = 0;
+			}
+
+			/* both are initialised again for the next round */
+			pthread_cond_destroy(&shared.cond);
+			pthread_mutex_destroy(&shared.mutex);
+		}
+	}
+
+	table_printf(table);
+
+	free(samples);
+	free(threads);
+
+	return 0;
+}
@@ -0,0 +1,75 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdarg.h>
+
+#include "table.h"
+#include "util.h"
+
+/* One formatted table entry; longer text is truncated to fit. */
+struct cell {
+	char str[100];
+};
+
+struct table {
+	unsigned long wid;
+	unsigned long hei;
+	int *widest;		/* per column: longest string stored so far */
+	struct cell *cells;	/* wid * hei cells, stored row by row */
+};
+
+/*
+ * Allocate a table of @wid columns and @hei rows with every cell
+ * empty.  Returns NULL if any allocation fails.
+ */
+struct table *table_alloc(unsigned long wid, unsigned long hei)
+{
+	struct table *table;
+
+	table = malloc(sizeof(*table));
+	if (!table)
+		return NULL;
+
+	table->widest = calloc(wid, sizeof(table->widest[0]));
+	table->cells = calloc(wid * hei, sizeof(struct cell));
+	if (!table->widest || !table->cells) {
+		free(table->widest);
+		free(table->cells);
+		free(table);
+		return NULL;
+	}
+
+	table->wid = wid;
+	table->hei = hei;
+
+	return table;
+}
+
+/* Address of the cell in column @x of row @y; no bounds checking. */
+static struct cell *get_cell(struct table *table,
+			     unsigned long x, unsigned long y)
+{
+	return &table->cells[(y * table->wid) + x];
+}
+
+/*
+ * printf-style format into the cell at column @x, row @y and widen the
+ * column if the new text is the longest seen in it.
+ */
+void table_set_cell(struct table *table, unsigned long x, unsigned long y,
+		    char *fmt, ...)
+{
+	struct cell *cell;
+	va_list ap;
+	int len;
+
+	cell = get_cell(table, x, y);
+	va_start(ap, fmt);
+	len = vsnprintf(cell->str, sizeof(cell->str), fmt, ap);
+	va_end(ap);
+
+	/* on a formatting error leave the column width alone */
+	if (len < 0)
+		return;
+
+	/* vsnprintf reports the untruncated length; use what was stored */
+	if ((size_t)len >= sizeof(cell->str))
+		len = sizeof(cell->str) - 1;
+
+	if (len > table->widest[x])
+		table->widest[x] = len;
+}
+
+/*
+ * Print the table to stdout, one row per line, each cell right-aligned
+ * to its column's widest entry and followed by a space.
+ */
+void table_printf(struct table *table)
+{
+	unsigned long x;
+	unsigned long y;
+	struct cell *cell;
+
+	for (y = 0; y < table->hei; y++) {
+		for (x = 0; x < table->wid; x++) {
+			cell = get_cell(table, x, y);
+			printf("%*s ", table->widest[x], cell->str);
+		}
+		printf("\n");
+	}
+}
@@ -0,0 +1,7 @@
+#ifndef __UTIL_H__
+#define __UTIL_H__
+
+/* Arguments are parenthesized but still evaluated more than once. */
+#define min(a, b) ((a) < (b) ? (a) : (b))
+#define max(a, b) ((a) > (b) ? (a) : (b))
+
+#endif
|