Diffstat (limited to 'libbcache/io.c')
-rw-r--r--	libbcache/io.c	1378
1 file changed, 1378 insertions, 0 deletions
diff --git a/libbcache/io.c b/libbcache/io.c
new file mode 100644
index 0000000..7219b65
--- /dev/null
+++ b/libbcache/io.c
@@ -0,0 +1,1378 @@
+/*
+ * Some low level IO code, and hacks for various block layer limitations
+ *
+ * Copyright 2010, 2011 Kent Overstreet <kent.overstreet@gmail.com>
+ * Copyright 2012 Google, Inc.
+ */
+
+#include "bcache.h"
+#include "alloc.h"
+#include "bset.h"
+#include "btree_update.h"
+#include "buckets.h"
+#include "checksum.h"
+#include "compress.h"
+#include "clock.h"
+#include "debug.h"
+#include "error.h"
+#include "extents.h"
+#include "io.h"
+#include "journal.h"
+#include "keylist.h"
+#include "move.h"
+#include "notify.h"
+#include "stats.h"
+#include "super.h"
+
+#include <linux/blkdev.h>
+#include <linux/random.h>
+
+#include <trace/events/bcache.h>
+
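+/*
+ * Mark @bio as having another completion pending: set BIO_CHAIN and bump
+ * __bi_remaining, so bio_endio() won't complete it until every chained
+ * completion has finished.
+ */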
+static inline void __bio_inc_remaining(struct bio *bio)
+{
+ bio_set_flag(bio, BIO_CHAIN);
+ smp_mb__before_atomic();
+ atomic_inc(&bio->__bi_remaining);
+}
+
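+/*
+ * Submit @bio; if we're already inside generic_make_request() (i.e.
+ * current->bio_list is set), defer the submission to a workqueue instead of
+ * recursing into generic_make_request() directly.
+ */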
+void bch_generic_make_request(struct bio *bio, struct cache_set *c)
+{
+ if (current->bio_list) {
+ spin_lock(&c->bio_submit_lock);
+ bio_list_add(&c->bio_submit_list, bio);
+ spin_unlock(&c->bio_submit_lock);
+ queue_work(bcache_io_wq, &c->bio_submit_work);
+ } else {
+ generic_make_request(bio);
+ }
+}
+
+void bch_bio_submit_work(struct work_struct *work)
+{
+ struct cache_set *c = container_of(work, struct cache_set,
+ bio_submit_work);
+ struct bio_list bl;
+ struct bio *bio;
+
+ spin_lock(&c->bio_submit_lock);
+ bl = c->bio_submit_list;
+ bio_list_init(&c->bio_submit_list);
+ spin_unlock(&c->bio_submit_lock);
+
+ while ((bio = bio_list_pop(&bl)))
+ generic_make_request(bio);
+}
+
+/* Allocate, free from mempool: */
+
+void bch_bio_free_pages_pool(struct cache_set *c, struct bio *bio)
+{
+ struct bio_vec *bv;
+ unsigned i;
+
+ bio_for_each_segment_all(bv, bio, i)
+ if (bv->bv_page != ZERO_PAGE(0))
+ mempool_free(bv->bv_page, &c->bio_bounce_pages);
+ bio->bi_vcnt = 0;
+}
+
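+/*
+ * Allocate one page for @bio, preferring plain alloc_page(); if that fails,
+ * fall back to the bio_bounce_pages mempool under bio_bounce_pages_lock,
+ * which the caller releases once the whole bio has been filled.
+ */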
+static void bch_bio_alloc_page_pool(struct cache_set *c, struct bio *bio,
+ bool *using_mempool)
+{
+ struct bio_vec *bv = &bio->bi_io_vec[bio->bi_vcnt++];
+
+ if (likely(!*using_mempool)) {
+ bv->bv_page = alloc_page(GFP_NOIO);
+ if (unlikely(!bv->bv_page)) {
+ mutex_lock(&c->bio_bounce_pages_lock);
+ *using_mempool = true;
+ goto pool_alloc;
+		}
+ } else {
+pool_alloc:
+ bv->bv_page = mempool_alloc(&c->bio_bounce_pages, GFP_NOIO);
+ }
+
+ bv->bv_len = PAGE_SIZE;
+ bv->bv_offset = 0;
+}
+
+void bch_bio_alloc_pages_pool(struct cache_set *c, struct bio *bio,
+ size_t bytes)
+{
+ bool using_mempool = false;
+
+ bio->bi_iter.bi_size = bytes;
+
+ while (bio->bi_vcnt < DIV_ROUND_UP(bytes, PAGE_SIZE))
+ bch_bio_alloc_page_pool(c, bio, &using_mempool);
+
+ if (using_mempool)
+ mutex_unlock(&c->bio_bounce_pages_lock);
+}
+
+/* Bios with headers */
+
+static void bch_submit_wbio(struct cache_set *c, struct bch_write_bio *wbio,
+ struct cache *ca, const struct bch_extent_ptr *ptr,
+ bool punt)
+{
+ wbio->ca = ca;
+ wbio->submit_time_us = local_clock_us();
+ wbio->bio.bi_iter.bi_sector = ptr->offset;
+ wbio->bio.bi_bdev = ca ? ca->disk_sb.bdev : NULL;
+
+ if (!ca)
+ bcache_io_error(c, &wbio->bio, "device has been removed");
+ else if (punt)
+ bch_generic_make_request(&wbio->bio, c);
+ else
+ generic_make_request(&wbio->bio);
+}
+
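+/*
+ * Submit one write bio per pointer in @k: the last pointer reuses @wbio
+ * itself, earlier ones get clones from the device's replica_set bioset,
+ * each chained to the original via __bio_inc_remaining().
+ */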
+void bch_submit_wbio_replicas(struct bch_write_bio *wbio, struct cache_set *c,
+ const struct bkey_i *k, bool punt)
+{
+ struct bkey_s_c_extent e = bkey_i_to_s_c_extent(k);
+ const struct bch_extent_ptr *ptr;
+ struct bch_write_bio *n;
+ struct cache *ca;
+
+ wbio->split = false;
+ wbio->c = c;
+
+ extent_for_each_ptr(e, ptr) {
+ rcu_read_lock();
+ ca = PTR_CACHE(c, ptr);
+ if (ca)
+ percpu_ref_get(&ca->ref);
+ rcu_read_unlock();
+
+ if (!ca) {
+ bch_submit_wbio(c, wbio, ca, ptr, punt);
+ break;
+ }
+
+ if (ptr + 1 < &extent_entry_last(e)->ptr) {
+ n = to_wbio(bio_clone_fast(&wbio->bio, GFP_NOIO,
+ &ca->replica_set));
+
+ n->bio.bi_end_io = wbio->bio.bi_end_io;
+ n->bio.bi_private = wbio->bio.bi_private;
+ n->c = c;
+ n->orig = &wbio->bio;
+ n->bounce = false;
+ n->split = true;
+ n->put_bio = true;
+ n->bio.bi_opf = wbio->bio.bi_opf;
+ __bio_inc_remaining(n->orig);
+ } else {
+ n = wbio;
+ }
+
+ if (!journal_flushes_device(ca))
+ n->bio.bi_opf |= REQ_FUA;
+
+ bch_submit_wbio(c, n, ca, ptr, punt);
+ }
+}
+
+/* IO errors */
+
+/* Writes */
+
+static struct workqueue_struct *index_update_wq(struct bch_write_op *op)
+{
+ return op->alloc_reserve == RESERVE_MOVINGGC
+ ? op->c->copygc_wq
+ : op->c->wq;
+}
+
+static void __bch_write(struct closure *);
+
+static void bch_write_done(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+
+ BUG_ON(!(op->flags & BCH_WRITE_DONE));
+
+ if (!op->error && (op->flags & BCH_WRITE_FLUSH))
+ op->error = bch_journal_error(&op->c->journal);
+
+ bch_disk_reservation_put(op->c, &op->res);
+ percpu_ref_put(&op->c->writes);
+ bch_keylist_free(&op->insert_keys, op->inline_keys);
+ closure_return(cl);
+}
+
+static u64 keylist_sectors(struct keylist *keys)
+{
+ struct bkey_i *k;
+ u64 ret = 0;
+
+ for_each_keylist_key(keys, k)
+ ret += k->k.size;
+
+ return ret;
+}
+
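+/*
+ * Default index update function: insert the keys in op->insert_keys into the
+ * extents btree, starting at the front key's start position.
+ */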
+static int bch_write_index_default(struct bch_write_op *op)
+{
+ struct keylist *keys = &op->insert_keys;
+ struct btree_iter iter;
+ int ret;
+
+ bch_btree_iter_init_intent(&iter, op->c, BTREE_ID_EXTENTS,
+ bkey_start_pos(&bch_keylist_front(keys)->k));
+
+ ret = bch_btree_insert_list_at(&iter, keys, &op->res,
+ NULL, op_journal_seq(op),
+ BTREE_INSERT_NOFAIL);
+ bch_btree_iter_unlock(&iter);
+
+ return ret;
+}
+
+/**
+ * bch_write_index - after a write, update index to point to new data
+ */
+static void bch_write_index(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+ struct cache_set *c = op->c;
+ struct keylist *keys = &op->insert_keys;
+ unsigned i;
+
+ op->flags |= BCH_WRITE_LOOPED;
+
+ if (!bch_keylist_empty(keys)) {
+ u64 sectors_start = keylist_sectors(keys);
+ int ret = op->index_update_fn(op);
+
+ BUG_ON(keylist_sectors(keys) && !ret);
+
+ op->written += sectors_start - keylist_sectors(keys);
+
+ if (ret) {
+ __bcache_io_error(c, "btree IO error %i", ret);
+ op->error = ret;
+ }
+ }
+
+ for (i = 0; i < ARRAY_SIZE(op->open_buckets); i++)
+ if (op->open_buckets[i]) {
+ bch_open_bucket_put(c,
+ c->open_buckets +
+ op->open_buckets[i]);
+ op->open_buckets[i] = 0;
+ }
+
+ if (!(op->flags & BCH_WRITE_DONE))
+ continue_at(cl, __bch_write, op->io_wq);
+
+ if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
+ bch_journal_flush_seq_async(&c->journal,
+ *op_journal_seq(op),
+ cl);
+ continue_at(cl, bch_write_done, index_update_wq(op));
+ } else {
+ continue_at_nobarrier(cl, bch_write_done, NULL);
+ }
+}
+
+/**
+ * bch_write_discard - discard a range of keys
+ *
+ * Used to implement discard, and to handle when a writethrough write hits
+ * a write error on the cache device.
+ */
+static void bch_write_discard(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+ struct bio *bio = &op->bio->bio;
+ struct bpos end = op->pos;
+
+ end.offset += bio_sectors(bio);
+
+ op->error = bch_discard(op->c, op->pos, end, op->version,
+ &op->res, NULL, NULL);
+}
+
+/*
+ * Convert extents to be inserted to discards after an error:
+ */
+static void bch_write_io_error(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+
+ if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
+ struct bkey_i *src = bch_keylist_front(&op->insert_keys);
+ struct bkey_i *dst = bch_keylist_front(&op->insert_keys);
+
+ /*
+ * Our data write just errored, which means we've got a bunch
+ * of keys to insert that point to data that wasn't
+ * successfully written.
+ *
+ * We don't have to insert those keys but we still have to
+ * invalidate that region of the cache - so, if we just strip
+ * off all the pointers from the keys we'll accomplish just
+ * that.
+ */
+
+ while (src != op->insert_keys.top) {
+ struct bkey_i *n = bkey_next(src);
+
+ set_bkey_val_u64s(&src->k, 0);
+ src->k.type = KEY_TYPE_DISCARD;
+ bkey_copy(dst, src);
+
+ dst = bkey_next(dst);
+ src = n;
+ }
+
+ op->insert_keys.top = dst;
+ op->flags |= BCH_WRITE_DISCARD;
+ } else {
+ /* TODO: We could try to recover from this. */
+ while (!bch_keylist_empty(&op->insert_keys))
+ bch_keylist_pop_front(&op->insert_keys);
+
+ op->error = -EIO;
+ op->flags |= BCH_WRITE_DONE;
+ }
+
+ bch_write_index(cl);
+}
+
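+/*
+ * Data write completion: on error, redirect the op's closure to
+ * bch_write_io_error() and propagate the error to the original bio; then free
+ * any bounce pages, drop the bio if it was cloned, and complete the parent
+ * bio or put the closure.
+ */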
+static void bch_write_endio(struct bio *bio)
+{
+ struct closure *cl = bio->bi_private;
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+ struct bch_write_bio *wbio = to_wbio(bio);
+ struct cache_set *c = wbio->c;
+ struct bio *orig = wbio->orig;
+ struct cache *ca = wbio->ca;
+
+ if (cache_nonfatal_io_err_on(bio->bi_error, ca,
+ "data write"))
+ set_closure_fn(cl, bch_write_io_error, index_update_wq(op));
+
+ bch_account_io_completion_time(ca, wbio->submit_time_us,
+ REQ_OP_WRITE);
+ if (ca)
+ percpu_ref_put(&ca->ref);
+
+ if (bio->bi_error && orig)
+ orig->bi_error = bio->bi_error;
+
+ if (wbio->bounce)
+ bch_bio_free_pages_pool(c, bio);
+
+ if (wbio->put_bio)
+ bio_put(bio);
+
+ if (orig)
+ bio_endio(orig);
+ else
+ closure_put(cl);
+}
+
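+/*
+ * Append a new extent key to op->insert_keys for the sectors just written,
+ * recording checksum/compression info and pointers into the open bucket's
+ * devices.
+ */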
+static void init_append_extent(struct bch_write_op *op,
+ unsigned compressed_size,
+ unsigned uncompressed_size,
+ unsigned compression_type,
+ u64 csum, unsigned csum_type,
+ struct open_bucket *ob)
+{
+ struct bkey_i_extent *e = bkey_extent_init(op->insert_keys.top);
+
+ op->pos.offset += uncompressed_size;
+ e->k.p = op->pos;
+ e->k.size = uncompressed_size;
+
+ bch_extent_crc_append(e, compressed_size,
+ uncompressed_size,
+ compression_type,
+ csum, csum_type);
+
+ bch_alloc_sectors_append_ptrs(op->c, e, op->nr_replicas,
+ ob, compressed_size);
+
+ bkey_extent_set_cached(&e->k, (op->flags & BCH_WRITE_CACHED));
+ bch_keylist_push(&op->insert_keys);
+}
+
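+/*
+ * Write as much of @orig as fits in the space reserved in @ob, appending keys
+ * for it to op->insert_keys. Returns nonzero if data remains to be written
+ * (the caller loops), 0 when everything has been issued, or a negative error.
+ */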
+static int bch_write_extent(struct bch_write_op *op,
+ struct open_bucket *ob,
+ struct bio *orig)
+{
+ struct cache_set *c = op->c;
+ struct bio *bio;
+ struct bch_write_bio *wbio;
+ unsigned key_to_write_offset = op->insert_keys.top_p -
+ op->insert_keys.keys_p;
+ struct bkey_i *key_to_write;
+ unsigned csum_type = c->opts.data_checksum;
+ unsigned compression_type = op->compression_type;
+ int ret;
+
+ /* don't refetch csum type/compression type */
+ barrier();
+
+ /* Need to decompress data? */
+ if ((op->flags & BCH_WRITE_DATA_COMPRESSED) &&
+ (op->crc.uncompressed_size != op->size ||
+ op->crc.compressed_size > ob->sectors_free)) {
+ int ret;
+
+ ret = bch_bio_uncompress_inplace(c, orig, op->size, op->crc);
+ if (ret)
+ return ret;
+
+ op->flags &= ~BCH_WRITE_DATA_COMPRESSED;
+ }
+
+ if (op->flags & BCH_WRITE_DATA_COMPRESSED) {
+ init_append_extent(op,
+ op->crc.compressed_size,
+ op->crc.uncompressed_size,
+ op->crc.compression_type,
+ op->crc.csum,
+ op->crc.csum_type,
+ ob);
+
+ bio = orig;
+ wbio = to_wbio(bio);
+ wbio->orig = NULL;
+ wbio->bounce = false;
+ wbio->put_bio = false;
+ ret = 0;
+ } else if (csum_type != BCH_CSUM_NONE ||
+ compression_type != BCH_COMPRESSION_NONE) {
+ /* all units here in bytes */
+ unsigned total_output = 0, output_available =
+ min(ob->sectors_free << 9, orig->bi_iter.bi_size);
+ u64 csum;
+
+ bio = bio_alloc_bioset(GFP_NOIO,
+ DIV_ROUND_UP(output_available, PAGE_SIZE),
+ &c->bio_write);
+ /*
+ * XXX: can't use mempool for more than
+ * BCH_COMPRESSED_EXTENT_MAX worth of pages
+ */
+ bch_bio_alloc_pages_pool(c, bio, output_available);
+
+ /* copy WRITE_SYNC flag */
+ bio->bi_opf = orig->bi_opf;
+ wbio = to_wbio(bio);
+ wbio->orig = NULL;
+ wbio->bounce = true;
+ wbio->put_bio = true;
+
+ do {
+ unsigned fragment_compression_type = compression_type;
+ size_t dst_len, src_len;
+
+ bch_bio_compress(c, bio, &dst_len,
+ orig, &src_len,
+ &fragment_compression_type);
+
+ BUG_ON(!dst_len || dst_len > bio->bi_iter.bi_size);
+ BUG_ON(!src_len || src_len > orig->bi_iter.bi_size);
+ BUG_ON(dst_len & (block_bytes(c) - 1));
+ BUG_ON(src_len & (block_bytes(c) - 1));
+
+ swap(bio->bi_iter.bi_size, dst_len);
+ csum = bch_checksum_bio(bio, csum_type);
+ swap(bio->bi_iter.bi_size, dst_len);
+
+ init_append_extent(op,
+ dst_len >> 9, src_len >> 9,
+ fragment_compression_type,
+ csum, csum_type, ob);
+
+ total_output += dst_len;
+ bio_advance(bio, dst_len);
+ bio_advance(orig, src_len);
+ } while (bio->bi_iter.bi_size &&
+ orig->bi_iter.bi_size &&
+ !bch_keylist_realloc(&op->insert_keys,
+ op->inline_keys,
+ ARRAY_SIZE(op->inline_keys),
+ BKEY_EXTENT_U64s_MAX));
+
+ BUG_ON(total_output > output_available);
+
+ memset(&bio->bi_iter, 0, sizeof(bio->bi_iter));
+ bio->bi_iter.bi_size = total_output;
+
+ /*
+ * Free unneeded pages after compressing:
+ */
+ while (bio->bi_vcnt * PAGE_SIZE >
+ round_up(bio->bi_iter.bi_size, PAGE_SIZE))
+ mempool_free(bio->bi_io_vec[--bio->bi_vcnt].bv_page,
+ &c->bio_bounce_pages);
+
+ ret = orig->bi_iter.bi_size != 0;
+ } else {
+ bio = bio_next_split(orig, ob->sectors_free, GFP_NOIO,
+ &c->bio_write);
+
+ wbio = to_wbio(bio);
+ wbio->orig = NULL;
+ wbio->bounce = false;
+ wbio->put_bio = bio != orig;
+
+ init_append_extent(op, bio_sectors(bio), bio_sectors(bio),
+ compression_type, 0, csum_type, ob);
+
+ ret = bio != orig;
+ }
+
+ bio->bi_end_io = bch_write_endio;
+ bio->bi_private = &op->cl;
+ bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
+
+ closure_get(bio->bi_private);
+
+ /* might have done a realloc... */
+
+ key_to_write = (void *) (op->insert_keys.keys_p + key_to_write_offset);
+
+ if (!(op->flags & BCH_WRITE_CACHED))
+ bch_check_mark_super(c, key_to_write, false);
+
+#ifndef CONFIG_BCACHE_NO_IO
+ bch_submit_wbio_replicas(to_wbio(bio), c, key_to_write, false);
+#else
+ to_wbio(bio)->ca = NULL;
+ bio_endio(bio);
+#endif
+ return ret;
+}
+
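+/*
+ * Core of the write path: allocate space from a write point, write out data
+ * with bch_write_extent(), and repeat until either the data is all issued or
+ * we run out of room for keys, then continue to bch_write_index() to insert
+ * the new extents. Discards skip the data write and go straight to
+ * bch_write_discard().
+ */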
+static void __bch_write(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+ struct cache_set *c = op->c;
+ struct bio *bio = &op->bio->bio;
+ unsigned open_bucket_nr = 0;
+ struct open_bucket *b;
+ int ret;
+
+ memset(op->open_buckets, 0, sizeof(op->open_buckets));
+
+ if (op->flags & BCH_WRITE_DISCARD) {
+ op->flags |= BCH_WRITE_DONE;
+ bch_write_discard(cl);
+ bio_put(bio);
+ continue_at(cl, bch_write_done, index_update_wq(op));
+ }
+
+ /*
+ * Journal writes are marked REQ_PREFLUSH; if the original write was a
+ * flush, it'll wait on the journal write.
+ */
+ bio->bi_opf &= ~(REQ_PREFLUSH|REQ_FUA);
+
+ do {
+ EBUG_ON(bio->bi_iter.bi_sector != op->pos.offset);
+ EBUG_ON(!bio_sectors(bio));
+
+ if (open_bucket_nr == ARRAY_SIZE(op->open_buckets))
+ continue_at(cl, bch_write_index, index_update_wq(op));
+
+		/* for the device pointers and 1 for the checksum */
+ if (bch_keylist_realloc(&op->insert_keys,
+ op->inline_keys,
+ ARRAY_SIZE(op->inline_keys),
+ BKEY_EXTENT_U64s_MAX))
+ continue_at(cl, bch_write_index, index_update_wq(op));
+
+ b = bch_alloc_sectors_start(c, op->wp, op->nr_replicas,
+ op->alloc_reserve,
+ (op->flags & BCH_WRITE_ALLOC_NOWAIT) ? NULL : cl);
+ EBUG_ON(!b);
+
+ if (unlikely(IS_ERR(b))) {
+ if (unlikely(PTR_ERR(b) != -EAGAIN)) {
+ ret = PTR_ERR(b);
+ goto err;
+ }
+
+ /*
+ * If we already have some keys, must insert them first
+ * before allocating another open bucket. We only hit
+ * this case if open_bucket_nr > 1.
+ */
+ if (!bch_keylist_empty(&op->insert_keys))
+ continue_at(cl, bch_write_index,
+ index_update_wq(op));
+
+ /*
+ * If we've looped, we're running out of a workqueue -
+ * not the bch_write() caller's context - and we don't
+ * want to block the workqueue:
+ */
+ if (op->flags & BCH_WRITE_LOOPED)
+ continue_at(cl, __bch_write, op->io_wq);
+
+ /*
+ * Otherwise, we do want to block the caller on alloc
+ * failure instead of letting it queue up more and more
+ * writes:
+ * XXX: this technically needs a try_to_freeze() -
+ * except that that's not safe because caller may have
+ * issued other IO... hmm..
+ */
+ closure_sync(cl);
+ continue;
+ }
+
+ BUG_ON(b - c->open_buckets == 0 ||
+ b - c->open_buckets > U8_MAX);
+ op->open_buckets[open_bucket_nr++] = b - c->open_buckets;
+
+ ret = bch_write_extent(op, b, bio);
+
+ bch_alloc_sectors_done(c, op->wp, b);
+
+ if (ret < 0)
+ goto err;
+ } while (ret);
+
+ op->flags |= BCH_WRITE_DONE;
+ continue_at(cl, bch_write_index, index_update_wq(op));
+err:
+ if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
+ /*
+ * If we were writing cached data, not doing the write is fine
+ * so long as we discard whatever would have been overwritten -
+ * then it's equivalent to doing the write and immediately
+ * reclaiming it.
+ */
+
+ bch_write_discard(cl);
+ } else {
+ /*
+ * Right now we can only error here if we went RO - the
+ * allocation failed, but we already checked for -ENOSPC when we
+ * got our reservation.
+ *
+ * XXX capacity might have changed, but we don't check for that
+ * yet:
+ */
+ op->error = ret;
+ }
+
+ op->flags |= BCH_WRITE_DONE;
+
+ /*
+ * No reason not to insert keys for whatever data was successfully
+ * written (especially for a cmpxchg operation that's moving data
+ * around)
+ */
+ continue_at(cl, !bch_keylist_empty(&op->insert_keys)
+ ? bch_write_index
+ : bch_write_done, index_update_wq(op));
+}
+
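+/*
+ * Timer callback for foreground write throttling: for each delayed write
+ * whose time has expired, drop the ref held by the wait list so the write can
+ * continue, and re-arm the timer for the first write still in the future.
+ */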
+void bch_wake_delayed_writes(unsigned long data)
+{
+ struct cache_set *c = (void *) data;
+ struct bch_write_op *op;
+ unsigned long flags;
+
+ spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
+
+ while ((op = c->write_wait_head)) {
+ if (!test_bit(CACHE_SET_RO, &c->flags) &&
+ !test_bit(CACHE_SET_STOPPING, &c->flags) &&
+ time_after(op->expires, jiffies)) {
+ mod_timer(&c->foreground_write_wakeup, op->expires);
+ break;
+ }
+
+ c->write_wait_head = op->next;
+ if (!c->write_wait_head)
+ c->write_wait_tail = NULL;
+
+ closure_put(&op->cl);
+ }
+
+ spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
+}
+
+/**
+ * bch_write - handle a write to a cache device or flash only volume
+ *
+ * This is the starting point for any data to end up in a cache device; it could
+ * be from a normal write, or a writeback write, or a write to a flash only
+ * volume - it's also used by the moving garbage collector to compact data in
+ * mostly empty buckets.
+ *
+ * It first writes the data to the cache, creating a list of keys to be inserted
+ * (if the data won't fit in a single open bucket, there will be multiple keys);
+ * after the data is written it calls bch_journal, and after the keys have been
+ * added to the next journal write they're inserted into the btree.
+ *
+ * It inserts the data in op->bio; bi_sector is used for the key offset, and
+ * op->pos.inode is used for the key inode.
+ *
+ * If the BCH_WRITE_DISCARD flag is set, instead of inserting the data it
+ * invalidates the region of the cache represented by op->bio and op->pos.inode.
+ */
+void bch_write(struct closure *cl)
+{
+ struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
+ struct bio *bio = &op->bio->bio;
+ struct cache_set *c = op->c;
+ u64 inode = op->pos.inode;
+
+ trace_bcache_write(c, inode, bio,
+ !(op->flags & BCH_WRITE_CACHED),
+ op->flags & BCH_WRITE_DISCARD);
+
+ if (!percpu_ref_tryget(&c->writes)) {
+ __bcache_io_error(c, "read only");
+ op->error = -EROFS;
+ bch_disk_reservation_put(c, &op->res);
+ closure_return(cl);
+ }
+
+ if (!(op->flags & BCH_WRITE_DISCARD))
+ bch_increment_clock(c, bio_sectors(bio), WRITE);
+
+ if (!(op->flags & BCH_WRITE_DISCARD))
+ bch_mark_foreground_write(c, bio_sectors(bio));
+ else
+ bch_mark_discard(c, bio_sectors(bio));
+
+ /* Don't call bch_next_delay() if rate is >= 1 GB/sec */
+
+ if (c->foreground_write_ratelimit_enabled &&
+ c->foreground_write_pd.rate.rate < (1 << 30) &&
+ !(op->flags & BCH_WRITE_DISCARD) && op->wp->throttle) {
+ unsigned long flags;
+ u64 delay;
+
+ spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
+ bch_ratelimit_increment(&c->foreground_write_pd.rate,
+ bio->bi_iter.bi_size);
+
+ delay = bch_ratelimit_delay(&c->foreground_write_pd.rate);
+
+ if (delay >= HZ / 100) {
+ trace_bcache_write_throttle(c, inode, bio, delay);
+
+ closure_get(&op->cl); /* list takes a ref */
+
+ op->expires = jiffies + delay;
+ op->next = NULL;
+
+ if (c->write_wait_tail)
+ c->write_wait_tail->next = op;
+ else
+ c->write_wait_head = op;
+ c->write_wait_tail = op;
+
+ if (!timer_pending(&c->foreground_write_wakeup))
+ mod_timer(&c->foreground_write_wakeup,
+ op->expires);
+
+ spin_unlock_irqrestore(&c->foreground_write_pd_lock,
+ flags);
+ continue_at(cl, __bch_write, index_update_wq(op));
+ }
+
+ spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
+ }
+
+ continue_at_nobarrier(cl, __bch_write, NULL);
+}
+
+void bch_write_op_init(struct bch_write_op *op, struct cache_set *c,
+ struct bch_write_bio *bio, struct disk_reservation res,
+ struct write_point *wp, struct bpos pos,
+ u64 *journal_seq, unsigned flags)
+{
+ op->c = c;
+ op->io_wq = index_update_wq(op);
+ op->bio = bio;
+ op->written = 0;
+ op->error = 0;
+ op->flags = flags;
+ op->compression_type = c->opts.compression;
+ op->nr_replicas = res.nr_replicas;
+ op->alloc_reserve = RESERVE_NONE;
+ op->pos = pos;
+ op->version = 0;
+ op->res = res;
+ op->wp = wp;
+
+ if (journal_seq) {
+ op->journal_seq_p = journal_seq;
+ op->flags |= BCH_WRITE_JOURNAL_SEQ_PTR;
+ } else {
+ op->journal_seq = 0;
+ }
+
+ op->index_update_fn = bch_write_index_default;
+
+ bch_keylist_init(&op->insert_keys,
+ op->inline_keys,
+ ARRAY_SIZE(op->inline_keys));
+
+ if (version_stress_test(c))
+ get_random_bytes(&op->version, sizeof(op->version));
+}
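+
+/*
+ * Example usage (illustrative sketch, not taken from this file): a caller
+ * embeds a struct bch_write_op, initializes it, then runs bch_write via its
+ * closure:
+ *
+ *	bch_write_op_init(&op, c, wbio, res, wp, POS(inode, sector), NULL, 0);
+ *	closure_call(&op.cl, bch_write, NULL, &parent_cl);
+ *
+ * Here @wbio wraps the data bio, @res is a previously obtained disk
+ * reservation (dropped with bch_disk_reservation_put() on completion), and
+ * @wp, @inode, @sector and @parent_cl are caller-specific placeholders.
+ */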
+
+/* Discard */
+
+/* bch_discard - discard a range of keys from start to end.
+ * @c		cache set
+ * @start	start position
+ *		NOTE: discard starts at bkey_start_offset(start)
+ * @end		end position
+ *		NOTE: discard ends at end.offset
+ * @version	version of discard (0ULL if none)
+ *
+ * Returns:
+ * 0 on success
+ * <0 on error
+ *
+ * XXX: this needs to be refactored with inode_truncate, or more
+ * appropriately inode_truncate should call this
+ */
+int bch_discard(struct cache_set *c, struct bpos start,
+ struct bpos end, u64 version,
+ struct disk_reservation *disk_res,
+ struct extent_insert_hook *hook,
+ u64 *journal_seq)
+{
+ return bch_btree_delete_range(c, BTREE_ID_EXTENTS, start, end, version,
+ disk_res, hook, journal_seq);
+}
+
+/* Cache promotion on read */
+
+struct cache_promote_op {
+ struct closure cl;
+ struct migrate_write write;
+ struct bio_vec bi_inline_vecs[0]; /* must be last */
+};
+
+/* Read */
+
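+/*
+ * Verify the checksum of the data we read and, if it was compressed or
+ * bounced, decompress/copy it into the parent bio. Returns 0 on success,
+ * -EIO on checksum error, or a decompression error.
+ */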
+static int bio_checksum_uncompress(struct cache_set *c,
+ struct bch_read_bio *rbio)
+{
+ struct bio *src = &rbio->bio;
+ struct bio *dst = &bch_rbio_parent(rbio)->bio;
+ struct bvec_iter dst_iter = rbio->parent_iter;
+ u64 csum;
+ int ret = 0;
+
+ /*
+ * reset iterator for checksumming and copying bounced data: here we've
+	 * set rbio->crc.compressed_size to the amount of data we actually read,
+ * which was not necessarily the full extent if we were only bouncing
+ * in order to promote
+ */
+ if (rbio->bounce) {
+ src->bi_iter.bi_size = rbio->crc.compressed_size << 9;
+ src->bi_iter.bi_idx = 0;
+ src->bi_iter.bi_bvec_done = 0;
+ } else {
+ src->bi_iter = rbio->parent_iter;
+ }
+
+ csum = bch_checksum_bio(src, rbio->crc.csum_type);
+ if (cache_nonfatal_io_err_on(rbio->crc.csum != csum, rbio->ca,
+ "data checksum error, inode %llu offset %llu: expected %0llx got %0llx (type %u)",
+ rbio->inode, (u64) rbio->parent_iter.bi_sector << 9,
+ rbio->crc.csum, csum, rbio->crc.csum_type))
+ ret = -EIO;
+
+ /*
+ * If there was a checksum error, still copy the data back - unless it
+ * was compressed, we don't want to decompress bad data:
+ */
+ if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
+ if (!ret) {
+ ret = bch_bio_uncompress(c, src, dst,
+ dst_iter, rbio->crc);
+ if (ret)
+ __bcache_io_error(c, "decompression error");
+ }
+ } else if (rbio->bounce) {
+ bio_advance(src, rbio->crc.offset << 9);
+ bio_copy_data_iter(dst, dst_iter,
+ src, src->bi_iter);
+ }
+
+ return ret;
+}
+
+static void bch_rbio_free(struct cache_set *c, struct bch_read_bio *rbio)
+{
+ struct bio *bio = &rbio->bio;
+
+ BUG_ON(rbio->ca);
+ BUG_ON(!rbio->split);
+
+ if (rbio->promote)
+ kfree(rbio->promote);
+ if (rbio->bounce)
+ bch_bio_free_pages_pool(c, bio);
+
+ bio_put(bio);
+}
+
+static void bch_rbio_done(struct cache_set *c, struct bch_read_bio *rbio)
+{
+ struct bio *orig = &bch_rbio_parent(rbio)->bio;
+
+ percpu_ref_put(&rbio->ca->ref);
+ rbio->ca = NULL;
+
+ if (rbio->split) {
+ if (rbio->bio.bi_error)
+ orig->bi_error = rbio->bio.bi_error;
+
+ bio_endio(orig);
+ bch_rbio_free(c, rbio);
+ } else {
+ if (rbio->promote)
+ kfree(rbio->promote);
+
+ orig->bi_end_io = rbio->orig_bi_end_io;
+ bio_endio_nodec(orig);
+ }
+}
+
+/*
+ * Decide if we want to retry the read - if so, the rbio is requeued on the
+ * read_retry list; otherwise the error is propagated to the parent bio.
+ */
+static void bch_read_error_maybe_retry(struct cache_set *c,
+ struct bch_read_bio *rbio,
+ int error)
+{
+ unsigned long flags;
+
+ if ((error == -EINTR) &&
+ (rbio->flags & BCH_READ_RETRY_IF_STALE)) {
+ atomic_long_inc(&c->cache_read_races);
+ goto retry;
+ }
+
+ if (error == -EIO) {
+ /* io error - do we have another replica? */
+ }
+
+ bch_rbio_parent(rbio)->bio.bi_error = error;
+ bch_rbio_done(c, rbio);
+ return;
+retry:
+ percpu_ref_put(&rbio->ca->ref);
+ rbio->ca = NULL;
+
+ spin_lock_irqsave(&c->read_retry_lock, flags);
+ bio_list_add(&c->read_retry_list, &rbio->bio);
+ spin_unlock_irqrestore(&c->read_retry_lock, flags);
+ queue_work(c->wq, &c->read_retry_work);
+}
+
+static void cache_promote_done(struct closure *cl)
+{
+ struct cache_promote_op *op =
+ container_of(cl, struct cache_promote_op, cl);
+
+ bch_bio_free_pages_pool(op->write.op.c, &op->write.wbio.bio);
+ kfree(op);
+}
+
+/* Inner part that may run in process context */
+static void __bch_read_endio(struct cache_set *c, struct bch_read_bio *rbio)
+{
+ int ret;
+
+ ret = bio_checksum_uncompress(c, rbio);
+ if (ret) {
+ bch_read_error_maybe_retry(c, rbio, ret);
+ return;
+ }
+
+ if (rbio->promote &&
+ !test_bit(CACHE_SET_RO, &c->flags) &&
+ !test_bit(CACHE_SET_STOPPING, &c->flags)) {
+ struct cache_promote_op *promote = rbio->promote;
+ struct closure *cl = &promote->cl;
+
+ BUG_ON(!rbio->split || !rbio->bounce);
+
+ /* we now own pages: */
+ swap(promote->write.wbio.bio.bi_vcnt, rbio->bio.bi_vcnt);
+ rbio->promote = NULL;
+
+ bch_rbio_done(c, rbio);
+
+ closure_init(cl, &c->cl);
+ closure_call(&promote->write.op.cl, bch_write, c->wq, cl);
+ closure_return_with_destructor(cl, cache_promote_done);
+ } else {
+ bch_rbio_done(c, rbio);
+ }
+}
+
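+/*
+ * Work function: finish reads whose data needs decompressing, which is punted
+ * here so it runs in process context; entries are processed in submission
+ * order.
+ */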
+void bch_bio_decompress_work(struct work_struct *work)
+{
+ struct bio_decompress_worker *d =
+ container_of(work, struct bio_decompress_worker, work);
+ struct llist_node *list, *next;
+ struct bch_read_bio *rbio;
+
+ while ((list = llist_del_all(&d->bio_list)))
+ for (list = llist_reverse_order(list);
+ list;
+ list = next) {
+ next = llist_next(list);
+ rbio = container_of(list, struct bch_read_bio, list);
+
+ __bch_read_endio(d->c, rbio);
+ }
+}
+
+static void bch_read_endio(struct bio *bio)
+{
+ struct bch_read_bio *rbio =
+ container_of(bio, struct bch_read_bio, bio);
+ struct cache_set *c = rbio->ca->set;
+ int stale = ((rbio->flags & BCH_READ_RETRY_IF_STALE) && race_fault()) ||
+ ptr_stale(rbio->ca, &rbio->ptr) ? -EINTR : 0;
+ int error = bio->bi_error ?: stale;
+
+ bch_account_io_completion_time(rbio->ca, rbio->submit_time_us, REQ_OP_READ);
+
+ cache_nonfatal_io_err_on(bio->bi_error, rbio->ca, "data read");
+
+ if (error) {
+ bch_read_error_maybe_retry(c, rbio, error);
+ return;
+ }
+
+ if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
+ struct bio_decompress_worker *d;
+
+ preempt_disable();
+ d = this_cpu_ptr(c->bio_decompress_worker);
+ llist_add(&rbio->list, &d->bio_list);
+ queue_work(system_unbound_wq, &d->work);
+ preempt_enable();
+ } else {
+ __bch_read_endio(c, rbio);
+ }
+}
+
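+/*
+ * Read the part of extent @k covered by @iter: decide whether to bounce (for
+ * checksum verification, decompression or promotion), clone or reuse the
+ * original bio, and optionally set up a promotion write to copy the data to a
+ * faster tier on completion.
+ */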
+void bch_read_extent_iter(struct cache_set *c, struct bch_read_bio *orig,
+ struct bvec_iter iter, struct bkey_s_c k,
+ struct extent_pick_ptr *pick, unsigned flags)
+{
+ struct bch_read_bio *rbio;
+ struct cache_promote_op *promote_op = NULL;
+ unsigned skip = iter.bi_sector - bkey_start_offset(k.k);
+ bool bounce = false, split, read_full = false;
+
+ EBUG_ON(bkey_start_offset(k.k) > iter.bi_sector ||
+ k.k->p.offset < bvec_iter_end_sector(iter));
+
+ /* only promote if we're not reading from the fastest tier: */
+
+ /*
+ * XXX: multiple promotes can race with each other, wastefully. Keep a
+ * list of outstanding promotes?
+ */
+ if ((flags & BCH_READ_PROMOTE) && pick->ca->mi.tier) {
+ /*
+ * biovec needs to be big enough to hold decompressed data, if
+ * the bch_write_extent() has to decompress/recompress it:
+ */
+ unsigned sectors =
+ max_t(unsigned, k.k->size,
+ pick->crc.uncompressed_size);
+ unsigned pages = DIV_ROUND_UP(sectors, PAGE_SECTORS);
+
+ promote_op = kmalloc(sizeof(*promote_op) +
+ sizeof(struct bio_vec) * pages, GFP_NOIO);
+ if (promote_op) {
+ struct bio *promote_bio = &promote_op->write.wbio.bio;
+
+ bio_init(promote_bio);
+ promote_bio->bi_max_vecs = pages;
+ promote_bio->bi_io_vec = promote_bio->bi_inline_vecs;
+ bounce = true;
+ /* could also set read_full */
+ }
+ }
+
+	/*
+	 * note: if compression_type and csum_type are both none, then
+	 * compressed/uncompressed size is zero
+	 */
+ if (pick->crc.compression_type != BCH_COMPRESSION_NONE ||
+ (pick->crc.csum_type != BCH_CSUM_NONE &&
+ (bvec_iter_sectors(iter) != pick->crc.uncompressed_size ||
+ (flags & BCH_READ_FORCE_BOUNCE)))) {
+ read_full = true;
+ bounce = true;
+ }
+
+ if (bounce) {
+ unsigned sectors = read_full
+ ? (pick->crc.compressed_size ?: k.k->size)
+ : bvec_iter_sectors(iter);
+
+ rbio = container_of(bio_alloc_bioset(GFP_NOIO,
+ DIV_ROUND_UP(sectors, PAGE_SECTORS),
+ &c->bio_read_split),
+ struct bch_read_bio, bio);
+
+ bch_bio_alloc_pages_pool(c, &rbio->bio, sectors << 9);
+ split = true;
+ } else if (!(flags & BCH_READ_MAY_REUSE_BIO) ||
+ !(flags & BCH_READ_IS_LAST)) {
+ /*
+ * Have to clone if there were any splits, due to error
+ * reporting issues (if a split errored, and retrying didn't
+ * work, when it reports the error to its parent (us) we don't
+ * know if the error was from our bio, and we should retry, or
+ * from the whole bio, in which case we don't want to retry and
+ * lose the error)
+ */
+ rbio = container_of(bio_clone_fast(&orig->bio,
+ GFP_NOIO, &c->bio_read_split),
+ struct bch_read_bio, bio);
+ rbio->bio.bi_iter = iter;
+ split = true;
+ } else {
+ rbio = orig;
+ rbio->bio.bi_iter = iter;
+ split = false;
+ BUG_ON(bio_flagged(&rbio->bio, BIO_CHAIN));
+ }
+
+ if (!(flags & BCH_READ_IS_LAST))
+ __bio_inc_remaining(&orig->bio);
+
+ if (split)
+ rbio->parent = orig;
+ else
+ rbio->orig_bi_end_io = orig->bio.bi_end_io;
+ rbio->parent_iter = iter;
+
+ rbio->inode = k.k->p.inode;
+ rbio->flags = flags;
+ rbio->bounce = bounce;
+ rbio->split = split;
+ rbio->crc = pick->crc;
+	/*
+	 * crc.compressed_size will be 0 if there wasn't any checksum
+	 * information; we also need to stash the original size of the bio if
+	 * we bounced (which isn't necessarily the original key size, if we
+	 * bounced only for promoting)
+	 */
+ rbio->crc.compressed_size = bio_sectors(&rbio->bio);
+ rbio->ptr = pick->ptr;
+ rbio->ca = pick->ca;
+ rbio->promote = promote_op;
+
+ rbio->bio.bi_bdev = pick->ca->disk_sb.bdev;
+ rbio->bio.bi_opf = orig->bio.bi_opf;
+ rbio->bio.bi_iter.bi_sector = pick->ptr.offset;
+ rbio->bio.bi_end_io = bch_read_endio;
+
+ if (promote_op) {
+ struct bio *promote_bio = &promote_op->write.wbio.bio;
+
+ promote_bio->bi_iter = rbio->bio.bi_iter;
+ memcpy(promote_bio->bi_io_vec, rbio->bio.bi_io_vec,
+ sizeof(struct bio_vec) * rbio->bio.bi_vcnt);
+
+ bch_migrate_write_init(c, &promote_op->write,
+ &c->promote_write_point,
+ k, NULL,
+ BCH_WRITE_ALLOC_NOWAIT);
+ promote_op->write.promote = true;
+
+ if (rbio->crc.compression_type) {
+ promote_op->write.op.flags |= BCH_WRITE_DATA_COMPRESSED;
+ promote_op->write.op.crc = rbio->crc;
+ promote_op->write.op.size = k.k->size;
+ } else if (read_full) {
+ /*
+ * Adjust bio to correspond to _live_ portion of @k -
+ * which might be less than what we're actually reading:
+ */
+ bio_advance(promote_bio, rbio->crc.offset << 9);
+ BUG_ON(bio_sectors(promote_bio) < k.k->size);
+ promote_bio->bi_iter.bi_size = k.k->size << 9;
+ } else {
+ /*
+ * Set insert pos to correspond to what we're actually
+ * reading:
+ */
+ promote_op->write.op.pos.offset = iter.bi_sector;
+ }
+
+ promote_bio->bi_iter.bi_sector =
+ promote_op->write.op.pos.offset;
+ }
+
+	/* _after_ promote stuff has looked at rbio->crc.offset */
+ if (read_full)
+ rbio->crc.offset += skip;
+ else
+ rbio->bio.bi_iter.bi_sector += skip;
+
+ rbio->submit_time_us = local_clock_us();
+
+#ifndef CONFIG_BCACHE_NO_IO
+ generic_make_request(&rbio->bio);
+#else
+ bio_endio(&rbio->bio);
+#endif
+}
+
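+/*
+ * Walk the extents btree for @inode, issuing a read for each extent
+ * overlapping @bvec_iter and zero-filling holes.
+ */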
+static void bch_read_iter(struct cache_set *c, struct bch_read_bio *rbio,
+ struct bvec_iter bvec_iter, u64 inode,
+ unsigned flags)
+{
+ struct bio *bio = &rbio->bio;
+ struct btree_iter iter;
+ struct bkey_s_c k;
+ int ret;
+
+ for_each_btree_key_with_holes(&iter, c, BTREE_ID_EXTENTS,
+ POS(inode, bvec_iter.bi_sector), k) {
+ BKEY_PADDED(k) tmp;
+ struct extent_pick_ptr pick;
+ unsigned bytes, sectors;
+ bool is_last;
+
+ /*
+ * Unlock the iterator while the btree node's lock is still in
+ * cache, before doing the IO:
+ */
+ bkey_reassemble(&tmp.k, k);
+ k = bkey_i_to_s_c(&tmp.k);
+ bch_btree_iter_unlock(&iter);
+
+ bch_extent_pick_ptr(c, k, &pick);
+ if (IS_ERR(pick.ca)) {
+ bcache_io_error(c, bio, "no device to read from");
+ bio_endio(bio);
+ return;
+ }
+
+ sectors = min_t(u64, k.k->p.offset,
+ bvec_iter_end_sector(bvec_iter)) -
+ bvec_iter.bi_sector;
+ bytes = sectors << 9;
+ is_last = bytes == bvec_iter.bi_size;
+ swap(bvec_iter.bi_size, bytes);
+
+ if (is_last)
+ flags |= BCH_READ_IS_LAST;
+
+ if (pick.ca) {
+ PTR_BUCKET(pick.ca, &pick.ptr)->read_prio =
+ c->prio_clock[READ].hand;
+
+ bch_read_extent_iter(c, rbio, bvec_iter,
+ k, &pick, flags);
+
+ flags &= ~BCH_READ_MAY_REUSE_BIO;
+ } else {
+ zero_fill_bio_iter(bio, bvec_iter);
+
+ if (is_last)
+ bio_endio(bio);
+ }
+
+ if (is_last)
+ return;
+
+ swap(bvec_iter.bi_size, bytes);
+ bio_advance_iter(bio, &bvec_iter, bytes);
+ }
+
+ /*
+ * If we get here, it better have been because there was an error
+ * reading a btree node
+ */
+ ret = bch_btree_iter_unlock(&iter);
+ BUG_ON(!ret);
+ bcache_io_error(c, bio, "btree IO error %i", ret);
+ bio_endio(bio);
+}
+
+void bch_read(struct cache_set *c, struct bch_read_bio *bio, u64 inode)
+{
+ bch_increment_clock(c, bio_sectors(&bio->bio), READ);
+
+ bch_read_iter(c, bio, bio->bio.bi_iter, inode,
+ BCH_READ_FORCE_BOUNCE|
+ BCH_READ_RETRY_IF_STALE|
+ BCH_READ_PROMOTE|
+ BCH_READ_MAY_REUSE_BIO);
+}
+EXPORT_SYMBOL(bch_read);
+
+/**
+ * bch_read_retry - re-submit a bio originally from bch_read()
+ */
+static void bch_read_retry(struct cache_set *c, struct bch_read_bio *rbio)
+{
+ struct bch_read_bio *parent = bch_rbio_parent(rbio);
+ struct bvec_iter iter = rbio->parent_iter;
+ u64 inode = rbio->inode;
+
+ trace_bcache_read_retry(&rbio->bio);
+
+ if (rbio->split)
+ bch_rbio_free(c, rbio);
+ else
+ rbio->bio.bi_end_io = rbio->orig_bi_end_io;
+
+ bch_read_iter(c, parent, iter, inode,
+ BCH_READ_FORCE_BOUNCE|
+ BCH_READ_RETRY_IF_STALE|
+ BCH_READ_PROMOTE);
+}
+
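+/*
+ * Work function: resubmit reads that were flagged for retry (e.g. after
+ * racing with a stale pointer).
+ */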
+void bch_read_retry_work(struct work_struct *work)
+{
+ struct cache_set *c = container_of(work, struct cache_set,
+ read_retry_work);
+ struct bch_read_bio *rbio;
+ struct bio *bio;
+ unsigned long flags;
+
+ while (1) {
+ spin_lock_irqsave(&c->read_retry_lock, flags);
+ bio = bio_list_pop(&c->read_retry_list);
+ spin_unlock_irqrestore(&c->read_retry_lock, flags);
+
+ if (!bio)
+ break;
+
+ rbio = container_of(bio, struct bch_read_bio, bio);
+ bch_read_retry(c, rbio);
+ }
+}