author | Kent Overstreet <kent.overstreet@gmail.com> | 2016-04-12 02:01:01 -0800
---|---|---
committer | Kent Overstreet <kent.overstreet@gmail.com> | 2016-10-07 12:36:08 -0800
commit | e139a254a229af7d00612bd9b2080a556d7e7e67 (patch) |
tree | e05e609c73e51410039f482a443e989debd24d8e |
parent | 64e2031d6916042baf920e01e5c9b4e212d6c4a2 (diff) |
bcache: better journal pipelining
Previously, when a journal entry filled up or was otherwise ready to be written,
we couldn't start on the next journal entry until all outstanding reservations
on the current one had been dropped.

This fixes that: we now start the next journal entry immediately, and the write
happens whenever the last res_put() on the old journal entry completes.
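In outline, the patch packs everything a reservation needs into a single 64-bit state word, so the fast paths stay lockless. Below is a minimal user-space sketch of that word and of the get/put paths (the field layout mirrors `union journal_res_state` from the journal_types.h hunk below; the C11 atomics, the `entry_capacity` parameter, and the `printf` standing in for the actual write submission are illustrative assumptions, not kernel code):

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Same layout as the new union journal_res_state in the diff below: */
union journal_res_state {
	uint64_t v;
	struct {
		uint64_t cur_entry_offset:16,	/* u64s used in open entry */
			 idx:1,			/* which buf is open */
			 prev_buf_unwritten:1,	/* previous write in flight */
			 buf0_count:23,		/* live reservations, buf 0 */
			 buf1_count:23;		/* live reservations, buf 1 */
	};
};

static _Atomic uint64_t reservations;

/* Lockless fast path: carve u64s out of the open entry, bump its count. */
static bool journal_res_get_fast(unsigned u64s, unsigned entry_capacity,
				 unsigned *offset, unsigned *idx)
{
	union journal_res_state old, new;
	uint64_t v = atomic_load(&reservations);

	do {
		old.v = new.v = v;

		/*
		 * Full - or closed: the CLOSED/ERROR sentinel offsets are
		 * large enough that this check always fails for them:
		 */
		if (old.cur_entry_offset + u64s > entry_capacity)
			return false;

		new.cur_entry_offset += u64s;
		if (new.idx == 0)
			new.buf0_count++;
		else
			new.buf1_count++;
	} while (!atomic_compare_exchange_weak(&reservations, &v, new.v));

	*offset = (unsigned) old.cur_entry_offset;
	*idx    = (unsigned) old.idx;
	return true;
}

/* Dropping the last reservation on a switched-away buffer fires its write: */
static void journal_res_put(unsigned idx)
{
	union journal_res_state delta = { .v = 0 }, s;

	if (idx == 0)
		delta.buf0_count = 1;
	else
		delta.buf1_count = 1;

	/* atomic_fetch_sub() returns the old value; derive the new state: */
	s.v = atomic_fetch_sub(&reservations, delta.v) - delta.v;

	if (s.idx != idx &&
	    (idx == 0 ? s.buf0_count : s.buf1_count) == 0)
		printf("buf %u: last res_put -> submit journal write\n", idx);
}

int main(void)
{
	unsigned off, idx;

	atomic_store(&reservations, 0);	/* buf 0 open, empty */

	if (journal_res_get_fast(64, 512, &off, &idx))
		printf("reserved 64 u64s at offset %u in buf %u\n", off, idx);

	journal_res_put(idx);	/* buf still current: no write triggered */
	return 0;
}
```

The key property this models: a write is only submitted when a buffer is both switched away from (`s.idx != idx`) and has no remaining reservations, which is exactly what lets reservations against the new entry proceed while the old one drains.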
-rw-r--r-- | drivers/md/bcache/journal.c | 844
-rw-r--r-- | drivers/md/bcache/journal.h | 15
-rw-r--r-- | drivers/md/bcache/journal_types.h | 24

3 files changed, 401 insertions, 482 deletions
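The pivot of the diff below is the new `journal_buf_switch()`: a single compare-and-swap closes the open entry, flips `idx` to the other buffer, and marks the previous buffer unwritten, so writers never observe a half-switched state. A condensed, standalone model of that transition (same state-word layout as the sketch above; locking, pin handling, and the actual write submission are omitted, and the enum/return-value names are copied from the diff):

```c
#include <stdatomic.h>
#include <stdint.h>

/* Sentinels stashed in cur_entry_offset, as in the new journal_types.h: */
#define JOURNAL_ENTRY_CLOSED_VAL (UINT16_MAX - 1)
#define JOURNAL_ENTRY_ERROR_VAL  (UINT16_MAX)

union journal_res_state {	/* same layout as in the sketch above */
	uint64_t v;
	struct {
		uint64_t cur_entry_offset:16,
			 idx:1,
			 prev_buf_unwritten:1,
			 buf0_count:23,
			 buf1_count:23;
	};
};

static _Atomic uint64_t reservations;

enum buf_switch_ret {
	JOURNAL_ENTRY_ERROR,
	JOURNAL_ENTRY_INUSE,
	JOURNAL_ENTRY_CLOSED,
	JOURNAL_UNLOCKED,
};

static enum buf_switch_ret journal_buf_switch(void)
{
	union journal_res_state old, new;
	uint64_t v = atomic_load(&reservations);

	do {
		old.v = new.v = v;

		if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
			return JOURNAL_ENTRY_CLOSED;	/* nothing to close */
		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return JOURNAL_ENTRY_ERROR;	/* journal halted */
		if (old.prev_buf_unwritten)
			return JOURNAL_ENTRY_INUSE;	/* both bufs busy */

		/*
		 * Pin the buffer being closed so its write can't start until
		 * the closer has copied old.cur_entry_offset into the buffer
		 * header (the kernel's journal_state_inc() serves this role):
		 */
		if (new.idx == 0)
			new.buf0_count++;
		else
			new.buf1_count++;

		new.cur_entry_offset   = JOURNAL_ENTRY_CLOSED_VAL;
		new.idx               ^= 1;
		new.prev_buf_unwritten = 1;
	} while (!atomic_compare_exchange_weak(&reservations, &v, new.v));

	/*
	 * At this point the kernel stores old.cur_entry_offset into the
	 * closed buffer, opens the next entry, and drops the pin taken
	 * above - that final put is what actually submits the write.
	 */
	return JOURNAL_UNLOCKED;
}

int main(void)
{
	atomic_store(&reservations, 0);	/* buf 0 open, empty */
	return journal_buf_switch() == JOURNAL_UNLOCKED ? 0 : 1;
}
```

Taking the extra count inside the same CAS is what closes the race the diff's comment mentions between setting `buf->data->u64s` and a concurrent `journal_res_put()` starting the write.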
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 237daedab3b4..6db9dddb09af 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -20,7 +20,18 @@
 #include <trace/events/bcache.h>

-static void __bch_journal_next_entry(struct journal *);
+static void journal_write(struct closure *);
+static void journal_reclaim_fast(struct journal *);
+
+static inline struct journal_buf *journal_cur_buf(struct journal *j)
+{
+	return j->buf + j->reservations.idx;
+}
+
+static inline struct journal_buf *journal_prev_buf(struct journal *j)
+{
+	return j->buf + !j->reservations.idx;
+}

 /* Sequence number of oldest dirty journal entry */

@@ -48,12 +59,12 @@ static inline u64 journal_pin_seq(struct journal *j,
 		     (_n = bkey_next(k), 1));				\
 	     k = _n)

-static inline void bch_journal_add_entry_at(struct journal *j, const void *data,
-					    size_t u64s, unsigned type,
-					    enum btree_id id, unsigned level,
-					    unsigned offset)
+static inline void bch_journal_add_entry_at(struct journal_buf *buf,
+					    const void *data, size_t u64s,
+					    unsigned type, enum btree_id id,
+					    unsigned level, unsigned offset)
 {
-	struct jset_entry *jkeys = bkey_idx(journal_cur_buf(j)->data, offset);
+	struct jset_entry *jkeys = bkey_idx(buf->data, offset);

 	jkeys->u64s = cpu_to_le16(u64s);
 	jkeys->btree_id = id;
@@ -64,13 +75,14 @@ static inline void bch_journal_add_entry_at(struct journal *j, const void *data,
 	memcpy(jkeys->_data, data, u64s * sizeof(u64));
 }

-static inline void bch_journal_add_entry(struct journal *j, const void *data,
-					 size_t u64s, unsigned type,
-					 enum btree_id id, unsigned level)
+static inline void bch_journal_add_entry(struct journal_buf *buf,
+					 const void *data, size_t u64s,
+					 unsigned type, enum btree_id id,
+					 unsigned level)
 {
-	struct jset *jset = journal_cur_buf(j)->data;
+	struct jset *jset = buf->data;

-	bch_journal_add_entry_at(j, data, u64s, type, id, level,
+	bch_journal_add_entry_at(buf, data, u64s, type, id, level,
 				 le32_to_cpu(jset->u64s));
 	le32_add_cpu(&jset->u64s, jset_u64s(u64s));
 }
@@ -110,13 +122,15 @@ struct bkey_i *bch_journal_find_btree_root(struct cache_set *c, struct jset *j,
 	return k;
 }

-static void bch_journal_add_btree_root(struct journal *j, enum btree_id id,
-				       struct bkey_i *k, unsigned level)
+static void bch_journal_add_btree_root(struct journal_buf *buf,
+				       enum btree_id id, struct bkey_i *k,
+				       unsigned level)
 {
-	bch_journal_add_entry(j, k, k->k.u64s, JKEYS_BTREE_ROOT, id, level);
+	bch_journal_add_entry(buf, k, k->k.u64s, JKEYS_BTREE_ROOT, id, level);
 }

-static inline void bch_journal_add_prios(struct journal *j)
+static inline void bch_journal_add_prios(struct journal *j,
+					 struct journal_buf *buf)
 {
 	/*
 	 * no prio bucket ptrs yet... XXX should change the allocator so this
@@ -125,7 +139,7 @@ static inline void bch_journal_add_prios(struct journal *j)
 	if (!j->nr_prio_buckets)
 		return;

-	bch_journal_add_entry(j, j->prio_buckets, j->nr_prio_buckets,
+	bch_journal_add_entry(buf, j->prio_buckets, j->nr_prio_buckets,
 			      JKEYS_PRIO_PTRS, 0, 0);
 }

@@ -414,7 +428,7 @@ static int journal_read_bucket(struct cache *ca, struct journal_list *jlist,
 	data = (void *) __get_free_pages(GFP_KERNEL, JOURNAL_BUF_ORDER);
 	if (!data) {
 		mutex_lock(&jlist->cache_set_buffer_lock);
-		data = c->journal.w[0].data;
+		data = c->journal.buf[0].data;
 	}

 	pr_debug("reading %u", bucket);
@@ -501,7 +515,7 @@ reread:
 out:
 	ret = entries_found;
 err:
-	if (data == c->journal.w[0].data)
+	if (data == c->journal.buf[0].data)
 		mutex_unlock(&jlist->cache_set_buffer_lock);
 	else
 		free_pages((unsigned long) data, JOURNAL_BUF_ORDER);
@@ -804,81 +818,237 @@ void bch_journal_mark(struct cache_set *c, struct list_head *list)
 	}
 }

-static union journal_res_state journal_res_state(unsigned count,
-						 unsigned entry_offset)
+static bool journal_entry_is_open(struct journal *j)
 {
-	return (union journal_res_state) {
-		.count = count,
-		.cur_entry_offset = entry_offset,
-	};
+	return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
 }

-static bool journal_entry_is_open(struct journal *j)
+static bool journal_entry_not_started(struct journal *j)
 {
-	return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
+	return j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL;
+}
+
+static inline int journal_state_count(union journal_res_state s, int idx)
+{
+	return idx == 0 ? s.buf0_count : s.buf1_count;
+}
+
+static inline void journal_state_inc(union journal_res_state *s)
+{
+	s->buf0_count += s->idx == 0;
+	s->buf1_count += s->idx == 1;
+}
+
+static void __journal_res_put(struct journal *j, unsigned idx,
+			      bool need_write_just_set)
+{
+	union journal_res_state s;
+
+	s.v = atomic64_sub_return(((union journal_res_state) {
+				.buf0_count = idx == 0,
+				.buf1_count = idx == 1,
+				}).v, &j->reservations.counter);
+
+	BUG_ON(s.idx != idx && !s.prev_buf_unwritten);
+
+	if (s.idx != idx && !journal_state_count(s, idx)) {
+		struct cache_set *c = container_of(j,
+					struct cache_set, journal);
+
+		if (!need_write_just_set &&
+		    test_bit(JOURNAL_NEED_WRITE, &j->flags))
+			__bch_time_stats_update(j->delay_time,
+						j->need_write_time);
+
+#if 0
+		closure_call(&j->io, journal_write, NULL, &c->cl);
+#else
+		/* Shut sparse up: */
+		closure_init(&j->io, &c->cl);
+		set_closure_fn(&j->io, journal_write, NULL);
+		journal_write(&j->io);
+#endif
+	}
+}
+
+static void __bch_journal_next_entry(struct journal *j)
+{
+	struct journal_entry_pin_list pin_list, *p;
+	struct jset *jset;
+
+	/*
+	 * The fifo_push() needs to happen at the same time as j->seq is
+	 * incremented for last_seq() to be calculated correctly
+	 */
+	BUG_ON(!fifo_push(&j->pin, pin_list));
+	p = &fifo_back(&j->pin);
+
+	INIT_LIST_HEAD(&p->list);
+	atomic_set(&p->count, 1);
+
+	if (test_bit(JOURNAL_REPLAY_DONE, &j->flags)) {
+		smp_wmb();
+		j->cur_pin_list = p;
+	}
+
+	jset = journal_cur_buf(j)->data;
+	jset->seq = cpu_to_le64(++j->seq);
+	jset->u64s = 0;
 }

-enum journal_entry_state {
+static enum {
 	JOURNAL_ENTRY_ERROR,
 	JOURNAL_ENTRY_INUSE,
 	JOURNAL_ENTRY_CLOSED,
-};
-
-static enum journal_entry_state
-__journal_entry_close(struct journal *j, u32 val)
+	JOURNAL_UNLOCKED,
+} journal_buf_switch(struct journal *j, bool need_write_just_set)
 {
+	struct journal_buf *buf;
 	union journal_res_state old, new;
 	u64 v = atomic64_read(&j->reservations.counter);

 	do {
 		old.v = new.v = v;
-		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL ||
-		    old.cur_entry_offset == val)
-			break;
+		if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
+			return JOURNAL_ENTRY_CLOSED;
+
+		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+			return JOURNAL_ENTRY_ERROR;
+
+		if (new.prev_buf_unwritten)
+			return JOURNAL_ENTRY_INUSE;

-		new.cur_entry_offset = val;
+		/*
+		 * avoid race between setting buf->data->u64s and
+		 * journal_res_put starting write:
+		 */
+		journal_state_inc(&new);
+
+		new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
+		new.idx++;
+		new.prev_buf_unwritten = 1;
+
+		BUG_ON(journal_state_count(new, new.idx));
 	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);

-	if (old.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL)
-		journal_cur_buf(j)->data->u64s = cpu_to_le32(old.cur_entry_offset);
+	journal_reclaim_fast(j);

-	if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
-		return JOURNAL_ENTRY_ERROR;
+	clear_bit(JOURNAL_NEED_WRITE, &j->flags);

-	return old.count
-		? JOURNAL_ENTRY_INUSE
-		: JOURNAL_ENTRY_CLOSED;
-}
+	buf = &j->buf[old.idx];
+	buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
+	buf->data->last_seq = cpu_to_le64(last_seq(j));

-/*
- * Closes the current journal entry so that new reservations cannot be take on
- * it - returns true if the count of outstanding reservations is 0.
- */
-static enum journal_entry_state journal_entry_close(struct journal *j)
-{
-	return __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL);
+	atomic_dec_bug(&fifo_back(&j->pin).count);
+	__bch_journal_next_entry(j);
+
+	spin_unlock(&j->lock);
+
+	/* ugh - might be called from __journal_res_get() under wait_event() */
+	__set_current_state(TASK_RUNNING);
+	__journal_res_put(j, old.idx, need_write_just_set);
+
+	return JOURNAL_UNLOCKED;
 }

 void bch_journal_halt(struct journal *j)
 {
-	__journal_entry_close(j, JOURNAL_ENTRY_ERROR_VAL);
+	union journal_res_state old, new;
+	u64 v = atomic64_read(&j->reservations.counter);
+
+	do {
+		old.v = new.v = v;
+		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+			return;
+
+		new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
+	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);

 	wake_up(&j->wait);
 }

-/* Number of u64s we can write to the current journal bucket */
-static void journal_entry_open(struct journal *j)
+static unsigned journal_dev_buckets_available(struct journal *j,
+					      struct cache *ca)
 {
-	struct journal_buf *w = journal_cur_buf(j);
+	struct journal_device *ja = &ca->journal;
+	unsigned nr = bch_nr_journal_buckets(ca->disk_sb.sb);
+	unsigned next = (ja->cur_idx + 1) % nr;
+	unsigned available = (ja->last_idx + nr - next) % nr;
+
+	/*
+	 * Hack to avoid a deadlock during journal replay:
+	 * journal replay might require setting a new btree
+	 * root, which requires writing another journal entry -
+	 * thus, if the journal is full (and this happens when
+	 * replaying the first journal bucket's entries) we're
+	 * screwed.
+	 *
+	 * So don't let the journal fill up unless we're in
+	 * replay:
+	 */
+	if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
+		available = max((int) available - 2, 0);
+
+	/*
+	 * Don't use the last bucket unless writing the new last_seq
+	 * will make another bucket available:
+	 */
+	if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
+		available = max((int) available - 1, 0);
+
+	return available;
+}
+
+/* returns number of sectors available for next journal entry: */
+static int journal_entry_sectors(struct journal *j)
+{
+	struct cache_set *c = container_of(j, struct cache_set, journal);
+	struct cache *ca;
+	unsigned i, nr_buckets = UINT_MAX, sectors = JOURNAL_BUF_SECTORS,
+		nr_devs = 0;
+
+	lockdep_assert_held(&j->lock);
+
+	rcu_read_lock();
+	group_for_each_cache_rcu(ca, &c->cache_tiers[0], i) {
+		nr_devs++;
+		nr_buckets = min(nr_buckets,
+				 journal_dev_buckets_available(j, ca));
+		sectors = min_t(unsigned, sectors, ca->mi.bucket_size);
+	}
+	rcu_read_unlock();
+
+	if (nr_devs < c->opts.metadata_replicas)
+		return -EROFS;
+
+	if (nr_buckets == UINT_MAX)
+		nr_buckets = 0;
+
+	return nr_buckets ? sectors : 0;
+}
+
+/*
+ * should _only_ called from journal_res_get() - when we actually want a
+ * journal reservation - journal entry is open means journal is dirty:
+ */
+static int journal_entry_open(struct journal *j)
+{
+	struct journal_buf *buf = journal_cur_buf(j);
 	ssize_t u64s;
+	int ret = 0, sectors;

 	lockdep_assert_held(&j->lock);
-	BUG_ON(journal_entry_is_open(j) ||
-	       test_bit(JOURNAL_DIRTY, &j->flags));
+	BUG_ON(journal_entry_is_open(j));

-	u64s = min_t(size_t,
-		     j->sectors_free << 9,
-		     JOURNAL_BUF_BYTES) / sizeof(u64);
+	if (!fifo_free(&j->pin))
+		return 0;
+
+	sectors = journal_entry_sectors(j);
+	if (sectors <= 0)
+		return sectors;
+
+	u64s = (sectors << 9) / sizeof(u64);

 	/* Subtract the journal header */
 	u64s -= sizeof(struct jset) / sizeof(u64);
@@ -891,30 +1061,26 @@ static void journal_entry_open(struct journal *j)
 	u64s -= JSET_KEYS_U64s + j->nr_prio_buckets;

 	u64s = max_t(ssize_t, 0L, u64s);

-	if (u64s > le32_to_cpu(w->data->u64s)) {
+	if (u64s > le32_to_cpu(buf->data->u64s)) {
 		union journal_res_state old, new;
 		u64 v = atomic64_read(&j->reservations.counter);

 		/*
 		 * Must be set before marking the journal entry as open:
-		 *
-		 * XXX: does this cause any problems if we bail out because of
-		 * JOURNAL_ENTRY_ERROR_VAL?
 		 */
 		j->cur_entry_u64s = u64s;

 		do {
 			old.v = new.v = v;

-			BUG_ON(old.count);
-
 			if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
-				break;
+				return false;

 			/* Handle any already added entries */
-			new.cur_entry_offset = le32_to_cpu(w->data->u64s);
+			new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
 		} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 					       old.v, new.v)) != old.v);

+		ret = 1;
+
 		wake_up(&j->wait);
@@ -923,7 +1089,14 @@ static void journal_entry_open(struct journal *j)
 					j->res_get_blocked_start);
 			j->res_get_blocked_start = 0;
 		}
+
+		if (!old.prev_buf_unwritten)
+			mod_delayed_work(system_freezable_wq,
+					 &j->write_work,
+					 msecs_to_jiffies(j->delay_ms));
 	}
+
+	return ret;
 }

 void bch_journal_start(struct cache_set *c)
@@ -948,6 +1121,11 @@ void bch_journal_start(struct cache_set *c)
 			j->seq++;
 	}

+	/*
+	 * journal_buf_switch() only inits the next journal entry when it
+	 * closes an open journal entry - the very first journal entry gets
+	 * initialized here:
+	 */
 	__bch_journal_next_entry(j);

 	/*
@@ -955,10 +1133,9 @@ void bch_journal_start(struct cache_set *c)
 	 * disk for the next journal entry - this is ok, because these entries
 	 * only have to go down with the next journal entry we write:
 	 */
-
 	list_for_each_entry(bl, &j->seq_blacklist, list)
 		if (!bl->written) {
-			bch_journal_add_entry(j, &bl->seq, 1,
+			bch_journal_add_entry(journal_cur_buf(j), &bl->seq, 1,
 					      JKEYS_JOURNAL_SEQ_BLACKLISTED,
 					      0, 0);
@@ -967,11 +1144,6 @@ void bch_journal_start(struct cache_set *c)
 			bl->written = true;
 		}

-	/*
-	 * Recalculate, since we just added entries directly bypassing
-	 * reservations
-	 */
-	journal_entry_open(j);
 	spin_unlock(&j->lock);

 	queue_work(system_freezable_wq, &j->reclaim_work);
@@ -1267,27 +1439,16 @@ static void journal_reclaim_work(struct work_struct *work)
 /**
  * journal_next_bucket - move on to the next journal bucket if possible
  */
-static int journal_next_bucket(struct cache_set *c)
+static int journal_write_alloc(struct journal *j, unsigned sectors)
 {
-	struct journal *j = &c->journal;
+	struct cache_set *c = container_of(j, struct cache_set, journal);
 	struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
 	struct bch_extent_ptr *ptr;
 	struct cache *ca;
-	unsigned iter, replicas, nr_devs = 0,
-		replicas_want = READ_ONCE(c->opts.metadata_replicas);
-	int ret = 0;
-
-	lockdep_assert_held(&j->lock);
-
-	/*
-	 * only supposed to be called when we're out of space/haven't started a
-	 * new journal entry
-	 */
-	BUG_ON(test_bit(JOURNAL_DIRTY, &j->flags));
-
-	/* We use last_idx() below, make sure it's up to date: */
-	journal_reclaim_fast(j);
+	unsigned iter, replicas, replicas_want =
+		READ_ONCE(c->opts.metadata_replicas);

+	spin_lock(&j->lock);
 	rcu_read_lock();

 	/*
@@ -1302,7 +1463,7 @@ static int journal_next_bucket(struct cache_set *c)
 	extent_for_each_ptr_backwards(e, ptr)
 		if (!(ca = PTR_CACHE(c, ptr)) ||
 		    ca->mi.state != CACHE_ACTIVE ||
-		    ca->journal.sectors_free <= j->sectors_free)
+		    ca->journal.sectors_free <= sectors)
 			__bch_extent_drop_ptr(e, ptr);

 	replicas = 0;
@@ -1315,10 +1476,7 @@
 	 */
 	group_for_each_cache_rcu(ca, &c->cache_tiers[0], iter) {
 		struct journal_device *ja = &ca->journal;
-		unsigned next, remaining, nr_buckets =
-			bch_nr_journal_buckets(ca->disk_sb.sb);
-
-		nr_devs++;
+		unsigned nr_buckets = bch_nr_journal_buckets(ca->disk_sb.sb);

 		if (replicas >= replicas_want)
 			break;
@@ -1327,39 +1485,12 @@
 		/*
 		 * Check that we can use this device, and aren't already using
 		 * it:
 		 */
-		if (bch_extent_has_device(e.c, ca->sb.nr_this_dev))
-			continue;
-
-		next = (ja->cur_idx + 1) % nr_buckets;
-		remaining = (ja->last_idx + nr_buckets - next) % nr_buckets;
-
-		/*
-		 * Hack to avoid a deadlock during journal replay:
-		 * journal replay might require setting a new btree
-		 * root, which requires writing another journal entry -
-		 * thus, if the journal is full (and this happens when
-		 * replaying the first journal bucket's entries) we're
-		 * screwed.
-		 *
-		 * So don't let the journal fill up unless we're in
-		 * replay:
-		 */
-		if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
-			remaining = max((int) remaining - 2, 0);
-
-		/*
-		 * Don't use the last bucket unless writing the new last_seq
-		 * will make another bucket available:
-		 */
-		if (remaining == 1 &&
-		    ja->bucket_seq[ja->last_idx] >= last_seq(j))
-			continue;
-
-		if (!remaining)
+		if (bch_extent_has_device(e.c, ca->sb.nr_this_dev) ||
+		    !journal_dev_buckets_available(j, ca))
 			continue;

 		ja->sectors_free = ca->mi.bucket_size;
-		ja->cur_idx = next;
+		ja->cur_idx = (ja->cur_idx + 1) % nr_buckets;
 		ja->bucket_seq[ja->cur_idx] = j->seq;

 		extent_ptr_append(bkey_i_to_extent(&j->key),
@@ -1373,60 +1504,58 @@
 		trace_bcache_journal_next_bucket(ca, ja->cur_idx,
 						 ja->last_idx);
 	}

-	/* set j->sectors_free to the min of any device */
-	j->sectors_free = UINT_MAX;
-
-	if (replicas >= replicas_want)
-		extent_for_each_online_device(c, e, ptr, ca)
-			j->sectors_free = min(j->sectors_free,
-					      ca->journal.sectors_free);
-
-	if (j->sectors_free == UINT_MAX)
-		j->sectors_free = 0;
-
-	if (!j->sectors_free && nr_devs < replicas_want)
-		ret = -EROFS;
-
-	journal_entry_open(j);
-
 	rcu_read_unlock();
+	spin_unlock(&j->lock);

-	queue_work(system_freezable_wq, &j->reclaim_work);
+	if (replicas < replicas_want)
+		return -EROFS;

-	return ret;
+	return 0;
 }

-static void __bch_journal_next_entry(struct journal *j)
+static void journal_write_compact(struct jset *jset)
 {
-	struct journal_entry_pin_list pin_list, *p;
-	struct jset *jset;
-
-	change_bit(JOURNAL_BUF_IDX, &j->flags);
+	struct jset_entry *i, *next, *prev = NULL;

 	/*
-	 * The fifo_push() needs to happen at the same time as j->seq is
-	 * incremented for last_seq() to be calculated correctly
+	 * Simple compaction, dropping empty jset_entries (from journal
+	 * reservations that weren't fully used) and merging jset_entries that
+	 * can be.
+	 *
+	 * If we wanted to be really fancy here, we could sort all the keys in
+	 * the jset and drop keys that were overwritten - probably not worth it:
 	 */
-	BUG_ON(!fifo_push(&j->pin, pin_list));
-	p = &fifo_back(&j->pin);
+	for (i = jset->start;
+	     i < (struct jset_entry *) bkey_idx(jset, le32_to_cpu(jset->u64s)) &&
+	     (next = jset_keys_next(i), true);
+	     i = next) {
+		unsigned u64s = le16_to_cpu(i->u64s);

-	INIT_LIST_HEAD(&p->list);
-	atomic_set(&p->count, 1);
+		/* Empty entry: */
+		if (!u64s)
+			continue;

-	if (test_bit(JOURNAL_REPLAY_DONE, &j->flags)) {
-		smp_wmb();
-		j->cur_pin_list = p;
-	}
+		/* Can we merge with previous entry? */
+		if (prev &&
+		    i->btree_id == prev->btree_id &&
+		    i->level == prev->level &&
+		    JKEYS_TYPE(i) == JKEYS_TYPE(prev) &&
+		    JKEYS_TYPE(i) == JKEYS_BTREE_KEYS) {
+			memmove(jset_keys_next(prev),
+				i->_data,
+				u64s * sizeof(u64));
+			le16_add_cpu(&prev->u64s, u64s);
+			continue;
+		}

-	jset = journal_cur_buf(j)->data;
-	jset->seq = cpu_to_le64(++j->seq);
-	jset->u64s = 0;
-}
+		/* Couldn't merge, move i into new position (after prev): */
+		prev = prev ? jset_keys_next(prev) : jset->start;
+		if (i != prev)
+			memmove(prev, i, jset_u64s(u64s) * sizeof(u64));
+	}

-static void bch_journal_next_entry(struct journal *j)
-{
-	__bch_journal_next_entry(j);
-	journal_entry_open(j);
+	prev = prev ? jset_keys_next(prev) : jset->start;
+	jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
 }

 static void journal_write_endio(struct bio *bio)
@@ -1453,7 +1582,9 @@ static void journal_write_done(struct closure *cl)

 	__bch_time_stats_update(j->write_time, j->write_start_time);

-	clear_bit(JOURNAL_IO_IN_FLIGHT, &j->flags);
+	BUG_ON(!j->reservations.prev_buf_unwritten);
+	atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
+		     &j->reservations.counter);

 	/*
 	 * XXX: this is racy, we could technically end up doing the wake up
@@ -1473,8 +1604,10 @@ static void journal_write_done(struct closure *cl)
 	closure_wake_up(&w->wait);
 	wake_up(&j->wait);

-	if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
-		mod_delayed_work(system_freezable_wq, &j->write_work, 0);
+	if (journal_entry_is_open(j))
+		mod_delayed_work(system_freezable_wq, &j->write_work,
+				 test_bit(JOURNAL_NEED_WRITE, &j->flags)
+				 ? 0 : msecs_to_jiffies(j->delay_ms));

 	/*
 	 * Updating last_seq_ondisk may let journal_reclaim_work() discard more
@@ -1483,79 +1616,28 @@ static void journal_write_done(struct closure *cl)
 	queue_work(system_freezable_wq, &j->reclaim_work);
 }

-static void journal_write_compact(struct jset *jset)
-{
-	struct jset_entry *i, *next, *prev = NULL;
-
-	/*
-	 * Simple compaction, dropping empty jset_entries (from journal
-	 * reservations that weren't fully used) and merging jset_entries that
-	 * can be.
-	 *
-	 * If we wanted to be really fancy here, we could sort all the keys in
-	 * the jset and drop keys that were overwritten - probably not worth it:
-	 */
-	for (i = jset->start;
-	     i < (struct jset_entry *) bkey_idx(jset, le32_to_cpu(jset->u64s)) &&
-	     (next = jset_keys_next(i), true);
-	     i = next) {
-		unsigned u64s = le16_to_cpu(i->u64s);
-
-		/* Empty entry: */
-		if (!u64s)
-			continue;
-
-		/* Can we merge with previous entry? */
-		if (prev &&
-		    i->btree_id == prev->btree_id &&
-		    i->level == prev->level &&
-		    JKEYS_TYPE(i) == JKEYS_TYPE(prev) &&
-		    JKEYS_TYPE(i) == JKEYS_BTREE_KEYS) {
-			memmove(jset_keys_next(prev),
-				i->_data,
-				u64s * sizeof(u64));
-			le16_add_cpu(&prev->u64s, u64s);
-			continue;
-		}
-
-		/* Couldn't merge, move i into new position (after prev): */
-		prev = prev ? jset_keys_next(prev) : jset->start;
-		if (i != prev)
-			memmove(prev, i, jset_u64s(u64s) * sizeof(u64));
-	}
-
-	prev = prev ? jset_keys_next(prev) : jset->start;
-	jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
-}
-
-static void journal_write_locked(struct closure *cl)
-	__releases(c->journal.lock)
+static void journal_write(struct closure *cl)
 {
 	struct journal *j = container_of(cl, struct journal, io);
 	struct cache_set *c = container_of(j, struct cache_set, journal);
 	struct cache *ca;
-	struct journal_buf *w = journal_cur_buf(j);
-	struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
+	struct journal_buf *w = journal_prev_buf(j);
+	struct bio *bio;
 	struct bch_extent_ptr *ptr;
-	BKEY_PADDED(k) tmp;
 	unsigned i, sectors;
-	struct bio *bio;
-	struct bio_list list;
-	bio_list_init(&list);
-
-	BUG_ON(j->reservations.count);
-	BUG_ON(journal_full(j));
-	BUG_ON(!test_bit(JOURNAL_DIRTY, &j->flags));

+	j->write_start_time = local_clock();

-	clear_bit(JOURNAL_NEED_WRITE, &j->flags);
-	clear_bit(JOURNAL_DIRTY, &j->flags);
-	cancel_delayed_work(&j->write_work);
+	bch_journal_add_prios(j, w);

-	bch_journal_add_prios(j);
+	spin_lock(&c->btree_root_lock);
+	for (i = 0; i < BTREE_ID_NR; i++) {
+		struct btree_root *r = &c->btree_roots[i];

-	/* So last_seq is up to date */
-	journal_reclaim_fast(j);
+		if (r->alive)
+			bch_journal_add_btree_root(w, i, &r->key, r->level);
+	}
+	spin_unlock(&c->btree_root_lock);

 	journal_write_compact(w->data);

@@ -1563,34 +1645,23 @@
 	w->data->write_clock = cpu_to_le16(c->prio_clock[WRITE].hand);
 	w->data->magic = cpu_to_le64(jset_magic(&c->disk_sb));
 	w->data->version = cpu_to_le32(BCACHE_JSET_VERSION);
-	w->data->last_seq = cpu_to_le64(last_seq(j));
-
-	/* _after_ last_seq(): */
-	spin_lock(&c->btree_root_lock);
-
-	for (i = 0; i < BTREE_ID_NR; i++) {
-		struct btree_root *r = &c->btree_roots[i];
-
-		if (r->alive)
-			bch_journal_add_btree_root(j, i, &r->key, r->level);
-	}
-
-	spin_unlock(&c->btree_root_lock);

 	SET_JSET_BIG_ENDIAN(w->data, CPU_BIG_ENDIAN);
-	SET_JSET_CSUM_TYPE(w->data, c->opts.metadata_checksum);
-	w->data->csum = cpu_to_le64(__csum_set(w->data,
-					       le32_to_cpu(w->data->u64s),
-					       JSET_CSUM_TYPE(w->data)));
+	w->data->csum = cpu_to_le64(__csum_set(w->data,
+					       le32_to_cpu(w->data->u64s),
+					       JSET_CSUM_TYPE(w->data)));

 	sectors = __set_blocks(w->data, le32_to_cpu(w->data->u64s),
 			       block_bytes(c)) * c->sb.block_size;

-	BUG_ON(sectors > j->sectors_free);
-	j->sectors_free -= sectors;
+	if (journal_write_alloc(j, sectors)) {
+		BUG();
+	}
+
+	bch_check_mark_super(c, &j->key, true);

-	extent_for_each_ptr(e, ptr) {
+	extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr) {
 		rcu_read_lock();
 		ca = PTR_CACHE(c, ptr);
 		if (ca)
@@ -1606,17 +1677,6 @@
 		BUG_ON(sectors > ca->journal.sectors_free);
 		ca->journal.sectors_free -= sectors;

-		/*
-		 * If we don't have enough space for a full-sized journal
-		 * entry, go to the next bucket. We do this check after
-		 * the write, so that if our bucket size is really small
-		 * we don't get stuck forever.
-		 */
-		if (ca->journal.sectors_free < JOURNAL_BUF_SECTORS) {
-			j->sectors_free = 0;
-			ca->journal.sectors_free = 0;
-		}
-
 		bio = &ca->journal.bio;

 		atomic_long_add(sectors, &ca->meta_sectors_written);
@@ -1632,81 +1692,17 @@
 		bch_bio_map(bio, w->data);

 		trace_bcache_journal_write(bio);
-		bio_list_add(&list, bio);
+		closure_bio_submit_punt(bio, cl, c);

 		ptr->offset += sectors;

 		ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
 	}

-	/*
-	 * Make a copy of the key we're writing to for check_mark_super, since
-	 * journal_next_bucket will change it
-	 */
-	bkey_reassemble(&tmp.k, e.s_c);
-
-	atomic_dec_bug(&fifo_back(&j->pin).count);
-	bch_journal_next_entry(j);
-	wake_up(&j->wait);
-
-	spin_unlock(&j->lock);
-
-	bch_check_mark_super(c, &tmp.k, true);
-
-	while ((bio = bio_list_pop(&list)))
-		closure_bio_submit_punt(bio, cl, c);
-
 	closure_return_with_destructor(cl, journal_write_done);
 }

-static bool __journal_write(struct journal *j, bool need_write_just_set)
-	__releases(j->lock)
-{
-	struct cache_set *c = container_of(j, struct cache_set, journal);
-
-	EBUG_ON(!j->reservations.count &&
-		!test_bit(JOURNAL_DIRTY, &j->flags));
-
-	if (test_bit(JOURNAL_IO_IN_FLIGHT, &j->flags))
-		goto nowrite;
-
-	switch (journal_entry_close(j)) {
-	case JOURNAL_ENTRY_ERROR:
-		goto journal_error;
-	case JOURNAL_ENTRY_INUSE:
-		goto nowrite;
-	case JOURNAL_ENTRY_CLOSED:
-		break;
-	}
-
-	if (!need_write_just_set &&
-	    test_bit(JOURNAL_NEED_WRITE, &j->flags))
-		__bch_time_stats_update(j->delay_time, j->need_write_time);
-
-	set_bit(JOURNAL_IO_IN_FLIGHT, &j->flags);
-	j->write_start_time = local_clock();
-
-	__set_current_state(TASK_RUNNING);
-
-#if 0
-	closure_call(&j->io, journal_write_locked, NULL, &c->cl);
-#else
-	/* Shut sparse up: */
-	closure_init(&j->io, &c->cl);
-	set_closure_fn(&j->io, journal_write_locked, NULL);
-	journal_write_locked(&j->io);
-#endif
-
-	return true;
-journal_error:
-	closure_wake_up(&journal_cur_buf(j)->wait);
-nowrite:
-	spin_unlock(&j->lock);
-	return false;
-}
-
-static bool journal_try_write(struct journal *j)
-	__releases(j->lock)
+static void journal_try_write(struct journal *j)
 {
 	bool set_need_write = false;
@@ -1715,15 +1711,7 @@ static bool journal_try_write(struct journal *j)
 		set_need_write = true;
 	}

-	return __journal_write(j, set_need_write);
-}
-
-static void journal_unlock(struct journal *j)
-	__releases(j->lock)
-{
-	if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
-		__journal_write(j, false);
-	else
+	if (journal_buf_switch(j, set_need_write) != JOURNAL_UNLOCKED)
 		spin_unlock(&j->lock);
 }

@@ -1732,9 +1720,10 @@ static void journal_write_work(struct work_struct *work)
 	struct journal *j = container_of(to_delayed_work(work),
 					 struct journal, write_work);

 	spin_lock(&j->lock);
-	if (test_bit(JOURNAL_DIRTY, &j->flags))
-		set_bit(JOURNAL_NEED_WRITE, &j->flags);
-	journal_unlock(j);
+	set_bit(JOURNAL_NEED_WRITE, &j->flags);
+
+	if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
+		spin_unlock(&j->lock);
 }

 void bch_journal_add_keys(struct journal *j, struct journal_res *res,
@@ -1746,7 +1735,7 @@ void bch_journal_add_keys(struct journal *j, struct journal_res *res,
 	BUG_ON(!res->ref);
 	BUG_ON(actual > res->u64s);

-	bch_journal_add_entry_at(j, k, k->k.u64s,
+	bch_journal_add_entry_at(&j->buf[res->idx], k, k->k.u64s,
 				 JKEYS_BTREE_KEYS, id, level,
 				 res->offset);

 	res->offset += actual;
@@ -1760,8 +1749,6 @@
 void bch_journal_res_put(struct journal *j, struct journal_res *res,
 			 u64 *journal_seq)
 {
-	union journal_res_state s;
-
 	if (!res->ref)
 		return;
@@ -1770,47 +1757,25 @@ void bch_journal_res_put(struct journal *j, struct journal_res *res,
 	while (res->u64s) {
 		unsigned actual = jset_u64s(0);

-		bch_journal_add_entry_at(j, NULL, 0, JKEYS_BTREE_KEYS,
+		bch_journal_add_entry_at(&j->buf[res->idx], NULL, 0,
+					 JKEYS_BTREE_KEYS,
 					 0, 0, res->offset);

 		res->offset += actual;
 		res->u64s -= actual;
 	}

-	if (!test_bit(JOURNAL_DIRTY, &j->flags) &&
-	    !test_and_set_bit(JOURNAL_DIRTY, &j->flags))
-		queue_delayed_work(system_freezable_wq,
-				   &j->write_work,
-				   msecs_to_jiffies(j->delay_ms));
-
 	if (journal_seq)
-		*journal_seq = j->seq;
+		*journal_seq = j->buf[res->idx].data->seq;

-	s.v = atomic64_sub_return(journal_res_state(1, 0).v,
-				  &j->reservations.counter);
-	BUG_ON((int) s.count < 0);
-
-	if (!s.count) {
-		if (test_bit(JOURNAL_NEED_WRITE, &j->flags) &&
-		    !test_bit(JOURNAL_IO_IN_FLIGHT, &j->flags)) {
-			spin_lock(&j->lock);
-			journal_unlock(j);
-		}
-
-		wake_up(&j->wait);
-	}
+	__journal_res_put(j, res->idx, false);

 	memset(res, 0, sizeof(*res));
 }

-static inline bool journal_bucket_has_room(struct journal *j)
-{
-	return (j->sectors_free && fifo_free(&j->pin) > 1);
-}
-
-static inline bool journal_res_get_fast(struct journal *j,
-					struct journal_res *res,
-					unsigned u64s_min,
-					unsigned u64s_max)
+static inline int journal_res_get_fast(struct journal *j,
+				       struct journal_res *res,
+				       unsigned u64s_min,
+				       unsigned u64s_max)
 {
 	union journal_res_state old, new;
 	u64 v = atomic64_read(&j->reservations.counter);
@@ -1823,106 +1788,81 @@ static inline bool journal_res_get_fast(struct journal *j,
 		 * entry:
 		 */
 		if (old.cur_entry_offset + u64s_min > j->cur_entry_u64s)
-			return false;
+			return 0;

 		res->offset = old.cur_entry_offset;
 		res->u64s = min(u64s_max, j->cur_entry_u64s -
 				old.cur_entry_offset);

+		journal_state_inc(&new);
 		new.cur_entry_offset += res->u64s;
-		new.count++;
 	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);

 	res->ref = true;
-	return true;
+	res->idx = new.idx;
+	return 1;
 }

 static int __journal_res_get(struct journal *j, struct journal_res *res,
 			     unsigned u64s_min, unsigned u64s_max)
 {
 	struct cache_set *c = container_of(j, struct cache_set, journal);
-
+	int ret;
retry:
-	if (journal_res_get_fast(j, res, u64s_min, u64s_max))
-		return 1;
-
-	/*
-	 * If getting a journal reservation failed, either:
-	 *
-	 * - the current journal entry is full (and we should close it
-	 *   and write it, so a new one can be opened)
-	 *
-	 * - or, the current journal entry is closed - because we
-	 *   allocate/reclaim space before we can open the next one
-	 *   (with journal_next_bucket())
-	 *
-	 * - or, the journal's in an error state.
-	 */
+	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
+	if (ret)
+		return ret;

 	spin_lock(&j->lock);
-get:
+
 	/*
-	 * Recheck after taking the lock, so we don't race with another
-	 * thread that just did journal_entry_open() and call
-	 * journal_entry_close() unnecessarily
+	 * Recheck after taking the lock, so we don't race with another thread
+	 * that just did journal_entry_open() and call journal_entry_close()
+	 * unnecessarily
 	 */
-	if (journal_res_get_fast(j, res, u64s_min, u64s_max)) {
+	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
+	if (ret) {
 		spin_unlock(&j->lock);
 		return 1;
 	}

-	switch (journal_entry_close(j)) {
+	/*
+	 * Ok, no more room in the current journal entry - try to start a new
+	 * one:
+	 */
+	switch (journal_buf_switch(j, false)) {
 	case JOURNAL_ENTRY_ERROR:
 		spin_unlock(&j->lock);
 		return -EIO;
 	case JOURNAL_ENTRY_INUSE:
-		/*
-		 * current journal entry still in use, can't do anything
-		 * yet:
-		 */
+		/* haven't finished writing out the previous one: */
 		spin_unlock(&j->lock);
+		trace_bcache_journal_entry_full(c);
 		goto blocked;
 	case JOURNAL_ENTRY_CLOSED:
 		break;
+	case JOURNAL_UNLOCKED:
+		goto retry;
 	}

-	if (test_bit(JOURNAL_DIRTY, &j->flags)) {
-		/*
-		 * If the current journal entry isn't empty, try to
-		 * write it - if previous journal write is still in
-		 * flight, we'll have to wait:
-		 */
-
-		if (journal_try_write(j))
-			goto retry;
-
-		trace_bcache_journal_entry_full(c);
-	} else {
-		int ret;
-
-		/* Try to get a new journal bucket */
-		ret = journal_next_bucket(c);
-		if (ret) {
-			spin_unlock(&j->lock);
-			return ret;
-		}
-
-		if (journal_bucket_has_room(j))
-			goto get;
+	/* We now have a new, closed journal buf - see if we can open it: */
+	ret = journal_entry_open(j);
+	spin_unlock(&j->lock);

-		/* Still no room, we have to wait */
+	if (ret < 0)
+		return ret;
+	if (ret)
+		goto retry;

-		spin_unlock(&j->lock);
+	/* Journal's full, we have to wait */

-		/*
-		 * Direct reclaim - can't rely on reclaim from work item
-		 * due to freezing..
-		 */
-		journal_reclaim_work(&j->reclaim_work);
+	/*
+	 * Direct reclaim - can't rely on reclaim from work item
+	 * due to freezing..
	 */
+	journal_reclaim_work(&j->reclaim_work);

-		trace_bcache_journal_full(c);
-	}
+	trace_bcache_journal_full(c);
blocked:
 	if (!j->res_get_blocked_start)
 		j->res_get_blocked_start = local_clock() ?: 1;
@@ -1969,22 +1909,22 @@ void bch_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *par
 		if (!closure_wait(&journal_cur_buf(j)->wait, parent))
 			BUG();
 	} else if (seq + 1 == j->seq &&
-		   test_bit(JOURNAL_IO_IN_FLIGHT, &j->flags)) {
+		   j->reservations.prev_buf_unwritten) {
 		if (!closure_wait(&journal_prev_buf(j)->wait, parent))
 			BUG();

 		smp_mb();

-		if (!test_bit(JOURNAL_IO_IN_FLIGHT, &j->flags))
+		if (!j->reservations.prev_buf_unwritten)
 			closure_wake_up(&journal_prev_buf(j)->wait);
 		}
 	}

 	if (seq == j->seq) {
-		BUG_ON(!test_bit(JOURNAL_DIRTY, &j->flags));
+		BUG_ON(journal_entry_not_started(j));
 		journal_try_write(j);
 	} else {
-		journal_unlock(j);
+		spin_unlock(&j->lock);
 	}
 }

@@ -2039,7 +1979,7 @@ void bch_journal_flush_async(struct journal *j, struct closure *parent)
 	u64 seq;

 	spin_lock(&j->lock);
-	if (test_bit(JOURNAL_DIRTY, &j->flags)) {
+	if (journal_entry_is_open(j)) {
 		seq = j->seq;
 	} else if (j->seq) {
 		seq = j->seq - 1;
@@ -2057,7 +1997,7 @@ int bch_journal_flush(struct journal *j)
 	u64 seq;

 	spin_lock(&j->lock);
-	if (test_bit(JOURNAL_DIRTY, &j->flags)) {
+	if (journal_entry_is_open(j)) {
 		seq = j->seq;
 	} else if (j->seq) {
 		seq = j->seq - 1;
@@ -2072,8 +2012,8 @@

 void bch_journal_free(struct journal *j)
 {
-	free_pages((unsigned long) j->w[1].data, JOURNAL_BUF_ORDER);
-	free_pages((unsigned long) j->w[0].data, JOURNAL_BUF_ORDER);
+	free_pages((unsigned long) j->buf[1].data, JOURNAL_BUF_ORDER);
+	free_pages((unsigned long) j->buf[0].data, JOURNAL_BUF_ORDER);
 	free_fifo(&j->pin);
 }

@@ -2097,12 +2037,13 @@ int bch_journal_alloc(struct journal *j)
 	bkey_extent_init(&j->key);

 	atomic64_set(&j->reservations.counter,
-		     journal_res_state(0, JOURNAL_ENTRY_CLOSED_VAL).v);
+		     ((union journal_res_state)
+		      { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

 	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
-	    !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL,
+	    !(j->buf[0].data = (void *) __get_free_pages(GFP_KERNEL,
 						       JOURNAL_BUF_ORDER)) ||
-	    !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL,
+	    !(j->buf[1].data = (void *) __get_free_pages(GFP_KERNEL,
 						       JOURNAL_BUF_ORDER)))
 		return -ENOMEM;

@@ -2112,6 +2053,7 @@ ssize_t bch_journal_print_debug(struct journal *j, char *buf)
 {
 	struct cache_set *c = container_of(j, struct cache_set, journal);
+	union journal_res_state *s = &j->reservations;
 	struct cache *ca;
 	unsigned iter;
 	ssize_t ret = 0;
@@ -2124,7 +2066,6 @@ ssize_t bch_journal_print_debug(struct journal *j, char *buf)
 			 "seq:\t\t\t%llu\n"
 			 "last_seq:\t\t%llu\n"
 			 "last_seq_ondisk:\t%llu\n"
-			 "sectors_free:\t\t%u\n"
 			 "reservation count:\t%u\n"
 			 "reservation offset:\t%u\n"
 			 "current entry u64s:\t%u\n"
@@ -2136,13 +2077,12 @@ ssize_t bch_journal_print_debug(struct journal *j, char *buf)
 			 j->seq,
 			 last_seq(j),
 			 j->last_seq_ondisk,
-			 j->sectors_free,
-			 j->reservations.count,
-			 j->reservations.cur_entry_offset,
+			 journal_state_count(*s, s->idx),
+			 s->cur_entry_offset,
 			 j->cur_entry_u64s,
-			 test_bit(JOURNAL_IO_IN_FLIGHT, &j->flags),
+			 s->prev_buf_unwritten,
 			 test_bit(JOURNAL_NEED_WRITE, &j->flags),
-			 test_bit(JOURNAL_DIRTY, &j->flags),
+			 journal_entry_is_open(j),
 			 test_bit(JOURNAL_REPLAY_DONE, &j->flags));

 	group_for_each_cache_rcu(ca, &c->cache_tiers[0], iter) {
diff --git a/drivers/md/bcache/journal.h b/drivers/md/bcache/journal.h
index 081874a254e2..be2b842b9197 100644
--- a/drivers/md/bcache/journal.h
+++ b/drivers/md/bcache/journal.h
@@ -169,9 +169,6 @@ static inline void journal_pin_drop(struct journal *j,
 	pin->pin_list = NULL;
 }

-#define journal_full(j) \
-	(!(j)->sectors_free || fifo_free(&(j)->pin) <= 1)
-
 struct closure;
 struct cache_set;
 struct keylist;
@@ -181,18 +178,6 @@ struct bkey_i *bch_journal_find_btree_root(struct cache_set *, struct jset *,

 int bch_journal_seq_blacklisted(struct cache_set *, u64, struct btree *);

-static inline struct journal_buf *journal_cur_buf(struct journal *j)
-{
-
-	return j->w + test_bit(JOURNAL_BUF_IDX, &j->flags);
-}
-
-static inline struct journal_buf *journal_prev_buf(struct journal *j)
-{
-
-	return j->w + !test_bit(JOURNAL_BUF_IDX, &j->flags);
-}
-
 void bch_journal_add_keys(struct journal *, struct journal_res *,
 			  enum btree_id, const struct bkey_i *, unsigned);
diff --git a/drivers/md/bcache/journal_types.h b/drivers/md/bcache/journal_types.h
index e0d949c546bc..0633dc3f5c95 100644
--- a/drivers/md/bcache/journal_types.h
+++ b/drivers/md/bcache/journal_types.h
@@ -53,6 +53,7 @@ struct journal_seq_blacklist {

 struct journal_res {
 	bool ref;
+	u8 idx;
 	u16 offset;
 	u16 u64s;
 };
@@ -67,32 +68,28 @@ union journal_res_state {
 	};

 	struct {
-		unsigned count;
-		unsigned cur_entry_offset;
+		u64 cur_entry_offset:16,
+		    idx:1,
+		    prev_buf_unwritten:1,
+		    buf0_count:23,
+		    buf1_count:23;
 	};
 };

 /*
  * We stash some journal state as sentinal values in cur_entry_offset:
  */
-#define JOURNAL_ENTRY_CLOSED_VAL	((u32) S32_MAX)
-#define JOURNAL_ENTRY_ERROR_VAL		((u32) S32_MAX + 1)
+#define JOURNAL_ENTRY_CLOSED_VAL	(U16_MAX - 1)
+#define JOURNAL_ENTRY_ERROR_VAL		(U16_MAX)

 /*
- * JOURNAL_DIRTY - current journal entry has stuff in it to write
- *
  * JOURNAL_NEED_WRITE - current (pending) journal entry should be written ASAP,
  * either because something's waiting on the write to complete or because it's
  * been dirty too long and the timer's expired.
- *
- * If JOURNAL_NEED_WRITE is set, JOURNAL_DIRTY must be set.
  */
 enum {
-	JOURNAL_DIRTY,
 	JOURNAL_NEED_WRITE,
-	JOURNAL_IO_IN_FLIGHT,
-	JOURNAL_BUF_IDX,
 	JOURNAL_REPLAY_DONE,
 };
@@ -109,13 +106,10 @@ struct journal {
 	 * Two journal entries -- one is currently open for new entries, the
 	 * other is possibly being written out.
 	 */
-	struct journal_buf w[2];
+	struct journal_buf buf[2];

 	spinlock_t lock;

-	/* minimum sectors free in the bucket(s) we're currently writing to */
-	unsigned sectors_free;
-
 	/* Used when waiting because the journal was full */
 	wait_queue_head_t wait;
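One consequence of the journal_types.h change worth noting: fitting two 23-bit reservation counts plus `idx` and `prev_buf_unwritten` into the 64-bit state word leaves only 16 bits for `cur_entry_offset`, which is why the CLOSED/ERROR sentinels drop from their old S32_MAX-based values to U16_MAX - 1 and U16_MAX, and why an open entry is now bounded at just under 2^16 u64s (roughly 512 KiB of keys). A small standalone sketch making the packing explicit (field widths copied from the diff; the static asserts are illustrative, not part of the patch):

```c
#include <stdint.h>

/* Field widths as in the new union journal_res_state: */
union journal_res_state {
	uint64_t v;
	struct {
		uint64_t cur_entry_offset:16,
			 idx:1,
			 prev_buf_unwritten:1,
			 buf0_count:23,
			 buf1_count:23;
	};
};

/* Sentinels stashed in cur_entry_offset, as in the new journal_types.h: */
#define JOURNAL_ENTRY_CLOSED_VAL (UINT16_MAX - 1)
#define JOURNAL_ENTRY_ERROR_VAL  (UINT16_MAX)

/* The five fields must pack exactly into the 64-bit atomic counter: */
_Static_assert(16 + 1 + 1 + 23 + 23 == 64, "state word overflows u64");
_Static_assert(sizeof(union journal_res_state) == sizeof(uint64_t),
	       "union must stay one machine word");

/* CLOSED and ERROR must remain distinct values within the 16-bit field: */
_Static_assert(JOURNAL_ENTRY_CLOSED_VAL < JOURNAL_ENTRY_ERROR_VAL,
	       "sentinels must be distinguishable");

/*
 * Real offsets stay below both sentinels, so a closed or errored entry
 * always fails the "offset + u64s > capacity" check in the reservation
 * fast path - no separate "is open" flag is needed.
 */
int main(void) { return 0; }
```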