From e5cf4d5ae338b4e3ac5ba6af3f2fb03306eaf5d9 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 5 Feb 2024 00:51:23 -0500 Subject: thread_with_file: Lift from bcachefs thread_with_file and thread_with_stdio are abstractions for connecting kthreads to file descriptors, which is handy for all sorts of things - the running kthread has its lifetime connected to the file descriptor, which means an asynchronous job running in the kernel can easily exit in response to a ctrl-c, and the file descriptor also provides a communications channel. Signed-off-by: Kent Overstreet --- MAINTAINERS | 9 + fs/bcachefs/Kconfig | 1 + fs/bcachefs/Makefile | 1 - fs/bcachefs/bcachefs.h | 2 +- fs/bcachefs/chardev.c | 14 +- fs/bcachefs/error.c | 4 +- fs/bcachefs/super.c | 14 +- fs/bcachefs/thread_with_file.c | 450 -------------------------------- fs/bcachefs/thread_with_file.h | 76 ------ fs/bcachefs/thread_with_file_types.h | 23 -- include/linux/thread_with_file.h | 79 ++++++ include/linux/thread_with_file_types.h | 25 ++ lib/Kconfig | 3 + lib/Makefile | 1 + lib/thread_with_file.c | 454 +++++++++++++++++++++++++++++++++ 15 files changed, 584 insertions(+), 572 deletions(-) delete mode 100644 fs/bcachefs/thread_with_file.c delete mode 100644 fs/bcachefs/thread_with_file.h delete mode 100644 fs/bcachefs/thread_with_file_types.h create mode 100644 include/linux/thread_with_file.h create mode 100644 include/linux/thread_with_file_types.h create mode 100644 lib/thread_with_file.c diff --git a/MAINTAINERS b/MAINTAINERS index 815537163ade..7b6d5c9588b3 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -21891,6 +21891,15 @@ F: Documentation/userspace-api/media/drivers/thp7312.rst F: drivers/media/i2c/thp7312.c F: include/uapi/linux/thp7312.h +THREAD WITH FILE +M: Kent Overstreet +M: Darrick J. 
Wong +L: linux-bcachefs@vger.kernel.org +S: Maintained +F: include/linux/thread_with_file.h +F: include/linux/thread_with_file_types.h +F: lib/thread_with_file.c + THUNDERBOLT DMA TRAFFIC TEST DRIVER M: Isaac Hazan L: linux-usb@vger.kernel.org diff --git a/fs/bcachefs/Kconfig b/fs/bcachefs/Kconfig index 8c587ddd2f85..08073d76e5a4 100644 --- a/fs/bcachefs/Kconfig +++ b/fs/bcachefs/Kconfig @@ -25,6 +25,7 @@ config BCACHEFS_FS select SRCU select SYMBOLIC_ERRNAME select TIME_STATS + select THREAD_WITH_FILE help The bcachefs filesystem - a modern, copy on write filesystem, with support for multiple devices, compression, checksumming, etc. diff --git a/fs/bcachefs/Makefile b/fs/bcachefs/Makefile index 7a5ab437c657..2075cef07939 100644 --- a/fs/bcachefs/Makefile +++ b/fs/bcachefs/Makefile @@ -81,7 +81,6 @@ bcachefs-y := \ super-io.o \ sysfs.o \ tests.o \ - thread_with_file.o \ trace.o \ two_state_shared_lock.o \ util.o \ diff --git a/fs/bcachefs/bcachefs.h b/fs/bcachefs/bcachefs.h index 10272ff9cfad..9a1ead1ab38e 100644 --- a/fs/bcachefs/bcachefs.h +++ b/fs/bcachefs/bcachefs.h @@ -200,6 +200,7 @@ #include #include #include +#include #include #include #include @@ -470,7 +471,6 @@ enum bch_time_stats { #include "replicas_types.h" #include "subvolume_types.h" #include "super_types.h" -#include "thread_with_file_types.h" /* Number of nodes btree coalesce will try to coalesce at once */ #define GC_MERGE_NODES 4U diff --git a/fs/bcachefs/chardev.c b/fs/bcachefs/chardev.c index 38defa19d52d..4603fe6d988a 100644 --- a/fs/bcachefs/chardev.c +++ b/fs/bcachefs/chardev.c @@ -11,7 +11,6 @@ #include "replicas.h" #include "super.h" #include "super-io.h" -#include "thread_with_file.h" #include #include @@ -20,6 +19,7 @@ #include #include #include +#include #include /* returns with ref on ca->ref */ @@ -166,9 +166,9 @@ static int bch2_fsck_offline_thread_fn(struct thread_with_stdio *stdio) bch2_fs_stop(c); if (ret & 1) - bch2_stdio_redirect_printf(&stdio->stdio, false, "%s: errors 
fixed\n", c->name); + stdio_redirect_printf(&stdio->stdio, false, "%s: errors fixed\n", c->name); if (ret & 4) - bch2_stdio_redirect_printf(&stdio->stdio, false, "%s: still has errors\n", c->name); + stdio_redirect_printf(&stdio->stdio, false, "%s: still has errors\n", c->name); return ret; } @@ -230,7 +230,7 @@ static long bch2_ioctl_fsck_offline(struct bch_ioctl_fsck_offline __user *user_a opt_set(thr->opts, stdio, (u64)(unsigned long)&thr->thr.stdio); - ret = bch2_run_thread_with_stdio(&thr->thr, &bch2_offline_fsck_ops); + ret = run_thread_with_stdio(&thr->thr, &bch2_offline_fsck_ops); err: if (ret < 0) { if (thr) @@ -433,7 +433,7 @@ static int bch2_data_job_release(struct inode *inode, struct file *file) { struct bch_data_ctx *ctx = container_of(file->private_data, struct bch_data_ctx, thr); - bch2_thread_with_file_exit(&ctx->thr); + thread_with_file_exit(&ctx->thr); kfree(ctx); return 0; } @@ -483,7 +483,7 @@ static long bch2_ioctl_data(struct bch_fs *c, ctx->c = c; ctx->arg = arg; - ret = bch2_run_thread_with_file(&ctx->thr, + ret = run_thread_with_file(&ctx->thr, &bcachefs_data_ops, bch2_data_thread); if (ret < 0) @@ -851,7 +851,7 @@ static long bch2_ioctl_fsck_online(struct bch_fs *c, goto err; } - ret = bch2_run_thread_with_stdio(&thr->thr, &bch2_online_fsck_ops); + ret = run_thread_with_stdio(&thr->thr, &bch2_online_fsck_ops); err: if (ret < 0) { bch_err_fn(c, ret); diff --git a/fs/bcachefs/error.c b/fs/bcachefs/error.c index 043431206799..8ae95b218e8b 100644 --- a/fs/bcachefs/error.c +++ b/fs/bcachefs/error.c @@ -3,7 +3,7 @@ #include "error.h" #include "recovery.h" #include "super.h" -#include "thread_with_file.h" +#include #define FSCK_ERR_RATELIMIT_NR 10 @@ -111,7 +111,7 @@ static enum ask_yn bch2_fsck_ask_yn(struct bch_fs *c) do { bch2_print(c, " (y,n, or Y,N for all errors of this type) "); - int r = bch2_stdio_redirect_readline(stdio, buf, sizeof(buf) - 1); + int r = stdio_redirect_readline(stdio, buf, sizeof(buf) - 1); if (r < 0) return YN_NO; 
buf[r] = '\0'; diff --git a/fs/bcachefs/super.c b/fs/bcachefs/super.c index 9288e000d8d9..970d1abb5d7a 100644 --- a/fs/bcachefs/super.c +++ b/fs/bcachefs/super.c @@ -57,7 +57,6 @@ #include "super.h" #include "super-io.h" #include "sysfs.h" -#include "thread_with_file.h" #include "trace.h" #include @@ -69,6 +68,7 @@ #include #include #include +#include #include MODULE_LICENSE("GPL"); @@ -96,7 +96,7 @@ static void bch2_print_maybe_redirect(struct stdio_redirect *stdio, const char * if (fmt[0] == KERN_SOH[0]) fmt += 2; - bch2_stdio_redirect_vprintf(stdio, true, fmt, args); + stdio_redirect_vprintf(stdio, true, fmt, args); return; } #endif @@ -113,16 +113,6 @@ void bch2_print_opts(struct bch_opts *opts, const char *fmt, ...) va_end(args); } -void __bch2_print(struct bch_fs *c, const char *fmt, ...) -{ - struct stdio_redirect *stdio = bch2_fs_stdio_redirect(c); - - va_list args; - va_start(args, fmt); - bch2_print_maybe_redirect(stdio, fmt, args); - va_end(args); -} - #define KTYPE(type) \ static const struct attribute_group type ## _group = { \ .attrs = type ## _files \ diff --git a/fs/bcachefs/thread_with_file.c b/fs/bcachefs/thread_with_file.c deleted file mode 100644 index 940db15d6a93..000000000000 --- a/fs/bcachefs/thread_with_file.c +++ /dev/null @@ -1,450 +0,0 @@ -// SPDX-License-Identifier: GPL-2.0 -#ifndef NO_BCACHEFS_FS - -#include "bcachefs.h" -#include "thread_with_file.h" - -#include -#include -#include -#include -#include -#include - -void bch2_thread_with_file_exit(struct thread_with_file *thr) -{ - if (thr->task) { - kthread_stop(thr->task); - put_task_struct(thr->task); - } -} - -int bch2_run_thread_with_file(struct thread_with_file *thr, - const struct file_operations *fops, - int (*fn)(void *)) -{ - struct file *file = NULL; - int ret, fd = -1; - unsigned fd_flags = O_CLOEXEC; - - if (fops->read && fops->write) - fd_flags |= O_RDWR; - else if (fops->read) - fd_flags |= O_RDONLY; - else if (fops->write) - fd_flags |= O_WRONLY; - - char 
name[TASK_COMM_LEN]; - get_task_comm(name, current); - - thr->ret = 0; - thr->task = kthread_create(fn, thr, "%s", name); - ret = PTR_ERR_OR_ZERO(thr->task); - if (ret) - return ret; - - ret = get_unused_fd_flags(fd_flags); - if (ret < 0) - goto err; - fd = ret; - - file = anon_inode_getfile(name, fops, thr, fd_flags); - ret = PTR_ERR_OR_ZERO(file); - if (ret) - goto err; - - get_task_struct(thr->task); - wake_up_process(thr->task); - fd_install(fd, file); - return fd; -err: - if (fd >= 0) - put_unused_fd(fd); - if (thr->task) - kthread_stop(thr->task); - return ret; -} - -/* stdio_redirect */ - -static bool stdio_redirect_has_input(struct stdio_redirect *stdio) -{ - return stdio->input.buf.nr || stdio->done; -} - -static bool stdio_redirect_has_output(struct stdio_redirect *stdio) -{ - return stdio->output.buf.nr || stdio->done; -} - -#define STDIO_REDIRECT_BUFSIZE 4096 - -static bool stdio_redirect_has_input_space(struct stdio_redirect *stdio) -{ - return stdio->input.buf.nr < STDIO_REDIRECT_BUFSIZE || stdio->done; -} - -static bool stdio_redirect_has_output_space(struct stdio_redirect *stdio) -{ - return stdio->output.buf.nr < STDIO_REDIRECT_BUFSIZE || stdio->done; -} - -static void stdio_buf_init(struct stdio_buf *buf) -{ - spin_lock_init(&buf->lock); - init_waitqueue_head(&buf->wait); - darray_init(&buf->buf); -} - -/* thread_with_stdio */ - -static void thread_with_stdio_done(struct thread_with_stdio *thr) -{ - thr->thr.done = true; - thr->stdio.done = true; - wake_up(&thr->stdio.input.wait); - wake_up(&thr->stdio.output.wait); -} - -static ssize_t thread_with_stdio_read(struct file *file, char __user *ubuf, - size_t len, loff_t *ppos) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - struct stdio_buf *buf = &thr->stdio.output; - size_t copied = 0, b; - int ret = 0; - - if (!(file->f_flags & O_NONBLOCK)) { - ret = wait_event_interruptible(buf->wait, stdio_redirect_has_output(&thr->stdio)); - if (ret) - 
return ret; - } else if (!stdio_redirect_has_output(&thr->stdio)) - return -EAGAIN; - - while (len && buf->buf.nr) { - if (fault_in_writeable(ubuf, len) == len) { - ret = -EFAULT; - break; - } - - spin_lock_irq(&buf->lock); - b = min_t(size_t, len, buf->buf.nr); - - if (b && !copy_to_user_nofault(ubuf, buf->buf.data, b)) { - ubuf += b; - len -= b; - copied += b; - buf->buf.nr -= b; - memmove(buf->buf.data, - buf->buf.data + b, - buf->buf.nr); - } - spin_unlock_irq(&buf->lock); - } - - return copied ?: ret; -} - -static int thread_with_stdio_release(struct inode *inode, struct file *file) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - - thread_with_stdio_done(thr); - bch2_thread_with_file_exit(&thr->thr); - darray_exit(&thr->stdio.input.buf); - darray_exit(&thr->stdio.output.buf); - thr->ops->exit(thr); - return 0; -} - -static ssize_t thread_with_stdio_write(struct file *file, const char __user *ubuf, - size_t len, loff_t *ppos) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - struct stdio_buf *buf = &thr->stdio.input; - size_t copied = 0; - ssize_t ret = 0; - - while (len) { - if (thr->thr.done) { - ret = -EPIPE; - break; - } - - size_t b = len - fault_in_readable(ubuf, len); - if (!b) { - ret = -EFAULT; - break; - } - - spin_lock(&buf->lock); - if (buf->buf.nr < STDIO_REDIRECT_BUFSIZE) - darray_make_room_gfp(&buf->buf, - min(b, STDIO_REDIRECT_BUFSIZE - buf->buf.nr), GFP_NOWAIT); - b = min(len, darray_room(buf->buf)); - - if (b && !copy_from_user_nofault(&darray_top(buf->buf), ubuf, b)) { - buf->buf.nr += b; - ubuf += b; - len -= b; - copied += b; - } - spin_unlock(&buf->lock); - - if (b) { - wake_up(&buf->wait); - } else { - if ((file->f_flags & O_NONBLOCK)) { - ret = -EAGAIN; - break; - } - - ret = wait_event_interruptible(buf->wait, - stdio_redirect_has_input_space(&thr->stdio)); - if (ret) - break; - } - } - - return copied ?: ret; -} - -static 
__poll_t thread_with_stdio_poll(struct file *file, struct poll_table_struct *wait) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - - poll_wait(file, &thr->stdio.output.wait, wait); - poll_wait(file, &thr->stdio.input.wait, wait); - - __poll_t mask = 0; - - if (stdio_redirect_has_output(&thr->stdio)) - mask |= EPOLLIN; - if (stdio_redirect_has_input_space(&thr->stdio)) - mask |= EPOLLOUT; - if (thr->thr.done) - mask |= EPOLLHUP|EPOLLERR; - return mask; -} - -static __poll_t thread_with_stdout_poll(struct file *file, struct poll_table_struct *wait) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - - poll_wait(file, &thr->stdio.output.wait, wait); - - __poll_t mask = 0; - - if (stdio_redirect_has_output(&thr->stdio)) - mask |= EPOLLIN; - if (thr->thr.done) - mask |= EPOLLHUP|EPOLLERR; - return mask; -} - -static int thread_with_stdio_flush(struct file *file, fl_owner_t id) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - - return thr->thr.ret; -} - -static long thread_with_stdio_ioctl(struct file *file, unsigned int cmd, unsigned long p) -{ - struct thread_with_stdio *thr = - container_of(file->private_data, struct thread_with_stdio, thr); - - if (thr->ops->unlocked_ioctl) - return thr->ops->unlocked_ioctl(thr, cmd, p); - return -ENOTTY; -} - -static const struct file_operations thread_with_stdio_fops = { - .llseek = no_llseek, - .read = thread_with_stdio_read, - .write = thread_with_stdio_write, - .poll = thread_with_stdio_poll, - .flush = thread_with_stdio_flush, - .release = thread_with_stdio_release, - .unlocked_ioctl = thread_with_stdio_ioctl, -}; - -static const struct file_operations thread_with_stdout_fops = { - .llseek = no_llseek, - .read = thread_with_stdio_read, - .poll = thread_with_stdout_poll, - .flush = thread_with_stdio_flush, - .release = thread_with_stdio_release, - .unlocked_ioctl 
= thread_with_stdio_ioctl, -}; - -static int thread_with_stdio_fn(void *arg) -{ - struct thread_with_stdio *thr = arg; - - thr->thr.ret = thr->ops->fn(thr); - - thread_with_stdio_done(thr); - return 0; -} - -int bch2_run_thread_with_stdio(struct thread_with_stdio *thr, - const struct thread_with_stdio_ops *ops) -{ - stdio_buf_init(&thr->stdio.input); - stdio_buf_init(&thr->stdio.output); - thr->ops = ops; - - return bch2_run_thread_with_file(&thr->thr, &thread_with_stdio_fops, thread_with_stdio_fn); -} - -int bch2_run_thread_with_stdout(struct thread_with_stdio *thr, - const struct thread_with_stdio_ops *ops) -{ - stdio_buf_init(&thr->stdio.input); - stdio_buf_init(&thr->stdio.output); - thr->ops = ops; - - return bch2_run_thread_with_file(&thr->thr, &thread_with_stdout_fops, thread_with_stdio_fn); -} -EXPORT_SYMBOL_GPL(bch2_run_thread_with_stdout); - -int bch2_stdio_redirect_read(struct stdio_redirect *stdio, char *ubuf, size_t len) -{ - struct stdio_buf *buf = &stdio->input; - - /* - * we're waiting on user input (or for the file descriptor to be - * closed), don't want a hung task warning: - */ - do { - wait_event_timeout(buf->wait, stdio_redirect_has_input(stdio), - sysctl_hung_task_timeout_secs * HZ / 2); - } while (!stdio_redirect_has_input(stdio)); - - if (stdio->done) - return -1; - - spin_lock(&buf->lock); - int ret = min(len, buf->buf.nr); - buf->buf.nr -= ret; - memcpy(ubuf, buf->buf.data, ret); - memmove(buf->buf.data, - buf->buf.data + ret, - buf->buf.nr); - spin_unlock(&buf->lock); - - wake_up(&buf->wait); - return ret; -} - -int bch2_stdio_redirect_readline(struct stdio_redirect *stdio, char *ubuf, size_t len) -{ - struct stdio_buf *buf = &stdio->input; - size_t copied = 0; - ssize_t ret = 0; -again: - do { - wait_event_timeout(buf->wait, stdio_redirect_has_input(stdio), - sysctl_hung_task_timeout_secs * HZ / 2); - } while (!stdio_redirect_has_input(stdio)); - - if (stdio->done) { - ret = -1; - goto out; - } - - spin_lock(&buf->lock); - size_t b = 
min(len, buf->buf.nr); - char *n = memchr(buf->buf.data, '\n', b); - if (n) - b = min_t(size_t, b, n + 1 - buf->buf.data); - buf->buf.nr -= b; - memcpy(ubuf, buf->buf.data, b); - memmove(buf->buf.data, - buf->buf.data + b, - buf->buf.nr); - ubuf += b; - len -= b; - copied += b; - spin_unlock(&buf->lock); - - wake_up(&buf->wait); - - if (!n && len) - goto again; -out: - return copied ?: ret; -} - -__printf(3, 0) -static ssize_t bch2_darray_vprintf(darray_char *out, gfp_t gfp, const char *fmt, va_list args) -{ - ssize_t ret; - - do { - va_list args2; - size_t len; - - va_copy(args2, args); - len = vsnprintf(out->data + out->nr, darray_room(*out), fmt, args2); - va_end(args2); - - if (len + 1 <= darray_room(*out)) { - out->nr += len; - return len; - } - - ret = darray_make_room_gfp(out, len + 1, gfp); - } while (ret == 0); - - return ret; -} - -ssize_t bch2_stdio_redirect_vprintf(struct stdio_redirect *stdio, bool nonblocking, - const char *fmt, va_list args) -{ - struct stdio_buf *buf = &stdio->output; - unsigned long flags; - ssize_t ret; - -again: - spin_lock_irqsave(&buf->lock, flags); - ret = bch2_darray_vprintf(&buf->buf, GFP_NOWAIT, fmt, args); - spin_unlock_irqrestore(&buf->lock, flags); - - if (ret < 0) { - if (nonblocking) - return -EAGAIN; - - ret = wait_event_interruptible(buf->wait, - stdio_redirect_has_output_space(stdio)); - if (ret) - return ret; - goto again; - } - - wake_up(&buf->wait); - return ret; -} - -ssize_t bch2_stdio_redirect_printf(struct stdio_redirect *stdio, bool nonblocking, - const char *fmt, ...) 
-{ - va_list args; - ssize_t ret; - - va_start(args, fmt); - ret = bch2_stdio_redirect_vprintf(stdio, nonblocking, fmt, args); - va_end(args); - - return ret; -} - -#endif /* NO_BCACHEFS_FS */ diff --git a/fs/bcachefs/thread_with_file.h b/fs/bcachefs/thread_with_file.h deleted file mode 100644 index af54ea8f5b0f..000000000000 --- a/fs/bcachefs/thread_with_file.h +++ /dev/null @@ -1,76 +0,0 @@ -/* SPDX-License-Identifier: GPL-2.0 */ -#ifndef _BCACHEFS_THREAD_WITH_FILE_H -#define _BCACHEFS_THREAD_WITH_FILE_H - -#include "thread_with_file_types.h" - -/* - * Thread with file: Run a kthread and connect it to a file descriptor, so that - * it can be interacted with via fd read/write methods and closing the file - * descriptor stops the kthread. - * - * We have two different APIs: - * - * thread_with_file, the low level version. - * You get to define the full file_operations, including your release function, - * which means that you must call bch2_thread_with_file_exit() from your - * .release method - * - * thread_with_stdio, the higher level version - * This implements full piping of input and output, including .poll. - * - * Notes on behaviour: - * - kthread shutdown behaves like writing or reading from a pipe that has been - * closed - * - Input and output buffers are 4096 bytes, although buffers may in some - * situations slightly exceed that limit so as to avoid chopping off a - * message in the middle in nonblocking mode. - * - Input/output buffers are lazily allocated, with GFP_NOWAIT allocations - - * should be fine but might change in future revisions. - * - Output buffer may grow past 4096 bytes to deal with messages that are - * bigger than 4096 bytes - * - Writing may be done blocking or nonblocking; in nonblocking mode, we only - * drop entire messages. 
- * - * To write, use stdio_redirect_printf() - * To read, use stdio_redirect_read() or stdio_redirect_readline() - */ - -struct task_struct; - -struct thread_with_file { - struct task_struct *task; - int ret; - bool done; -}; - -void bch2_thread_with_file_exit(struct thread_with_file *); -int bch2_run_thread_with_file(struct thread_with_file *, - const struct file_operations *, - int (*fn)(void *)); - -struct thread_with_stdio; - -struct thread_with_stdio_ops { - void (*exit)(struct thread_with_stdio *); - int (*fn)(struct thread_with_stdio *); - long (*unlocked_ioctl)(struct thread_with_stdio *, unsigned int, unsigned long); -}; - -struct thread_with_stdio { - struct thread_with_file thr; - struct stdio_redirect stdio; - const struct thread_with_stdio_ops *ops; -}; - -int bch2_run_thread_with_stdio(struct thread_with_stdio *, - const struct thread_with_stdio_ops *); -int bch2_run_thread_with_stdout(struct thread_with_stdio *, - const struct thread_with_stdio_ops *); -int bch2_stdio_redirect_read(struct stdio_redirect *, char *, size_t); -int bch2_stdio_redirect_readline(struct stdio_redirect *, char *, size_t); - -__printf(3, 0) ssize_t bch2_stdio_redirect_vprintf(struct stdio_redirect *, bool, const char *, va_list); -__printf(3, 4) ssize_t bch2_stdio_redirect_printf(struct stdio_redirect *, bool, const char *, ...); - -#endif /* _BCACHEFS_THREAD_WITH_FILE_H */ diff --git a/fs/bcachefs/thread_with_file_types.h b/fs/bcachefs/thread_with_file_types.h deleted file mode 100644 index 41990756aa26..000000000000 --- a/fs/bcachefs/thread_with_file_types.h +++ /dev/null @@ -1,23 +0,0 @@ -/* SPDX-License-Identifier: GPL-2.0 */ -#ifndef _BCACHEFS_THREAD_WITH_FILE_TYPES_H -#define _BCACHEFS_THREAD_WITH_FILE_TYPES_H - -#include - -struct stdio_buf { - spinlock_t lock; - wait_queue_head_t wait; - darray_char buf; -}; - -struct stdio_redirect { - struct stdio_buf input; - struct stdio_buf output; - - spinlock_t input_lock; - wait_queue_head_t input_wait; - darray_char 
input_buf;
-	bool			done;
-};
-
-#endif /* _BCACHEFS_THREAD_WITH_FILE_TYPES_H */
diff --git a/include/linux/thread_with_file.h b/include/linux/thread_with_file.h
new file mode 100644
index 000000000000..cf44337af3e9
--- /dev/null
+++ b/include/linux/thread_with_file.h
@@ -0,0 +1,96 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * (C) 2022-2024 Kent Overstreet
+ */
+#ifndef _LINUX_THREAD_WITH_FILE_H
+#define _LINUX_THREAD_WITH_FILE_H
+
+#include <linux/thread_with_file_types.h>
+
+/*
+ * Thread with file: Run a kthread and connect it to a file descriptor, so that
+ * it can be interacted with via fd read/write methods and closing the file
+ * descriptor stops the kthread.
+ *
+ * We have two different APIs:
+ *
+ * thread_with_file, the low level version.
+ * You get to define the full file_operations, including your release function,
+ * which means that you must call thread_with_file_exit() from your
+ * .release method
+ *
+ * thread_with_stdio, the higher level version
+ * This implements full piping of input and output, including .poll.
+ *
+ * Notes on behaviour:
+ *  - kthread shutdown behaves like writing or reading from a pipe that has been
+ *    closed
+ *  - Input and output buffers are 4096 bytes, although buffers may in some
+ *    situations slightly exceed that limit so as to avoid chopping off a
+ *    message in the middle in nonblocking mode.
+ *  - Input/output buffers are lazily allocated, with GFP_NOWAIT allocations -
+ *    should be fine but might change in future revisions.
+ *  - Output buffer may grow past 4096 bytes to deal with messages that are
+ *    bigger than 4096 bytes
+ *  - Writing may be done blocking or nonblocking; in nonblocking mode, we only
+ *    drop entire messages.
+ *
+ * To write, use stdio_redirect_printf()
+ * To read, use stdio_redirect_read() or stdio_redirect_readline()
+ */
+
+struct file_operations;
+struct task_struct;
+
+/**
+ * struct thread_with_file - a kthread whose lifetime is tied to a file
+ * @task:	the kthread; run_thread_with_file() takes a task reference that
+ *		thread_with_file_exit() drops
+ * @ret:	result slot; thread_with_stdio stores the thread function's
+ *		return value here and reports it from ->flush
+ * @done:	set by thread_with_stdio once the thread function has returned
+ *		or the file has been released
+ */
+struct thread_with_file {
+	struct task_struct	*task;
+	int			ret;
+	bool			done;
+};
+
+void thread_with_file_exit(struct thread_with_file *thr);
+int run_thread_with_file(struct thread_with_file *thr,
+			 const struct file_operations *fops,
+			 int (*fn)(void *));
+
+struct thread_with_stdio;
+
+/**
+ * struct thread_with_stdio_ops - callbacks supplied by a thread_with_stdio user
+ * @exit:	called last from the file's ->release; must be set
+ * @fn:		body of the kthread; its return value is reported from ->flush
+ * @unlocked_ioctl: optional ioctl handler; ioctls fail with -ENOTTY without it
+ */
+struct thread_with_stdio_ops {
+	void (*exit)(struct thread_with_stdio *);
+	int (*fn)(struct thread_with_stdio *);
+	long (*unlocked_ioctl)(struct thread_with_stdio *, unsigned int, unsigned long);
+};
+
+struct thread_with_stdio {
+	struct thread_with_file	thr;
+	struct stdio_redirect	stdio;
+	const struct thread_with_stdio_ops	*ops;
+};
+
+int run_thread_with_stdio(struct thread_with_stdio *thr,
+			  const struct thread_with_stdio_ops *ops);
+int run_thread_with_stdout(struct thread_with_stdio *thr,
+			   const struct thread_with_stdio_ops *ops);
+int stdio_redirect_read(struct stdio_redirect *stdio, char *ubuf, size_t len);
+int stdio_redirect_readline(struct stdio_redirect *stdio, char *ubuf, size_t len);
+
+__printf(3, 0) ssize_t stdio_redirect_vprintf(struct stdio_redirect *stdio, bool nonblocking,
+					      const char *fmt, va_list args);
+__printf(3, 4) ssize_t stdio_redirect_printf(struct stdio_redirect *stdio, bool nonblocking,
+					     const char *fmt, ...);
+
+#endif /* _LINUX_THREAD_WITH_FILE_H */
diff --git a/include/linux/thread_with_file_types.h b/include/linux/thread_with_file_types.h
new file mode 100644
index 000000000000..98d0ad125322
--- /dev/null
+++ b/include/linux/thread_with_file_types.h
@@ -0,0 +1,36 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_THREAD_WITH_FILE_TYPES_H
+#define _LINUX_THREAD_WITH_FILE_TYPES_H
+
+#include <linux/darray_types.h>
+#include <linux/spinlock_types.h>
+#include <linux/wait.h>
+
+/* One direction of a redirect: a byte buffer, its lock and its waiters. */
+struct stdio_buf {
+	spinlock_t		lock;
+	wait_queue_head_t	wait;
+	darray_char		buf;
+};
+
+/*
+ * Bidirectional byte pipe between a kthread and a file descriptor.
+ * @input holds bytes written by userspace for the kthread, @output holds
+ * text queued by the kthread for userspace, @done is set on shutdown.
+ */
+struct stdio_redirect {
+	struct stdio_buf	input;
+	struct stdio_buf	output;
+
+	/*
+	 * NOTE(review): nothing in lib/thread_with_file.c initialises or reads
+	 * the three input_* fields below; they look like leftovers from before
+	 * struct stdio_buf existed. Confirm there are no other users and drop.
+	 */
+	spinlock_t		input_lock;
+	wait_queue_head_t	input_wait;
+	darray_char		input_buf;
+	bool			done;
+};
+
+#endif /* _LINUX_THREAD_WITH_FILE_TYPES_H */
diff --git a/lib/Kconfig b/lib/Kconfig
index 3ba8b965f8c7..9258d04e939d 100644
--- a/lib/Kconfig
+++ b/lib/Kconfig
@@ -789,3 +789,6 @@ config FIRMWARE_TABLE
 config TIME_STATS
 	tristate
 	select MEAN_AND_VARIANCE
+
+config THREAD_WITH_FILE
+	tristate
diff --git a/lib/Makefile b/lib/Makefile
index 830907bb8fc8..e77304f69df0 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -371,6 +371,7 @@ obj-$(CONFIG_SBITMAP) += sbitmap.o
 obj-$(CONFIG_PARMAN) += parman.o
 
 obj-$(CONFIG_TIME_STATS) += time_stats.o
+obj-$(CONFIG_THREAD_WITH_FILE) += thread_with_file.o
 
 obj-y += group_cpus.o
 
diff --git a/lib/thread_with_file.c b/lib/thread_with_file.c
new file mode 100644
index 000000000000..4f60ce7287cc
--- /dev/null
+++ b/lib/thread_with_file.c
@@ -0,0 +1,591 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * (C) 2022-2024 Kent Overstreet
+ */
+#include <linux/anon_inodes.h>
+#include <linux/darray.h>
+#include <linux/file.h>
+#include <linux/kthread.h>
+#include <linux/module.h>
+#include <linux/pagemap.h>
+#include <linux/poll.h>
+#include <linux/sched/sysctl.h>
+#include <linux/thread_with_file.h>
+
+/* stdio_redirect */
+
+#define STDIO_REDIRECT_BUFSIZE		4096
+
+/* Input is ready for the kthread: bytes are queued, or we are shut down. */
+static bool stdio_redirect_has_input(struct stdio_redirect *stdio)
+{
+	return stdio->input.buf.nr || stdio->done;
+}
+
+/* Output is ready for userspace: bytes are queued, or we are shut down. */
+static bool stdio_redirect_has_output(struct stdio_redirect *stdio)
+{
+	return stdio->output.buf.nr || stdio->done;
+}
+
+/* The input buffer is below STDIO_REDIRECT_BUFSIZE, or we are shut down. */
+static bool stdio_redirect_has_input_space(struct stdio_redirect *stdio)
+{
+	return stdio->input.buf.nr < STDIO_REDIRECT_BUFSIZE || stdio->done;
+}
+
+/* The output buffer is below STDIO_REDIRECT_BUFSIZE, or we are shut down. */
+static bool stdio_redirect_has_output_space(struct stdio_redirect *stdio)
+{
+	return stdio->output.buf.nr < STDIO_REDIRECT_BUFSIZE || stdio->done;
+}
+
+/* Set up one direction of a redirect: lock, waitqueue and empty buffer. */
+static void stdio_buf_init(struct stdio_buf *buf)
+{
+	spin_lock_init(&buf->lock);
+	init_waitqueue_head(&buf->wait);
+	darray_init(&buf->buf);
+}
+
+/*
+ * Timeout for one sleep while waiting for input.
+ *
+ * We're waiting on user input (or for the file descriptor to be closed) and
+ * don't want a hung task warning, so wake up twice per hung task period.
+ *
+ * sysctl_hung_task_timeout_secs is 0 when hung task checking is disabled. A
+ * zero timeout makes wait_event_timeout() return without sleeping, which
+ * would turn the retry loop in stdio_redirect_wait_for_input() into a busy
+ * loop, so sleep without a timeout in that case.
+ */
+static long stdio_redirect_input_timeout(void)
+{
+	unsigned long secs = sysctl_hung_task_timeout_secs;
+
+	return secs ? secs * HZ / 2 : MAX_SCHEDULE_TIMEOUT;
+}
+
+/* Sleep until input is queued or the redirect has been shut down. */
+static void stdio_redirect_wait_for_input(struct stdio_redirect *stdio)
+{
+	struct stdio_buf *buf = &stdio->input;
+
+	do {
+		wait_event_timeout(buf->wait, stdio_redirect_has_input(stdio),
+				   stdio_redirect_input_timeout());
+	} while (!stdio_redirect_has_input(stdio));
+}
+
+/**
+ * stdio_redirect_read - read queued input into a kernel buffer
+ * @stdio:	redirect to read from
+ * @ubuf:	destination; filled with memcpy(), so it must be kernel memory
+ * @len:	capacity of @ubuf
+ *
+ * Sleeps until input is available or the redirect is shut down.
+ *
+ * Return: number of bytes copied, or -1 once the redirect is done.
+ */
+int stdio_redirect_read(struct stdio_redirect *stdio, char *ubuf, size_t len)
+{
+	struct stdio_buf *buf = &stdio->input;
+
+	stdio_redirect_wait_for_input(stdio);
+
+	if (stdio->done)
+		return -1;
+
+	spin_lock(&buf->lock);
+	int ret = min(len, buf->buf.nr);
+	memcpy(ubuf, buf->buf.data, ret);
+	darray_remove_items(&buf->buf, buf->buf.data, ret);
+	spin_unlock(&buf->lock);
+
+	wake_up(&buf->wait);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(stdio_redirect_read);
+
+/**
+ * stdio_redirect_readline - read queued input up to and including a newline
+ * @stdio:	redirect to read from
+ * @ubuf:	destination; filled with memcpy(), so it must be kernel memory
+ * @len:	capacity of @ubuf
+ *
+ * Keeps waiting for more input until a newline has been copied, @ubuf is
+ * full, or the redirect is shut down. No terminating NUL is written.
+ *
+ * Return: number of bytes copied, or -1 if the redirect was shut down before
+ * anything was copied.
+ */
+int stdio_redirect_readline(struct stdio_redirect *stdio, char *ubuf, size_t len)
+{
+	struct stdio_buf *buf = &stdio->input;
+	size_t copied = 0;
+	ssize_t ret = 0;
+again:
+	stdio_redirect_wait_for_input(stdio);
+
+	if (stdio->done) {
+		ret = -1;
+		goto out;
+	}
+
+	spin_lock(&buf->lock);
+	size_t b = min(len, buf->buf.nr);
+	char *n = memchr(buf->buf.data, '\n', b);
+	if (n)
+		b = min_t(size_t, b, n + 1 - buf->buf.data);
+	memcpy(ubuf, buf->buf.data, b);
+	darray_remove_items(&buf->buf, buf->buf.data, b);
+	ubuf += b;
+	len -= b;
+	copied += b;
+	spin_unlock(&buf->lock);
+
+	wake_up(&buf->wait);
+
+	if (!n && len)
+		goto again;
+out:
+	return copied ?: ret;
+}
+EXPORT_SYMBOL_GPL(stdio_redirect_readline);
+
+/*
+ * Append formatted text to @out, growing it with @gfp allocations as needed.
+ * Returns the number of characters appended, or the error returned by
+ * darray_make_room_gfp().
+ */
+__printf(3, 0)
+static ssize_t darray_vprintf(darray_char *out, gfp_t gfp, const char *fmt, va_list args)
+{
+	ssize_t ret;
+
+	do {
+		va_list args2;
+		size_t len;
+
+		/* each pass formats from scratch, so work on a copy of @args */
+		va_copy(args2, args);
+		len = vsnprintf(out->data + out->nr, darray_room(*out), fmt, args2);
+		va_end(args2);
+
+		if (len + 1 <= darray_room(*out)) {
+			out->nr += len;
+			return len;
+		}
+
+		ret = darray_make_room_gfp(out, len + 1, gfp);
+	} while (ret == 0);
+
+	return ret;
+}
+
+/**
+ * stdio_redirect_vprintf - queue formatted output for userspace
+ * @stdio:	redirect to write to
+ * @nonblocking: return -EAGAIN instead of sleeping if the output buffer
+ *		cannot be grown
+ * @fmt:	printf-style format string
+ * @args:	arguments for @fmt
+ *
+ * Return: number of characters queued, -EAGAIN, or the error from an
+ * interrupted wait.
+ */
+ssize_t stdio_redirect_vprintf(struct stdio_redirect *stdio, bool nonblocking,
+			       const char *fmt, va_list args)
+{
+	struct stdio_buf *buf = &stdio->output;
+	unsigned long flags;
+	ssize_t ret;
+
+again:
+	spin_lock_irqsave(&buf->lock, flags);
+	ret = darray_vprintf(&buf->buf, GFP_NOWAIT, fmt, args);
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	if (ret < 0) {
+		if (nonblocking)
+			return -EAGAIN;
+
+		ret = wait_event_interruptible(buf->wait,
+				stdio_redirect_has_output_space(stdio));
+		if (ret)
+			return ret;
+		goto again;
+	}
+
+	wake_up(&buf->wait);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(stdio_redirect_vprintf);
+
+/**
+ * stdio_redirect_printf - printf-style wrapper around stdio_redirect_vprintf()
+ * @stdio:	redirect to write to
+ * @nonblocking: passed through to stdio_redirect_vprintf()
+ * @fmt:	printf-style format string
+ *
+ * Return: whatever stdio_redirect_vprintf() returns.
+ */
+ssize_t stdio_redirect_printf(struct stdio_redirect *stdio, bool nonblocking,
+			      const char *fmt, ...)
+{
+	va_list args;
+	ssize_t ret;
+
+	va_start(args, fmt);
+	ret = stdio_redirect_vprintf(stdio, nonblocking, fmt, args);
+	va_end(args);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(stdio_redirect_printf);
+
+/* thread with file: */
+
+/**
+ * thread_with_file_exit - stop the kthread behind a thread_with_file
+ * @thr:	thread to stop
+ *
+ * Stops the kthread and drops the task reference taken by
+ * run_thread_with_file(). Does nothing if no task is attached.
+ */
+void thread_with_file_exit(struct thread_with_file *thr)
+{
+	if (thr->task) {
+		kthread_stop(thr->task);
+		put_task_struct(thr->task);
+	}
+}
+EXPORT_SYMBOL_GPL(thread_with_file_exit);
+
+/**
+ * run_thread_with_file - start a kthread tied to a new anonymous file
+ * @thr:	thread state; passed to @fn and stored as the file's private data
+ * @fops:	file operations for the new file
+ * @fn:		kthread entry point
+ *
+ * The file is opened O_RDWR, O_RDONLY or O_WRONLY depending on which of
+ * ->read and ->write @fops provides, always with O_CLOEXEC. The kthread is
+ * named after the calling task.
+ *
+ * Return: the new file descriptor, or a negative error code. On failure
+ * @thr->task is reset to NULL, so thread_with_file_exit() stays safe to call.
+ */
+int run_thread_with_file(struct thread_with_file *thr,
+			 const struct file_operations *fops,
+			 int (*fn)(void *))
+{
+	struct file *file = NULL;
+	int ret, fd = -1;
+	unsigned fd_flags = O_CLOEXEC;
+
+	if (fops->read && fops->write)
+		fd_flags |= O_RDWR;
+	else if (fops->read)
+		fd_flags |= O_RDONLY;
+	else if (fops->write)
+		fd_flags |= O_WRONLY;
+
+	char name[TASK_COMM_LEN];
+	get_task_comm(name, current);
+
+	thr->ret = 0;
+	thr->task = kthread_create(fn, thr, "%s", name);
+	ret = PTR_ERR_OR_ZERO(thr->task);
+	if (ret) {
+		/* don't leave an ERR_PTR where thread_with_file_exit() looks */
+		thr->task = NULL;
+		return ret;
+	}
+
+	ret = get_unused_fd_flags(fd_flags);
+	if (ret < 0)
+		goto err;
+	fd = ret;
+
+	file = anon_inode_getfile(name, fops, thr, fd_flags);
+	ret = PTR_ERR_OR_ZERO(file);
+	if (ret)
+		goto err;
+
+	get_task_struct(thr->task);
+	wake_up_process(thr->task);
+	fd_install(fd, file);
+	return fd;
+err:
+	if (fd >= 0)
+		put_unused_fd(fd);
+	if (thr->task) {
+		/* never woken up; stop it and drop the now stale pointer */
+		kthread_stop(thr->task);
+		thr->task = NULL;
+	}
+	return ret;
+}
+EXPORT_SYMBOL_GPL(run_thread_with_file);
+
+/* thread_with_stdio */
+
+/* Mark the thread and its redirect finished and wake all waiters. */
+static void thread_with_stdio_done(struct thread_with_stdio *thr)
+{
+	thr->thr.done = true;
+	thr->stdio.done = true;
+	wake_up(&thr->stdio.input.wait);
+	wake_up(&thr->stdio.output.wait);
+}
+
+/* .read: copy queued kthread output to userspace. */
+static ssize_t thread_with_stdio_read(struct file *file, char __user *ubuf,
+				      size_t len, loff_t *ppos)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+	struct stdio_buf *buf = &thr->stdio.output;
+	size_t copied = 0, b;
+	int ret = 0;
+
+	if (!(file->f_flags & O_NONBLOCK)) {
+		ret = wait_event_interruptible(buf->wait, stdio_redirect_has_output(&thr->stdio));
+		if (ret)
+			return ret;
+	} else if (!stdio_redirect_has_output(&thr->stdio))
+		return -EAGAIN;
+
+	while (len && buf->buf.nr) {
+		/*
+		 * The copy below runs under a spinlock and must not fault, so
+		 * fault the pages in first. Never copy past the prefix that
+		 * could be faulted in: a copy spanning an unwriteable page
+		 * would fail on every retry and this loop would never end.
+		 */
+		size_t faulted = len - fault_in_writeable(ubuf, len);
+
+		if (!faulted) {
+			ret = -EFAULT;
+			break;
+		}
+
+		spin_lock_irq(&buf->lock);
+		b = min_t(size_t, faulted, buf->buf.nr);
+
+		if (b && !copy_to_user_nofault(ubuf, buf->buf.data, b)) {
+			ubuf	+= b;
+			len	-= b;
+			copied	+= b;
+			darray_remove_items(&buf->buf, buf->buf.data, b);
+		}
+		spin_unlock_irq(&buf->lock);
+	}
+
+	return copied ?: ret;
+}
+
+/* .write: queue userspace data as input for the kthread. */
+static ssize_t thread_with_stdio_write(struct file *file, const char __user *ubuf,
+				       size_t len, loff_t *ppos)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+	struct stdio_buf *buf = &thr->stdio.input;
+	size_t copied = 0;
+	ssize_t ret = 0;
+
+	while (len) {
+		if (thr->thr.done) {
+			ret = -EPIPE;
+			break;
+		}
+
+		/* b: length of the prefix of @ubuf that could be faulted in */
+		size_t b = len - fault_in_readable(ubuf, len);
+		if (!b) {
+			ret = -EFAULT;
+			break;
+		}
+
+		spin_lock(&buf->lock);
+		if (buf->buf.nr < STDIO_REDIRECT_BUFSIZE)
+			darray_make_room_gfp(&buf->buf,
+				min(b, STDIO_REDIRECT_BUFSIZE - buf->buf.nr), GFP_NOWAIT);
+		/*
+		 * Stay within the faulted-in prefix: the copy below must not
+		 * fault, and retrying a copy that spans an unreadable page
+		 * would never make progress.
+		 */
+		b = min(b, darray_room(buf->buf));
+
+		if (b && !copy_from_user_nofault(&darray_top(buf->buf), ubuf, b)) {
+			buf->buf.nr += b;
+			ubuf	+= b;
+			len	-= b;
+			copied	+= b;
+		}
+		spin_unlock(&buf->lock);
+
+		if (b) {
+			wake_up(&buf->wait);
+		} else {
+			if ((file->f_flags & O_NONBLOCK)) {
+				ret = -EAGAIN;
+				break;
+			}
+
+			ret = wait_event_interruptible(buf->wait,
+					stdio_redirect_has_input_space(&thr->stdio));
+			if (ret)
+				break;
+		}
+	}
+
+	return copied ?: ret;
+}
+
+/* .poll for read/write files: readable on output, writable on input space. */
+static __poll_t thread_with_stdio_poll(struct file *file, struct poll_table_struct *wait)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+
+	poll_wait(file, &thr->stdio.output.wait, wait);
+	poll_wait(file, &thr->stdio.input.wait, wait);
+
+	__poll_t mask = 0;
+
+	if (stdio_redirect_has_output(&thr->stdio))
+		mask |= EPOLLIN;
+	if (stdio_redirect_has_input_space(&thr->stdio))
+		mask |= EPOLLOUT;
+	if (thr->thr.done)
+		mask |= EPOLLHUP|EPOLLERR;
+	return mask;
+}
+
+/*
+ * .release: shut the redirect down, stop the kthread, free both buffers and
+ * finally let the owner clean up through ->exit().
+ */
+static int thread_with_stdio_release(struct inode *inode, struct file *file)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+
+	thread_with_stdio_done(thr);
+	thread_with_file_exit(&thr->thr);
+	darray_exit(&thr->stdio.input.buf);
+	darray_exit(&thr->stdio.output.buf);
+	thr->ops->exit(thr);
+	return 0;
+}
+
+/* .poll for output-only files: only ever reports readability. */
+static __poll_t thread_with_stdout_poll(struct file *file, struct poll_table_struct *wait)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+
+	poll_wait(file, &thr->stdio.output.wait, wait);
+
+	__poll_t mask = 0;
+
+	if (stdio_redirect_has_output(&thr->stdio))
+		mask |= EPOLLIN;
+	if (thr->thr.done)
+		mask |= EPOLLHUP|EPOLLERR;
+	return mask;
+}
+
+/* .flush: report the value stored in thr.ret. */
+static int thread_with_stdio_flush(struct file *file, fl_owner_t id)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+
+	return thr->thr.ret;
+}
+
+/* .unlocked_ioctl: forward to the owner's handler, -ENOTTY if it has none. */
+static long thread_with_stdio_ioctl(struct file *file, unsigned int cmd, unsigned long p)
+{
+	struct thread_with_stdio *thr =
+		container_of(file->private_data, struct thread_with_stdio, thr);
+
+	if (thr->ops->unlocked_ioctl)
+		return thr->ops->unlocked_ioctl(thr, cmd, p);
+	return -ENOTTY;
+}
+
+static const struct file_operations thread_with_stdio_fops = {
+	.llseek		= no_llseek,
+	.read		= thread_with_stdio_read,
+	.write		= thread_with_stdio_write,
+	.poll		= thread_with_stdio_poll,
+	.flush		= thread_with_stdio_flush,
+	.release	= thread_with_stdio_release,
+	.unlocked_ioctl	= thread_with_stdio_ioctl,
+};
+
+static const struct file_operations thread_with_stdout_fops = {
+	.llseek		= no_llseek,
+	.read		= thread_with_stdio_read,
+	.poll		= thread_with_stdout_poll,
+	.flush		= thread_with_stdio_flush,
+	.release	= thread_with_stdio_release,
+	.unlocked_ioctl	= thread_with_stdio_ioctl,
+};
+
+/* kthread entry point: run ->fn(), record its result, then signal shutdown. */
+static int thread_with_stdio_fn(void *arg)
+{
+	struct thread_with_stdio *thr = arg;
+
+	thr->thr.ret = thr->ops->fn(thr);
+
+	thread_with_stdio_done(thr);
+	return 0;
+}
+
+/**
+ * run_thread_with_stdio - start a kthread with a read/write stdio file
+ * @thr:	thread state; its input and output buffers are initialised here
+ * @ops:	callbacks; ->fn is the thread body, ->exit runs on file release
+ *
+ * Return: the new file descriptor, or a negative error code.
+ */
+int run_thread_with_stdio(struct thread_with_stdio *thr,
+			  const struct thread_with_stdio_ops *ops)
+{
+	stdio_buf_init(&thr->stdio.input);
+	stdio_buf_init(&thr->stdio.output);
+	thr->ops = ops;
+
+	return run_thread_with_file(&thr->thr, &thread_with_stdio_fops, thread_with_stdio_fn);
+}
+EXPORT_SYMBOL_GPL(run_thread_with_stdio);
+
+/**
+ * run_thread_with_stdout - start a kthread with a read-only output file
+ * @thr:	thread state; its input and output buffers are initialised here
+ * @ops:	callbacks; ->fn is the thread body, ->exit runs on file release
+ *
+ * Same as run_thread_with_stdio(), but the file has no ->write method.
+ *
+ * Return: the new file descriptor, or a negative error code.
+ */
+int run_thread_with_stdout(struct thread_with_stdio *thr,
+			   const struct thread_with_stdio_ops *ops)
+{
+	stdio_buf_init(&thr->stdio.input);
+	stdio_buf_init(&thr->stdio.output);
+	thr->ops = ops;
+
+	return run_thread_with_file(&thr->thr, &thread_with_stdout_fops, thread_with_stdio_fn);
+}
+EXPORT_SYMBOL_GPL(run_thread_with_stdout);
+
+MODULE_AUTHOR("Kent Overstreet");
+MODULE_LICENSE("GPL");
-- 
cgit v1.2.3