author		Kent Overstreet <kent.overstreet@gmail.com>	2019-06-30 16:28:01 -0400
committer	Kent Overstreet <kent.overstreet@gmail.com>	2020-05-06 17:14:16 -0400
commit		ea5715a73506eb929e43b66eb3b87c94e2b44ab4
tree		a145b47f47c831f20c6ee694995a5f9b7e2e6e31	/fs/bcachefs/journal.c
parent		5f6131b81dfa624673447c41cfb69c151086b802
Merge with 1f431b384d bcachefs: Refactor trans_(get|update)_key
Diffstat (limited to 'fs/bcachefs/journal.c')
-rw-r--r--	fs/bcachefs/journal.c	936
1 file changed, 540 insertions(+), 396 deletions(-)
diff --git a/fs/bcachefs/journal.c b/fs/bcachefs/journal.c index addd51f08c9a..5c3e146e3942 100644 --- a/fs/bcachefs/journal.c +++ b/fs/bcachefs/journal.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0 /* * bcachefs journalling code, for btree insertions * @@ -5,7 +6,7 @@ */ #include "bcachefs.h" -#include "alloc.h" +#include "alloc_foreground.h" #include "bkey_methods.h" #include "btree_gc.h" #include "buckets.h" @@ -17,29 +18,14 @@ #include <trace/events/bcachefs.h> -static bool journal_entry_is_open(struct journal *j) +static bool __journal_entry_is_open(union journal_res_state state) { - return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; + return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; } -void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set) +static bool journal_entry_is_open(struct journal *j) { - struct journal_buf *w = journal_prev_buf(j); - - atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count); - - if (!need_write_just_set && - test_bit(JOURNAL_NEED_WRITE, &j->flags)) - bch2_time_stats_update(j->delay_time, - j->need_write_time); -#if 0 - closure_call(&j->io, bch2_journal_write, NULL, NULL); -#else - /* Shut sparse up: */ - closure_init(&j->io, NULL); - set_closure_fn(&j->io, bch2_journal_write, NULL); - bch2_journal_write(&j->io); -#endif + return __journal_entry_is_open(j->reservations); } static void journal_pin_new_entry(struct journal *j, int count) @@ -70,41 +56,71 @@ static void bch2_journal_buf_init(struct journal *j) buf->data->u64s = 0; } -static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf) +void bch2_journal_halt(struct journal *j) { - return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX); + union journal_res_state old, new; + u64 v = atomic64_read(&j->reservations.counter); + + do { + old.v = new.v = v; + if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) + return; + + new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL; + } while ((v = atomic64_cmpxchg(&j->reservations.counter, + old.v, new.v)) != old.v); + + journal_wake(j); + closure_wake_up(&journal_cur_buf(j)->wait); } -static enum { - JOURNAL_ENTRY_ERROR, - JOURNAL_ENTRY_INUSE, - JOURNAL_ENTRY_CLOSED, - JOURNAL_UNLOCKED, -} journal_buf_switch(struct journal *j, bool need_write_just_set) +/* journal entry close/open: */ + +void __bch2_journal_buf_put(struct journal *j, bool need_write_just_set) +{ + if (!need_write_just_set && + test_bit(JOURNAL_NEED_WRITE, &j->flags)) + bch2_time_stats_update(j->delay_time, + j->need_write_time); + + clear_bit(JOURNAL_NEED_WRITE, &j->flags); + + closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL); +} + +/* + * Returns true if journal entry is now closed: + */ +static bool __journal_entry_close(struct journal *j) { struct bch_fs *c = container_of(j, struct bch_fs, journal); - struct journal_buf *buf; + struct journal_buf *buf = journal_cur_buf(j); union journal_res_state old, new; u64 v = atomic64_read(&j->reservations.counter); + bool set_need_write = false; + unsigned sectors; lockdep_assert_held(&j->lock); do { old.v = new.v = v; if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL) - return JOURNAL_ENTRY_CLOSED; + return true; - if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) - return JOURNAL_ENTRY_ERROR; + if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) { + /* this entry will never be written: */ + closure_wake_up(&buf->wait); + return true; + } - if (new.prev_buf_unwritten) - return JOURNAL_ENTRY_INUSE; + if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) 
{ + set_bit(JOURNAL_NEED_WRITE, &j->flags); + j->need_write_time = local_clock(); + set_need_write = true; + } - /* - * avoid race between setting buf->data->u64s and - * journal_res_put starting write: - */ - journal_state_inc(&new); + if (new.prev_buf_unwritten) + return false; new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL; new.idx++; @@ -114,59 +130,62 @@ static enum { } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); - clear_bit(JOURNAL_NEED_WRITE, &j->flags); - - buf = &j->buf[old.idx]; buf->data->u64s = cpu_to_le32(old.cur_entry_offset); - j->prev_buf_sectors = - vstruct_blocks_plus(buf->data, c->block_bits, - journal_entry_u64s_reserve(buf)) * - c->opts.block_size; - BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors); + sectors = vstruct_blocks_plus(buf->data, c->block_bits, + buf->u64s_reserved) << c->block_bits; + BUG_ON(sectors > buf->sectors); + buf->sectors = sectors; + + bkey_extent_init(&buf->key); - bch2_journal_reclaim_fast(j); - /* XXX: why set this here, and not in bch2_journal_write()? */ + /* + * We have to set last_seq here, _before_ opening a new journal entry: + * + * A threads may replace an old pin with a new pin on their current + * journal reservation - the expectation being that the journal will + * contain either what the old pin protected or what the new pin + * protects. + * + * After the old pin is dropped journal_last_seq() won't include the old + * pin, so we can only write the updated last_seq on the entry that + * contains whatever the new pin protects. + * + * Restated, we can _not_ update last_seq for a given entry if there + * could be a newer entry open with reservations/pins that have been + * taken against it. + * + * Hence, we want update/set last_seq on the current journal entry right + * before we open a new one: + */ buf->data->last_seq = cpu_to_le64(journal_last_seq(j)); + if (journal_entry_empty(buf->data)) + clear_bit(JOURNAL_NOT_EMPTY, &j->flags); + else + set_bit(JOURNAL_NOT_EMPTY, &j->flags); + journal_pin_new_entry(j, 1); bch2_journal_buf_init(j); cancel_delayed_work(&j->write_work); - spin_unlock(&j->lock); - - if (c->bucket_journal_seq > 1 << 14) { - c->bucket_journal_seq = 0; - bch2_bucket_seq_cleanup(c); - } - c->bucket_journal_seq++; + bch2_journal_space_available(j); - /* ugh - might be called from __journal_res_get() under wait_event() */ - __set_current_state(TASK_RUNNING); - bch2_journal_buf_put(j, old.idx, need_write_just_set); - - return JOURNAL_UNLOCKED; + bch2_journal_buf_put(j, old.idx, set_need_write); + return true; } -void bch2_journal_halt(struct journal *j) +static bool journal_entry_close(struct journal *j) { - union journal_res_state old, new; - u64 v = atomic64_read(&j->reservations.counter); - - do { - old.v = new.v = v; - if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) - return; + bool ret; - new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL; - } while ((v = atomic64_cmpxchg(&j->reservations.counter, - old.v, new.v)) != old.v); + spin_lock(&j->lock); + ret = __journal_entry_close(j); + spin_unlock(&j->lock); - journal_wake(j); - closure_wake_up(&journal_cur_buf(j)->wait); - closure_wake_up(&journal_prev_buf(j)->wait); + return ret; } /* @@ -174,49 +193,39 @@ void bch2_journal_halt(struct journal *j) * journal reservation - journal entry is open means journal is dirty: * * returns: - * 1: success - * 0: journal currently full (must wait) - * -EROFS: insufficient rw devices - * -EIO: journal error + * 0: success + * -ENOSPC: journal currently full, must invoke reclaim + * -EAGAIN: 
journal blocked, must wait + * -EROFS: insufficient rw devices or journal error */ static int journal_entry_open(struct journal *j) { struct journal_buf *buf = journal_cur_buf(j); union journal_res_state old, new; - ssize_t u64s; - int sectors; + int u64s; u64 v; lockdep_assert_held(&j->lock); BUG_ON(journal_entry_is_open(j)); - if (!fifo_free(&j->pin)) - return 0; - - sectors = bch2_journal_entry_sectors(j); - if (sectors <= 0) - return sectors; + if (j->blocked) + return -EAGAIN; - buf->disk_sectors = sectors; + if (j->cur_entry_error) + return j->cur_entry_error; - sectors = min_t(unsigned, sectors, buf->size >> 9); - j->cur_buf_sectors = sectors; + BUG_ON(!j->cur_entry_sectors); - u64s = (sectors << 9) / sizeof(u64); + buf->u64s_reserved = j->entry_u64s_reserved; + buf->disk_sectors = j->cur_entry_sectors; + buf->sectors = min(buf->disk_sectors, buf->buf_size >> 9); - /* Subtract the journal header */ - u64s -= sizeof(struct jset) / sizeof(u64); - /* - * Btree roots, prio pointers don't get added until right before we do - * the write: - */ - u64s -= journal_entry_u64s_reserve(buf); - u64s = max_t(ssize_t, 0L, u64s); - - BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL); + u64s = (int) (buf->sectors << 9) / sizeof(u64) - + journal_entry_overhead(j); + u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1); if (u64s <= le32_to_cpu(buf->data->u64s)) - return 0; + return -ENOSPC; /* * Must be set before marking the journal entry as open: @@ -228,10 +237,13 @@ static int journal_entry_open(struct journal *j) old.v = new.v = v; if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) - return -EIO; + return -EROFS; /* Handle any already added entries */ new.cur_entry_offset = le32_to_cpu(buf->data->u64s); + + EBUG_ON(journal_state_count(new, new.idx)); + journal_state_inc(&new); } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); @@ -244,37 +256,29 @@ static int journal_entry_open(struct journal *j) &j->write_work, msecs_to_jiffies(j->write_delay_ms)); journal_wake(j); - return 1; + return 0; } -/* - * returns true if there's nothing to flush and no journal write still in flight - */ -static bool journal_flush_write(struct journal *j) +static bool journal_quiesced(struct journal *j) { - bool ret; - - spin_lock(&j->lock); - ret = !j->reservations.prev_buf_unwritten; - - if (!journal_entry_is_open(j)) { - spin_unlock(&j->lock); - return ret; - } + union journal_res_state state = READ_ONCE(j->reservations); + bool ret = !state.prev_buf_unwritten && !__journal_entry_is_open(state); - set_bit(JOURNAL_NEED_WRITE, &j->flags); - if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED) - ret = false; - else - spin_unlock(&j->lock); + if (!ret) + journal_entry_close(j); return ret; } +static void journal_quiesce(struct journal *j) +{ + wait_event(j->wait, journal_quiesced(j)); +} + static void journal_write_work(struct work_struct *work) { struct journal *j = container_of(work, struct journal, write_work.work); - journal_flush_write(j); + journal_entry_close(j); } /* @@ -302,26 +306,39 @@ u64 bch2_inode_journal_seq(struct journal *j, u64 inode) } static int __journal_res_get(struct journal *j, struct journal_res *res, - unsigned u64s_min, unsigned u64s_max) + unsigned flags) { struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_buf *buf; + bool can_discard; int ret; retry: - ret = journal_res_get_fast(j, res, u64s_min, u64s_max); - if (ret) - return ret; + if (journal_res_get_fast(j, res, flags)) + return 0; + + if (bch2_journal_error(j)) + return -EROFS; 
spin_lock(&j->lock); + /* * Recheck after taking the lock, so we don't race with another thread * that just did journal_entry_open() and call journal_entry_close() * unnecessarily */ - ret = journal_res_get_fast(j, res, u64s_min, u64s_max); - if (ret) { + if (journal_res_get_fast(j, res, flags)) { spin_unlock(&j->lock); - return 1; + return 0; + } + + if (!(flags & JOURNAL_RES_GET_RESERVED) && + !test_bit(JOURNAL_MAY_GET_UNRESERVED, &j->flags)) { + /* + * Don't want to close current journal entry, just need to + * invoke reclaim: + */ + ret = -ENOSPC; + goto unlock; } /* @@ -331,51 +348,58 @@ retry: */ buf = journal_cur_buf(j); if (journal_entry_is_open(j) && - buf->size >> 9 < buf->disk_sectors && - buf->size < JOURNAL_ENTRY_SIZE_MAX) - j->buf_size_want = max(j->buf_size_want, buf->size << 1); + buf->buf_size >> 9 < buf->disk_sectors && + buf->buf_size < JOURNAL_ENTRY_SIZE_MAX) + j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1); - /* - * Close the current journal entry if necessary, then try to start a new - * one: - */ - switch (journal_buf_switch(j, false)) { - case JOURNAL_ENTRY_ERROR: - spin_unlock(&j->lock); - return -EROFS; - case JOURNAL_ENTRY_INUSE: - /* haven't finished writing out the previous one: */ - spin_unlock(&j->lock); + if (journal_entry_is_open(j) && + !__journal_entry_close(j)) { + /* + * We failed to get a reservation on the current open journal + * entry because it's full, and we can't close it because + * there's still a previous one in flight: + */ trace_journal_entry_full(c); - goto blocked; - case JOURNAL_ENTRY_CLOSED: - break; - case JOURNAL_UNLOCKED: - goto retry; + ret = -EAGAIN; + } else { + ret = journal_entry_open(j); } +unlock: + if ((ret == -EAGAIN || ret == -ENOSPC) && + !j->res_get_blocked_start) + j->res_get_blocked_start = local_clock() ?: 1; - /* We now have a new, closed journal buf - see if we can open it: */ - ret = journal_entry_open(j); + can_discard = j->can_discard; spin_unlock(&j->lock); - if (ret < 0) - return ret; - if (ret) + if (!ret) goto retry; - /* Journal's full, we have to wait */ + if (ret == -ENOSPC) { + BUG_ON(!can_discard && (flags & JOURNAL_RES_GET_RESERVED)); - /* - * Direct reclaim - can't rely on reclaim from work item - * due to freezing.. - */ - bch2_journal_reclaim_work(&j->reclaim_work.work); + /* + * Journal is full - can't rely on reclaim from work item due to + * freezing: + */ + trace_journal_full(c); - trace_journal_full(c); -blocked: - if (!j->res_get_blocked_start) - j->res_get_blocked_start = local_clock() ?: 1; - return 0; + if (!(flags & JOURNAL_RES_GET_NONBLOCK)) { + if (can_discard) { + bch2_journal_do_discards(j); + goto retry; + } + + if (mutex_trylock(&j->reclaim_lock)) { + bch2_journal_reclaim(j); + mutex_unlock(&j->reclaim_lock); + } + } + + ret = -EAGAIN; + } + + return ret; } /* @@ -389,16 +413,78 @@ blocked: * btree node write locks. */ int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res, - unsigned u64s_min, unsigned u64s_max) + unsigned flags) { int ret; - wait_event(j->wait, - (ret = __journal_res_get(j, res, u64s_min, - u64s_max))); - return ret < 0 ? 
ret : 0; + closure_wait_event(&j->async_wait, + (ret = __journal_res_get(j, res, flags)) != -EAGAIN || + (flags & JOURNAL_RES_GET_NONBLOCK)); + return ret; } +/* journal_preres: */ + +static bool journal_preres_available(struct journal *j, + struct journal_preres *res, + unsigned new_u64s) +{ + bool ret = bch2_journal_preres_get_fast(j, res, new_u64s); + + if (!ret) + bch2_journal_reclaim_work(&j->reclaim_work.work); + + return ret; +} + +int __bch2_journal_preres_get(struct journal *j, + struct journal_preres *res, + unsigned new_u64s) +{ + int ret; + + closure_wait_event(&j->preres_wait, + (ret = bch2_journal_error(j)) || + journal_preres_available(j, res, new_u64s)); + return ret; +} + +/* journal_entry_res: */ + +void bch2_journal_entry_res_resize(struct journal *j, + struct journal_entry_res *res, + unsigned new_u64s) +{ + union journal_res_state state; + int d = new_u64s - res->u64s; + + spin_lock(&j->lock); + + j->entry_u64s_reserved += d; + if (d <= 0) + goto out; + + j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d); + smp_mb(); + state = READ_ONCE(j->reservations); + + if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL && + state.cur_entry_offset > j->cur_entry_u64s) { + j->cur_entry_u64s += d; + /* + * Not enough room in current journal entry, have to flush it: + */ + __journal_entry_close(j); + } else { + journal_cur_buf(j)->u64s_reserved += d; + } +out: + spin_unlock(&j->lock); + res->u64s += d; +} + +/* journal flushing: */ + u64 bch2_journal_last_unwritten_seq(struct journal *j) { u64 seq; @@ -420,30 +506,84 @@ u64 bch2_journal_last_unwritten_seq(struct journal *j) * btree root - every journal entry contains the roots of all the btrees, so it * doesn't need to bother with getting a journal reservation */ -int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent) +int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *cl) { + struct bch_fs *c = container_of(j, struct bch_fs, journal); int ret; spin_lock(&j->lock); - BUG_ON(seq > journal_cur_seq(j)); - if (seq < journal_cur_seq(j) || + /* + * Can't try to open more than one sequence number ahead: + */ + BUG_ON(journal_cur_seq(j) < seq && !journal_entry_is_open(j)); + + if (journal_cur_seq(j) > seq || journal_entry_is_open(j)) { spin_unlock(&j->lock); - return 1; + return 0; } - ret = journal_entry_open(j); - if (!ret) - closure_wait(&j->async_wait, parent); + if (journal_cur_seq(j) < seq && + !__journal_entry_close(j)) { + /* haven't finished writing out the previous one: */ + trace_journal_entry_full(c); + ret = -EAGAIN; + } else { + BUG_ON(journal_cur_seq(j) != seq); + + ret = journal_entry_open(j); + } + + if ((ret == -EAGAIN || ret == -ENOSPC) && + !j->res_get_blocked_start) + j->res_get_blocked_start = local_clock() ?: 1; + + if (ret == -EAGAIN || ret == -ENOSPC) + closure_wait(&j->async_wait, cl); + spin_unlock(&j->lock); - if (!ret) + if (ret == -ENOSPC) { + trace_journal_full(c); bch2_journal_reclaim_work(&j->reclaim_work.work); + ret = -EAGAIN; + } return ret; } +static int journal_seq_error(struct journal *j, u64 seq) +{ + union journal_res_state state = READ_ONCE(j->reservations); + + if (seq == journal_cur_seq(j)) + return bch2_journal_error(j); + + if (seq + 1 == journal_cur_seq(j) && + !state.prev_buf_unwritten && + seq > j->seq_ondisk) + return -EIO; + + return 0; +} + +static inline struct journal_buf * +journal_seq_to_buf(struct journal *j, u64 seq) +{ + /* seq should be for a journal entry that has been opened: */ + BUG_ON(seq > journal_cur_seq(j)); + 
BUG_ON(seq == journal_cur_seq(j) && + j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL); + + if (seq == journal_cur_seq(j)) + return journal_cur_buf(j); + if (seq + 1 == journal_cur_seq(j) && + j->reservations.prev_buf_unwritten) + return journal_prev_buf(j); + return NULL; +} + /** * bch2_journal_wait_on_seq - wait for a journal entry to be written * @@ -452,31 +592,22 @@ int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *pare * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is * configurable). */ -void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent) +void bch2_journal_wait_on_seq(struct journal *j, u64 seq, + struct closure *parent) { - spin_lock(&j->lock); - - BUG_ON(seq > journal_cur_seq(j)); + struct journal_buf *buf; - if (bch2_journal_error(j)) { - spin_unlock(&j->lock); - return; - } + spin_lock(&j->lock); - if (seq == journal_cur_seq(j)) { - if (!closure_wait(&journal_cur_buf(j)->wait, parent)) - BUG(); - } else if (seq + 1 == journal_cur_seq(j) && - j->reservations.prev_buf_unwritten) { - if (!closure_wait(&journal_prev_buf(j)->wait, parent)) + if ((buf = journal_seq_to_buf(j, seq))) { + if (!closure_wait(&buf->wait, parent)) BUG(); - smp_mb(); - - /* check if raced with write completion (or failure) */ - if (!j->reservations.prev_buf_unwritten || - bch2_journal_error(j)) - closure_wake_up(&journal_prev_buf(j)->wait); + if (seq == journal_cur_seq(j)) { + smp_mb(); + if (bch2_journal_error(j)) + closure_wake_up(&buf->wait); + } } spin_unlock(&j->lock); @@ -488,107 +619,32 @@ void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent * like bch2_journal_wait_on_seq, except that it triggers a write immediately if * necessary */ -void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent) +void bch2_journal_flush_seq_async(struct journal *j, u64 seq, + struct closure *parent) { struct journal_buf *buf; spin_lock(&j->lock); - BUG_ON(seq > journal_cur_seq(j)); - - if (bch2_journal_error(j)) { - spin_unlock(&j->lock); - return; - } - - if (seq == journal_cur_seq(j)) { - bool set_need_write = false; - - buf = journal_cur_buf(j); - - if (parent && !closure_wait(&buf->wait, parent)) - BUG(); - - if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) { - j->need_write_time = local_clock(); - set_need_write = true; - } - - switch (journal_buf_switch(j, set_need_write)) { - case JOURNAL_ENTRY_ERROR: - if (parent) - closure_wake_up(&buf->wait); - break; - case JOURNAL_ENTRY_CLOSED: - /* - * Journal entry hasn't been opened yet, but caller - * claims it has something - */ - BUG(); - case JOURNAL_ENTRY_INUSE: - break; - case JOURNAL_UNLOCKED: - return; - } - } else if (parent && - seq + 1 == journal_cur_seq(j) && - j->reservations.prev_buf_unwritten) { - buf = journal_prev_buf(j); - + if (parent && + (buf = journal_seq_to_buf(j, seq))) if (!closure_wait(&buf->wait, parent)) BUG(); - smp_mb(); - - /* check if raced with write completion (or failure) */ - if (!j->reservations.prev_buf_unwritten || - bch2_journal_error(j)) - closure_wake_up(&buf->wait); - } - + if (seq == journal_cur_seq(j)) + __journal_entry_close(j); spin_unlock(&j->lock); } static int journal_seq_flushed(struct journal *j, u64 seq) { - struct journal_buf *buf; - int ret = 1; + int ret; spin_lock(&j->lock); - BUG_ON(seq > journal_cur_seq(j)); - - if (seq == journal_cur_seq(j)) { - bool set_need_write = false; - - ret = 0; - - buf = journal_cur_buf(j); - - if 
(!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) { - j->need_write_time = local_clock(); - set_need_write = true; - } - - switch (journal_buf_switch(j, set_need_write)) { - case JOURNAL_ENTRY_ERROR: - ret = -EIO; - break; - case JOURNAL_ENTRY_CLOSED: - /* - * Journal entry hasn't been opened yet, but caller - * claims it has something - */ - BUG(); - case JOURNAL_ENTRY_INUSE: - break; - case JOURNAL_UNLOCKED: - return 0; - } - } else if (seq + 1 == journal_cur_seq(j) && - j->reservations.prev_buf_unwritten) { - ret = bch2_journal_error(j); - } + ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq); + if (seq == journal_cur_seq(j)) + __journal_entry_close(j); spin_unlock(&j->lock); return ret; @@ -612,11 +668,10 @@ int bch2_journal_flush_seq(struct journal *j, u64 seq) void bch2_journal_meta_async(struct journal *j, struct closure *parent) { struct journal_res res; - unsigned u64s = jset_u64s(0); memset(&res, 0, sizeof(res)); - bch2_journal_res_get(j, &res, u64s, u64s); + bch2_journal_res_get(j, &res, jset_u64s(0), 0); bch2_journal_res_put(j, &res); bch2_journal_flush_seq_async(j, res.seq, parent); @@ -625,12 +680,11 @@ void bch2_journal_meta_async(struct journal *j, struct closure *parent) int bch2_journal_meta(struct journal *j) { struct journal_res res; - unsigned u64s = jset_u64s(0); int ret; memset(&res, 0, sizeof(res)); - ret = bch2_journal_res_get(j, &res, u64s, u64s); + ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0); if (ret) return ret; @@ -683,6 +737,26 @@ int bch2_journal_flush(struct journal *j) return bch2_journal_flush_seq(j, seq); } +/* block/unlock the journal: */ + +void bch2_journal_unblock(struct journal *j) +{ + spin_lock(&j->lock); + j->blocked--; + spin_unlock(&j->lock); + + journal_wake(j); +} + +void bch2_journal_block(struct journal *j) +{ + spin_lock(&j->lock); + j->blocked++; + spin_unlock(&j->lock); + + journal_quiesce(j); +} + /* allocate journal on a device: */ static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr, @@ -705,10 +779,14 @@ static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr, goto err; journal_buckets = bch2_sb_resize_journal(&ca->disk_sb, - nr + sizeof(*journal_buckets) / sizeof(u64)); + nr + sizeof(*journal_buckets) / sizeof(u64)); if (!journal_buckets) goto err; + /* + * We may be called from the device add path, before the new device has + * actually been added to the running filesystem: + */ if (c) spin_lock(&c->journal.lock); @@ -722,58 +800,58 @@ static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr, while (ja->nr < nr) { struct open_bucket *ob = NULL; + unsigned pos; long bucket; if (new_fs) { - percpu_down_read_preempt_disable(&c->usage_lock); bucket = bch2_bucket_alloc_new_fs(ca); - percpu_up_read_preempt_enable(&c->usage_lock); - if (bucket < 0) { ret = -ENOSPC; goto err; } } else { - int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl); - if (ob_idx < 0) { + ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, + false, cl); + if (IS_ERR(ob)) { ret = cl ? -EAGAIN : -ENOSPC; goto err; } - ob = c->open_buckets + ob_idx; bucket = sector_to_bucket(ca, ob->ptr.offset); } if (c) { - percpu_down_read_preempt_disable(&c->usage_lock); + percpu_down_read(&c->mark_lock); spin_lock(&c->journal.lock); } - __array_insert_item(ja->buckets, ja->nr, ja->last_idx); - __array_insert_item(ja->bucket_seq, ja->nr, ja->last_idx); - __array_insert_item(journal_buckets->buckets, ja->nr, ja->last_idx); + pos = ja->nr ? 
(ja->cur_idx + 1) % ja->nr : 0; + __array_insert_item(ja->buckets, ja->nr, pos); + __array_insert_item(ja->bucket_seq, ja->nr, pos); + __array_insert_item(journal_buckets->buckets, ja->nr, pos); + ja->nr++; - ja->buckets[ja->last_idx] = bucket; - ja->bucket_seq[ja->last_idx] = 0; - journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket); + ja->buckets[pos] = bucket; + ja->bucket_seq[pos] = 0; + journal_buckets->buckets[pos] = cpu_to_le64(bucket); - if (ja->last_idx < ja->nr) { - if (ja->cur_idx >= ja->last_idx) - ja->cur_idx++; - ja->last_idx++; - } - ja->nr++; + if (pos <= ja->discard_idx) + ja->discard_idx = (ja->discard_idx + 1) % ja->nr; + if (pos <= ja->dirty_idx_ondisk) + ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr; + if (pos <= ja->dirty_idx) + ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr; + if (pos <= ja->cur_idx) + ja->cur_idx = (ja->cur_idx + 1) % ja->nr; bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL, - ca->mi.bucket_size, - gc_phase(GC_PHASE_SB), - new_fs - ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE - : 0); + ca->mi.bucket_size, + gc_phase(GC_PHASE_SB), + 0); if (c) { spin_unlock(&c->journal.lock); - percpu_up_read_preempt_enable(&c->usage_lock); + percpu_up_read(&c->mark_lock); } if (!new_fs) @@ -818,7 +896,7 @@ int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca, */ if (bch2_disk_reservation_get(c, &disk_res, - bucket_to_sector(ca, nr - ja->nr), 1, 0)) { + bucket_to_sector(ca, nr - ja->nr), 1, 0)) { mutex_unlock(&c->sb_lock); return -ENOSPC; } @@ -875,54 +953,90 @@ static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx) void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca) { - spin_lock(&j->lock); - bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx); - spin_unlock(&j->lock); - wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx)); } void bch2_fs_journal_stop(struct journal *j) { - wait_event(j->wait, journal_flush_write(j)); + struct bch_fs *c = container_of(j, struct bch_fs, journal); + + bch2_journal_flush_all_pins(j); + + wait_event(j->wait, journal_entry_close(j)); + + /* do we need to write another journal entry? 
*/ + if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) || + c->btree_roots_dirty) + bch2_journal_meta(j); + + journal_quiesce(j); + + BUG_ON(!bch2_journal_error(j) && + test_bit(JOURNAL_NOT_EMPTY, &j->flags)); cancel_delayed_work_sync(&j->write_work); cancel_delayed_work_sync(&j->reclaim_work); } -void bch2_fs_journal_start(struct journal *j) +int bch2_fs_journal_start(struct journal *j, u64 cur_seq, + struct list_head *journal_entries) { - struct journal_seq_blacklist *bl; - u64 blacklist = 0; + struct bch_fs *c = container_of(j, struct bch_fs, journal); + struct journal_entry_pin_list *p; + struct journal_replay *i; + u64 last_seq = cur_seq, nr, seq; + + if (!list_empty(journal_entries)) + last_seq = le64_to_cpu(list_first_entry(journal_entries, + struct journal_replay, + list)->j.seq); + + nr = cur_seq - last_seq; + + if (nr + 1 > j->pin.size) { + free_fifo(&j->pin); + init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL); + if (!j->pin.data) { + bch_err(c, "error reallocating journal fifo (%llu open entries)", nr); + return -ENOMEM; + } + } + + j->replay_journal_seq = last_seq; + j->replay_journal_seq_end = cur_seq; + j->last_seq_ondisk = last_seq; + j->pin.front = last_seq; + j->pin.back = cur_seq; + atomic64_set(&j->seq, cur_seq - 1); + + fifo_for_each_entry_ptr(p, &j->pin, seq) { + INIT_LIST_HEAD(&p->list); + INIT_LIST_HEAD(&p->flushed); + atomic_set(&p->count, 1); + p->devs.nr = 0; + } + + list_for_each_entry(i, journal_entries, list) { + seq = le64_to_cpu(i->j.seq); - list_for_each_entry(bl, &j->seq_blacklist, list) - blacklist = max(blacklist, bl->end); + BUG_ON(seq < last_seq || seq >= cur_seq); + + journal_seq_pin(j, seq)->devs = i->devs; + } spin_lock(&j->lock); set_bit(JOURNAL_STARTED, &j->flags); - while (journal_cur_seq(j) < blacklist) - journal_pin_new_entry(j, 0); - - /* - * journal_buf_switch() only inits the next journal entry when it - * closes an open journal entry - the very first journal entry gets - * initialized here: - */ journal_pin_new_entry(j, 1); bch2_journal_buf_init(j); - spin_unlock(&j->lock); + c->last_bucket_seq_cleanup = journal_cur_seq(j); - /* - * Adding entries to the next journal entry before allocating space on - * disk for the next journal entry - this is ok, because these entries - * only have to go down with the next journal entry we write: - */ - bch2_journal_seq_blacklist_write(j); + bch2_journal_space_available(j); + spin_unlock(&j->lock); - queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0); + return 0; } /* init/exit: */ @@ -968,8 +1082,8 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) void bch2_fs_journal_exit(struct journal *j) { - kvpfree(j->buf[1].data, j->buf[1].size); - kvpfree(j->buf[0].data, j->buf[0].size); + kvpfree(j->buf[1].data, j->buf[1].buf_size); + kvpfree(j->buf[0].data, j->buf[0].buf_size); free_fifo(&j->pin); } @@ -986,26 +1100,28 @@ int bch2_fs_journal_init(struct journal *j) init_waitqueue_head(&j->wait); INIT_DELAYED_WORK(&j->write_work, journal_write_work); INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work); - mutex_init(&j->blacklist_lock); - INIT_LIST_HEAD(&j->seq_blacklist); + init_waitqueue_head(&j->pin_flush_wait); mutex_init(&j->reclaim_lock); + mutex_init(&j->discard_lock); lockdep_init_map(&j->res_map, "journal res", &res_key, 0); - j->buf[0].size = JOURNAL_ENTRY_SIZE_MIN; - j->buf[1].size = JOURNAL_ENTRY_SIZE_MIN; + j->buf[0].buf_size = JOURNAL_ENTRY_SIZE_MIN; + j->buf[1].buf_size = JOURNAL_ENTRY_SIZE_MIN; j->write_delay_ms = 1000; j->reclaim_delay_ms = 100; - 
bkey_extent_init(&j->key); + /* Btree roots: */ + j->entry_u64s_reserved += + BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX); atomic64_set(&j->reservations.counter, ((union journal_res_state) { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v); if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) || - !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) || - !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) { + !(j->buf[0].data = kvpmalloc(j->buf[0].buf_size, GFP_KERNEL)) || + !(j->buf[1].data = kvpmalloc(j->buf[1].buf_size, GFP_KERNEL))) { ret = -ENOMEM; goto out; } @@ -1020,38 +1136,63 @@ out: ssize_t bch2_journal_print_debug(struct journal *j, char *buf) { + struct printbuf out = _PBUF(buf, PAGE_SIZE); struct bch_fs *c = container_of(j, struct bch_fs, journal); - union journal_res_state *s = &j->reservations; + union journal_res_state s; struct bch_dev *ca; unsigned iter; - ssize_t ret = 0; rcu_read_lock(); spin_lock(&j->lock); + s = READ_ONCE(j->reservations); + + pr_buf(&out, + "active journal entries:\t%llu\n" + "seq:\t\t\t%llu\n" + "last_seq:\t\t%llu\n" + "last_seq_ondisk:\t%llu\n" + "prereserved:\t\t%u/%u\n" + "current entry sectors:\t%u\n" + "current entry:\t\t", + fifo_used(&j->pin), + journal_cur_seq(j), + journal_last_seq(j), + j->last_seq_ondisk, + j->prereserved.reserved, + j->prereserved.remaining, + j->cur_entry_sectors); + + switch (s.cur_entry_offset) { + case JOURNAL_ENTRY_ERROR_VAL: + pr_buf(&out, "error\n"); + break; + case JOURNAL_ENTRY_CLOSED_VAL: + pr_buf(&out, "closed\n"); + break; + default: + pr_buf(&out, "%u/%u\n", + s.cur_entry_offset, + j->cur_entry_u64s); + break; + } - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "active journal entries:\t%llu\n" - "seq:\t\t\t%llu\n" - "last_seq:\t\t%llu\n" - "last_seq_ondisk:\t%llu\n" - "reservation count:\t%u\n" - "reservation offset:\t%u\n" - "current entry u64s:\t%u\n" - "io in flight:\t\t%i\n" - "need write:\t\t%i\n" - "dirty:\t\t\t%i\n" - "replay done:\t\t%i\n", - fifo_used(&j->pin), - journal_cur_seq(j), - journal_last_seq(j), - j->last_seq_ondisk, - journal_state_count(*s, s->idx), - s->cur_entry_offset, - j->cur_entry_u64s, - s->prev_buf_unwritten, - test_bit(JOURNAL_NEED_WRITE, &j->flags), - journal_entry_is_open(j), - test_bit(JOURNAL_REPLAY_DONE, &j->flags)); + pr_buf(&out, + "current entry refs:\t%u\n" + "prev entry unwritten:\t", + journal_state_count(s, s.idx)); + + if (s.prev_buf_unwritten) + pr_buf(&out, "yes, ref %u sectors %u\n", + journal_state_count(s, !s.idx), + journal_prev_buf(j)->sectors); + else + pr_buf(&out, "no\n"); + + pr_buf(&out, + "need write:\t\t%i\n" + "replay done:\t\t%i\n", + test_bit(JOURNAL_NEED_WRITE, &j->flags), + test_bit(JOURNAL_REPLAY_DONE, &j->flags)); for_each_member_device_rcu(ca, c, iter, &c->rw_devs[BCH_DATA_JOURNAL]) { @@ -1060,50 +1201,53 @@ ssize_t bch2_journal_print_debug(struct journal *j, char *buf) if (!ja->nr) continue; - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "dev %u:\n" - "\tnr\t\t%u\n" - "\tcur_idx\t\t%u (seq %llu)\n" - "\tlast_idx\t%u (seq %llu)\n", - iter, ja->nr, - ja->cur_idx, ja->bucket_seq[ja->cur_idx], - ja->last_idx, ja->bucket_seq[ja->last_idx]); + pr_buf(&out, + "dev %u:\n" + "\tnr\t\t%u\n" + "\tavailable\t%u:%u\n" + "\tdiscard_idx\t\t%u\n" + "\tdirty_idx_ondisk\t%u (seq %llu)\n" + "\tdirty_idx\t\t%u (seq %llu)\n" + "\tcur_idx\t\t%u (seq %llu)\n", + iter, ja->nr, + bch2_journal_dev_buckets_available(j, ja, journal_space_discarded), + ja->sectors_free, + ja->discard_idx, + ja->dirty_idx_ondisk, 
ja->bucket_seq[ja->dirty_idx_ondisk], + ja->dirty_idx, ja->bucket_seq[ja->dirty_idx], + ja->cur_idx, ja->bucket_seq[ja->cur_idx]); } spin_unlock(&j->lock); rcu_read_unlock(); - return ret; + return out.pos - buf; } ssize_t bch2_journal_print_pins(struct journal *j, char *buf) { + struct printbuf out = _PBUF(buf, PAGE_SIZE); struct journal_entry_pin_list *pin_list; struct journal_entry_pin *pin; - ssize_t ret = 0; u64 i; spin_lock(&j->lock); fifo_for_each_entry_ptr(pin_list, &j->pin, i) { - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "%llu: count %u\n", - i, atomic_read(&pin_list->count)); + pr_buf(&out, "%llu: count %u\n", + i, atomic_read(&pin_list->count)); list_for_each_entry(pin, &pin_list->list, list) - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "\t%p %pf\n", - pin, pin->flush); + pr_buf(&out, "\t%p %pf\n", + pin, pin->flush); if (!list_empty(&pin_list->flushed)) - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "flushed:\n"); + pr_buf(&out, "flushed:\n"); list_for_each_entry(pin, &pin_list->flushed, list) - ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "\t%p %pf\n", - pin, pin->flush); + pr_buf(&out, "\t%p %pf\n", + pin, pin->flush); } spin_unlock(&j->lock); - return ret; + return out.pos - buf; } |
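
The pattern this patch leans on throughout — in __journal_entry_close(), bch2_journal_halt() and journal_entry_open() — is packing the open/closed/error state of the current journal entry into a single 64-bit word (union journal_res_state) and updating it with an atomic compare-and-swap retry loop, so reservations can be taken and dropped without holding the journal lock. The sketch below is a minimal userspace illustration of that pattern only, written with C11 atomics: the field widths, sentinel values and helper names are invented for the example and are not the kernel's definitions.

```c
/*
 * Simplified, standalone illustration of the lockless state-update pattern
 * used by __journal_entry_close() in the patch: all entry state lives in one
 * 64-bit word, updated via a cmpxchg retry loop.  Layout and values here are
 * made up for the example.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define ENTRY_OFFSET_MAX	((1u << 20) - 1)
#define ENTRY_CLOSED_VAL	(ENTRY_OFFSET_MAX - 1)	/* entry closed, no reservations */
#define ENTRY_ERROR_VAL		ENTRY_OFFSET_MAX	/* journal halted, never written */

union res_state {
	uint64_t v;
	struct {
		uint64_t cur_entry_offset:20;	/* u64s already used in the open entry */
		uint64_t idx:1;			/* which of the two buffers is current */
		uint64_t prev_buf_unwritten:1;	/* previous buffer still in flight? */
		uint64_t buf0_count:21;		/* outstanding refs on buffer 0 */
		uint64_t buf1_count:21;		/* outstanding refs on buffer 1 */
	};
};

/* Close the currently open entry; returns true if it is (now) closed. */
static bool entry_close(_Atomic uint64_t *state)
{
	union res_state old, new;

	old.v = atomic_load(state);
	do {
		if (old.cur_entry_offset == ENTRY_CLOSED_VAL)
			return true;		/* already closed */
		if (old.cur_entry_offset == ENTRY_ERROR_VAL)
			return true;		/* error state: will never be written */
		if (old.prev_buf_unwritten)
			return false;		/* previous write still in flight */

		new.v			= old.v;
		new.cur_entry_offset	= ENTRY_CLOSED_VAL;
		new.idx			^= 1;	/* switch to the other buffer */
		new.prev_buf_unwritten	= 1;	/* the buffer we just closed */
		/* on failure, old.v is refreshed with the current value and we retry */
	} while (!atomic_compare_exchange_weak(state, &old.v, new.v));

	return true;
}

int main(void)
{
	_Atomic uint64_t state = 0;	/* offset 0 == open, empty entry */

	return entry_close(&state) ? 0 : 1;
}
```

The point of the retry loop is that the decision (closed already? errored? previous buffer still unwritten?) and the transition (mark closed, flip buffers) happen against one atomically observed snapshot, which is why the patch can drop the old journal_buf_switch() return-code enum.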
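The patch also reworks the reservation API: bch2_journal_res_get() now takes a single size plus a flags word instead of the old (u64s_min, u64s_max) pair, and the slowpath returns 0 / -EAGAIN / -EROFS rather than 1 / 0 / negative. The fragment below sketches how a caller might use the reworked API, modelled loosely on bch2_journal_meta() from the diff. It assumes the bcachefs journal headers; the surrounding function, the nr_u64s parameter and the retry policy are illustrative only and not code from the patch.

```c
/*
 * Illustrative only: a hypothetical caller of the post-patch reservation API.
 * bch2_journal_res_get(), bch2_journal_res_put(), bch2_journal_flush_seq(),
 * jset_u64s() and JOURNAL_RES_GET_NONBLOCK appear in the diff; everything
 * else here is made up for the example.
 */
static int example_journal_reserve_and_flush(struct journal *j, unsigned nr_u64s)
{
	struct journal_res res;
	int ret;

	memset(&res, 0, sizeof(res));

	/* Try without blocking first; -EAGAIN now means "journal full or blocked". */
	ret = bch2_journal_res_get(j, &res, jset_u64s(nr_u64s),
				   JOURNAL_RES_GET_NONBLOCK);
	if (ret == -EAGAIN) {
		/* Fall back to the blocking path, which may invoke reclaim. */
		ret = bch2_journal_res_get(j, &res, jset_u64s(nr_u64s), 0);
	}
	if (ret)
		return ret;	/* e.g. -EROFS if the journal has hit an error */

	/* ... copy nr_u64s worth of keys into the reserved space here ... */

	bch2_journal_res_put(j, &res);

	/* Wait for the entry holding our reservation to reach disk. */
	return bch2_journal_flush_seq(j, res.seq);
}
```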