dm cache: add fail io mode and needs_check flag
author: Joe Thornber <ejt@redhat.com>
Wed, 22 Apr 2015 20:42:35 +0000 (16:42 -0400)
committer: Mike Snitzer <snitzer@redhat.com>
Thu, 11 Jun 2015 21:13:00 +0000 (17:13 -0400)
If a cache metadata operation fails (e.g. transaction commit) the
cache's metadata device will abort the current transaction, set a new
needs_check flag, and the cache will transition to "read-only" mode.  If
aborting the transaction or setting the needs_check flag fails the cache
will transition to "fail-io" mode.

Once needs_check is set the cache device will not be allowed to
activate.  Activation requires write access to metadata.  Future work is
needed to add proper support for running the cache in read-only mode.

Once in fail-io mode the cache will report a status of "Fail".

Also, add a commit() wrapper that will disallow commits if in read-only or
fail mode.

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
Documentation/device-mapper/cache.txt
drivers/md/dm-cache-metadata.c
drivers/md/dm-cache-metadata.h
drivers/md/dm-cache-policy-internal.h
drivers/md/dm-cache-policy-mq.c
drivers/md/dm-cache-policy.h
drivers/md/dm-cache-target.c

index 68c0f517c60edb1ab7a61e990720c7c8f097bb16..82960cffbad346d73a8e47ebd188a54a9db4e4c1 100644 (file)
@@ -221,6 +221,7 @@ Status
 <#read hits> <#read misses> <#write hits> <#write misses>
 <#demotions> <#promotions> <#dirty> <#features> <features>*
 <#core args> <core args>* <policy name> <#policy args> <policy args>*
+<cache metadata mode>
 
 metadata block size     : Fixed block size for each metadata block in
                             sectors
@@ -251,8 +252,12 @@ core args           : Key/value pairs for tuning the core
                             e.g. migration_threshold
 policy name             : Name of the policy
 #policy args            : Number of policy arguments to follow (must be even)
-policy args             : Key/value pairs
-                            e.g. sequential_threshold
+policy args             : Key/value pairs e.g. sequential_threshold
+cache metadata mode      : ro if read-only, rw if read-write
+       In serious cases where even a read-only mode is deemed unsafe
+       no further I/O will be permitted and the status will just
+       contain the string 'Fail'.  The userspace recovery tools
+       should then be used.
 
 Messages
 --------
index c1c010498a21b99a9bf730b0cd4277776abc159a..20cc36b01b77895625adbe82a14923148d688edb 100644 (file)
@@ -39,6 +39,8 @@
 enum superblock_flag_bits {
        /* for spotting crashes that would invalidate the dirty bitset */
        CLEAN_SHUTDOWN,
+       /* metadata must be checked using the tools */
+       NEEDS_CHECK,
 };
 
 /*
@@ -107,6 +109,7 @@ struct dm_cache_metadata {
        struct dm_disk_bitset discard_info;
 
        struct rw_semaphore root_lock;
+       unsigned long flags;
        dm_block_t root;
        dm_block_t hint_root;
        dm_block_t discard_root;
@@ -129,6 +132,14 @@ struct dm_cache_metadata {
         * buffer before the superblock is locked and updated.
         */
        __u8 metadata_space_map_root[SPACE_MAP_ROOT_SIZE];
+
+       /*
+        * Set if a transaction has to be aborted but the attempt to roll
+        * back to the previous (good) transaction failed.  The only
+        * metadata operation permissible in this state is the closing of
+        * the device.
+        */
+       bool fail_io:1;
 };
 
 /*-------------------------------------------------------------------
@@ -527,6 +538,7 @@ static unsigned long clear_clean_shutdown(unsigned long flags)
 static void read_superblock_fields(struct dm_cache_metadata *cmd,
                                   struct cache_disk_superblock *disk_super)
 {
+       cmd->flags = le32_to_cpu(disk_super->flags);
        cmd->root = le64_to_cpu(disk_super->mapping_root);
        cmd->hint_root = le64_to_cpu(disk_super->hint_root);
        cmd->discard_root = le64_to_cpu(disk_super->discard_root);
@@ -625,6 +637,7 @@ static int __commit_transaction(struct dm_cache_metadata *cmd,
        if (mutator)
                update_flags(disk_super, mutator);
 
+       disk_super->flags = cpu_to_le32(cmd->flags);
        disk_super->mapping_root = cpu_to_le64(cmd->root);
        disk_super->hint_root = cpu_to_le64(cmd->hint_root);
        disk_super->discard_root = cpu_to_le64(cmd->discard_root);
@@ -693,6 +706,7 @@ static struct dm_cache_metadata *metadata_open(struct block_device *bdev,
        cmd->cache_blocks = 0;
        cmd->policy_hint_size = policy_hint_size;
        cmd->changed = true;
+       cmd->fail_io = false;
 
        r = __create_persistent_data_objects(cmd, may_format_device);
        if (r) {
@@ -796,7 +810,8 @@ void dm_cache_metadata_close(struct dm_cache_metadata *cmd)
                list_del(&cmd->list);
                mutex_unlock(&table_lock);
 
-               __destroy_persistent_data_objects(cmd);
+               if (!cmd->fail_io)
+                       __destroy_persistent_data_objects(cmd);
                kfree(cmd);
        }
 }
@@ -848,13 +863,26 @@ static int blocks_are_unmapped_or_clean(struct dm_cache_metadata *cmd,
        return 0;
 }
 
+#define WRITE_LOCK(cmd) \
+       if (cmd->fail_io || dm_bm_is_read_only(cmd->bm)) \
+               return -EINVAL; \
+       down_write(&cmd->root_lock)
+
+#define WRITE_LOCK_VOID(cmd) \
+       if (cmd->fail_io || dm_bm_is_read_only(cmd->bm)) \
+               return; \
+       down_write(&cmd->root_lock)
+
+#define WRITE_UNLOCK(cmd) \
+       up_write(&cmd->root_lock)
+
 int dm_cache_resize(struct dm_cache_metadata *cmd, dm_cblock_t new_cache_size)
 {
        int r;
        bool clean;
        __le64 null_mapping = pack_value(0, 0);
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        __dm_bless_for_disk(&null_mapping);
 
        if (from_cblock(new_cache_size) < from_cblock(cmd->cache_blocks)) {
@@ -880,7 +908,7 @@ int dm_cache_resize(struct dm_cache_metadata *cmd, dm_cblock_t new_cache_size)
        cmd->changed = true;
 
 out:
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -891,7 +919,7 @@ int dm_cache_discard_bitset_resize(struct dm_cache_metadata *cmd,
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = dm_bitset_resize(&cmd->discard_info,
                             cmd->discard_root,
                             from_dblock(cmd->discard_nr_blocks),
@@ -903,7 +931,7 @@ int dm_cache_discard_bitset_resize(struct dm_cache_metadata *cmd,
        }
 
        cmd->changed = true;
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -946,9 +974,9 @@ int dm_cache_set_discard(struct dm_cache_metadata *cmd,
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = __discard(cmd, dblock, discard);
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -1020,9 +1048,9 @@ int dm_cache_remove_mapping(struct dm_cache_metadata *cmd, dm_cblock_t cblock)
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = __remove(cmd, cblock);
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -1048,9 +1076,9 @@ int dm_cache_insert_mapping(struct dm_cache_metadata *cmd,
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = __insert(cmd, cblock, oblock);
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -1234,9 +1262,9 @@ int dm_cache_set_dirty(struct dm_cache_metadata *cmd,
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = __dirty(cmd, cblock, dirty);
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -1252,9 +1280,9 @@ void dm_cache_metadata_get_stats(struct dm_cache_metadata *cmd,
 void dm_cache_metadata_set_stats(struct dm_cache_metadata *cmd,
                                 struct dm_cache_statistics *stats)
 {
-       down_write(&cmd->root_lock);
+       WRITE_LOCK_VOID(cmd);
        cmd->stats = *stats;
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 }
 
 int dm_cache_commit(struct dm_cache_metadata *cmd, bool clean_shutdown)
@@ -1263,7 +1291,7 @@ int dm_cache_commit(struct dm_cache_metadata *cmd, bool clean_shutdown)
        flags_mutator mutator = (clean_shutdown ? set_clean_shutdown :
                                 clear_clean_shutdown);
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = __commit_transaction(cmd, mutator);
        if (r)
                goto out;
@@ -1271,7 +1299,7 @@ int dm_cache_commit(struct dm_cache_metadata *cmd, bool clean_shutdown)
        r = __begin_transaction(cmd);
 
 out:
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
        return r;
 }
 
@@ -1376,9 +1404,9 @@ int dm_cache_write_hints(struct dm_cache_metadata *cmd, struct dm_cache_policy *
 {
        int r;
 
-       down_write(&cmd->root_lock);
+       WRITE_LOCK(cmd);
        r = write_hints(cmd, policy);
-       up_write(&cmd->root_lock);
+       WRITE_UNLOCK(cmd);
 
        return r;
 }
@@ -1387,3 +1415,70 @@ int dm_cache_metadata_all_clean(struct dm_cache_metadata *cmd, bool *result)
 {
        return blocks_are_unmapped_or_clean(cmd, 0, cmd->cache_blocks, result);
 }
+
+void dm_cache_metadata_set_read_only(struct dm_cache_metadata *cmd)
+{
+       WRITE_LOCK_VOID(cmd);
+       dm_bm_set_read_only(cmd->bm);
+       WRITE_UNLOCK(cmd);
+}
+
+void dm_cache_metadata_set_read_write(struct dm_cache_metadata *cmd)
+{
+       WRITE_LOCK_VOID(cmd);
+       dm_bm_set_read_write(cmd->bm);
+       WRITE_UNLOCK(cmd);
+}
+
+int dm_cache_metadata_set_needs_check(struct dm_cache_metadata *cmd)
+{
+       int r;
+       struct dm_block *sblock;
+       struct cache_disk_superblock *disk_super;
+
+       /*
+        * We ignore fail_io for this function.
+        */
+       down_write(&cmd->root_lock);
+       set_bit(NEEDS_CHECK, &cmd->flags);
+
+       r = superblock_lock(cmd, &sblock);
+       if (r) {
+               DMERR("couldn't read superblock");
+               goto out;
+       }
+
+       disk_super = dm_block_data(sblock);
+       disk_super->flags = cpu_to_le32(cmd->flags);
+
+       dm_bm_unlock(sblock);
+
+out:
+       up_write(&cmd->root_lock);
+       return r;
+}
+
+bool dm_cache_metadata_needs_check(struct dm_cache_metadata *cmd)
+{
+       bool needs_check;
+
+       down_read(&cmd->root_lock);
+       needs_check = !!test_bit(NEEDS_CHECK, &cmd->flags);
+       up_read(&cmd->root_lock);
+
+       return needs_check;
+}
+
+int dm_cache_metadata_abort(struct dm_cache_metadata *cmd)
+{
+       int r;
+
+       WRITE_LOCK(cmd);
+       __destroy_persistent_data_objects(cmd);
+       r = __create_persistent_data_objects(cmd, false);
+       if (r)
+               cmd->fail_io = true;
+       WRITE_UNLOCK(cmd);
+
+       return r;
+}
index 4ecc403be28370cce26052236b3d954ec0280a3f..2ffee21f318dfef70c967570b9677da0193d6e12 100644 (file)
@@ -102,6 +102,10 @@ struct dm_cache_statistics {
 
 void dm_cache_metadata_get_stats(struct dm_cache_metadata *cmd,
                                 struct dm_cache_statistics *stats);
+
+/*
+ * 'void' because it's no big deal if it fails.
+ */
 void dm_cache_metadata_set_stats(struct dm_cache_metadata *cmd,
                                 struct dm_cache_statistics *stats);
 
@@ -133,6 +137,12 @@ int dm_cache_write_hints(struct dm_cache_metadata *cmd, struct dm_cache_policy *
  */
 int dm_cache_metadata_all_clean(struct dm_cache_metadata *cmd, bool *result);
 
+bool dm_cache_metadata_needs_check(struct dm_cache_metadata *cmd);
+int dm_cache_metadata_set_needs_check(struct dm_cache_metadata *cmd);
+void dm_cache_metadata_set_read_only(struct dm_cache_metadata *cmd);
+void dm_cache_metadata_set_read_write(struct dm_cache_metadata *cmd);
+int dm_cache_metadata_abort(struct dm_cache_metadata *cmd);
+
 /*----------------------------------------------------------------*/
 
 #endif /* DM_CACHE_METADATA_H */
index 9dc05a52369e60cb280c9bebac3985c1ce9b16d1..ccbe852d53622055a0799b1d85d583c765e11f27 100644 (file)
@@ -89,13 +89,15 @@ static inline void policy_tick(struct dm_cache_policy *p)
                return p->tick(p);
 }
 
-static inline int policy_emit_config_values(struct dm_cache_policy *p, char *result, unsigned maxlen)
+static inline int policy_emit_config_values(struct dm_cache_policy *p, char *result,
+                                           unsigned maxlen, ssize_t *sz_ptr)
 {
-       ssize_t sz = 0;
+       ssize_t sz = *sz_ptr;
        if (p->emit_config_values)
-               return p->emit_config_values(p, result, maxlen);
+               return p->emit_config_values(p, result, maxlen, sz_ptr);
 
-       DMEMIT("0");
+       DMEMIT("0 ");
+       *sz_ptr = sz;
        return 0;
 }
 
index 7cbae125879c7079bc87c7fb3d96ea13462a782e..084eec6533210e22a3760c985ffc44f3a69ce5f1 100644 (file)
@@ -1323,22 +1323,24 @@ static int mq_set_config_value(struct dm_cache_policy *p,
        return 0;
 }
 
-static int mq_emit_config_values(struct dm_cache_policy *p, char *result, unsigned maxlen)
+static int mq_emit_config_values(struct dm_cache_policy *p, char *result,
+                                unsigned maxlen, ssize_t *sz_ptr)
 {
-       ssize_t sz = 0;
+       ssize_t sz = *sz_ptr;
        struct mq_policy *mq = to_mq_policy(p);
 
        DMEMIT("10 random_threshold %u "
               "sequential_threshold %u "
               "discard_promote_adjustment %u "
               "read_promote_adjustment %u "
-              "write_promote_adjustment %u",
+              "write_promote_adjustment %u ",
               mq->tracker.thresholds[PATTERN_RANDOM],
               mq->tracker.thresholds[PATTERN_SEQUENTIAL],
               mq->discard_promote_adjustment,
               mq->read_promote_adjustment,
               mq->write_promote_adjustment);
 
+       *sz_ptr = sz;
        return 0;
 }
 
index 6106ca3aa350a62326558e61c296e801b9fbe12b..74709129d856adfa2eb213338ea5c74792317506 100644 (file)
@@ -208,8 +208,8 @@ struct dm_cache_policy {
        /*
         * Configuration.
         */
-       int (*emit_config_values)(struct dm_cache_policy *p,
-                                 char *result, unsigned maxlen);
+       int (*emit_config_values)(struct dm_cache_policy *p, char *result,
+                                 unsigned maxlen, ssize_t *sz_ptr);
        int (*set_config_value)(struct dm_cache_policy *p,
                                const char *key, const char *value);
 
index 6d36ed3c46a0c1f91328f3c3365a37d64509ab41..dae0321ebfa9314c9df23ed77244f9fc9a76e497 100644 (file)
@@ -150,12 +150,10 @@ static void dm_unhook_bio(struct dm_hook_info *h, struct bio *bio)
 #define DATA_DEV_BLOCK_SIZE_MIN_SECTORS (32 * 1024 >> SECTOR_SHIFT)
 #define DATA_DEV_BLOCK_SIZE_MAX_SECTORS (1024 * 1024 * 1024 >> SECTOR_SHIFT)
 
-/*
- * FIXME: the cache is read/write for the time being.
- */
 enum cache_metadata_mode {
        CM_WRITE,               /* metadata may be changed */
        CM_READ_ONLY,           /* metadata may not be changed */
+       CM_FAIL
 };
 
 enum cache_io_mode {
@@ -385,6 +383,8 @@ struct prealloc {
        struct dm_bio_prison_cell *cell2;
 };
 
+static enum cache_metadata_mode get_cache_mode(struct cache *cache);
+
 static void wake_worker(struct cache *cache)
 {
        queue_work(cache->wq, &cache->worker);
@@ -699,6 +699,9 @@ static void save_stats(struct cache *cache)
 {
        struct dm_cache_statistics stats;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return;
+
        stats.read_hits = atomic_read(&cache->stats.read_hit);
        stats.read_misses = atomic_read(&cache->stats.read_miss);
        stats.write_hits = atomic_read(&cache->stats.write_hit);
@@ -957,6 +960,84 @@ static void remap_to_origin_then_cache(struct cache *cache, struct bio *bio,
        remap_to_origin_clear_discard(pb->cache, bio, oblock);
 }
 
+/*----------------------------------------------------------------
+ * Failure modes
+ *--------------------------------------------------------------*/
+static enum cache_metadata_mode get_cache_mode(struct cache *cache)
+{
+       return cache->features.mode;
+}
+
+static void notify_mode_switch(struct cache *cache, enum cache_metadata_mode mode)
+{
+       const char *descs[] = {
+               "write",
+               "read-only",
+               "fail"
+       };
+
+       dm_table_event(cache->ti->table);
+       DMINFO("switching cache to %s mode", descs[(int)mode]);
+}
+
+static void set_cache_mode(struct cache *cache, enum cache_metadata_mode new_mode)
+{
+       bool needs_check = dm_cache_metadata_needs_check(cache->cmd);
+       enum cache_metadata_mode old_mode = get_cache_mode(cache);
+
+       if (new_mode == CM_WRITE && needs_check) {
+               DMERR("unable to switch cache to write mode until repaired.");
+               if (old_mode != new_mode)
+                       new_mode = old_mode;
+               else
+                       new_mode = CM_READ_ONLY;
+       }
+
+       /* Never move out of fail mode */
+       if (old_mode == CM_FAIL)
+               new_mode = CM_FAIL;
+
+       switch (new_mode) {
+       case CM_FAIL:
+       case CM_READ_ONLY:
+               dm_cache_metadata_set_read_only(cache->cmd);
+               break;
+
+       case CM_WRITE:
+               dm_cache_metadata_set_read_write(cache->cmd);
+               break;
+       }
+
+       cache->features.mode = new_mode;
+
+       if (new_mode != old_mode)
+               notify_mode_switch(cache, new_mode);
+}
+
+static void abort_transaction(struct cache *cache)
+{
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return;
+
+       if (dm_cache_metadata_set_needs_check(cache->cmd)) {
+               DMERR("failed to set 'needs_check' flag in metadata");
+               set_cache_mode(cache, CM_FAIL);
+       }
+
+       DMERR_LIMIT("aborting current metadata transaction");
+       if (dm_cache_metadata_abort(cache->cmd)) {
+               DMERR("failed to abort metadata transaction");
+               set_cache_mode(cache, CM_FAIL);
+       }
+}
+
+static void metadata_operation_failed(struct cache *cache, const char *op, int r)
+{
+       DMERR_LIMIT("metadata operation '%s' failed: error = %d", op, r);
+       abort_transaction(cache);
+       set_cache_mode(cache, CM_READ_ONLY);
+}
+
 /*----------------------------------------------------------------
  * Migration processing
  *
@@ -1063,6 +1144,7 @@ static void migration_failure(struct dm_cache_migration *mg)
 
 static void migration_success_pre_commit(struct dm_cache_migration *mg)
 {
+       int r;
        unsigned long flags;
        struct cache *cache = mg->cache;
 
@@ -1073,8 +1155,10 @@ static void migration_success_pre_commit(struct dm_cache_migration *mg)
                return;
 
        } else if (mg->demote) {
-               if (dm_cache_remove_mapping(cache->cmd, mg->cblock)) {
+               r = dm_cache_remove_mapping(cache->cmd, mg->cblock);
+               if (r) {
                        DMWARN_LIMIT("demotion failed; couldn't update on disk metadata");
+                       metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
                        policy_force_mapping(cache->policy, mg->new_oblock,
                                             mg->old_oblock);
                        if (mg->promote)
@@ -1083,8 +1167,10 @@ static void migration_success_pre_commit(struct dm_cache_migration *mg)
                        return;
                }
        } else {
-               if (dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock)) {
+               r = dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock);
+               if (r) {
                        DMWARN_LIMIT("promotion failed; couldn't update on disk metadata");
+                       metadata_operation_failed(cache, "dm_cache_insert_mapping", r);
                        policy_remove_mapping(cache->policy, mg->new_oblock);
                        free_io_migration(mg);
                        return;
@@ -1812,15 +1898,32 @@ static int need_commit_due_to_time(struct cache *cache)
               jiffies > cache->last_commit_jiffies + COMMIT_PERIOD;
 }
 
+/*
+ * A non-zero return indicates read_only or fail_io mode.
+ */
+static int commit(struct cache *cache, bool clean_shutdown)
+{
+       int r;
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
+       atomic_inc(&cache->stats.commit_count);
+       r = dm_cache_commit(cache->cmd, clean_shutdown);
+       if (r)
+               metadata_operation_failed(cache, "dm_cache_commit", r);
+
+       return r;
+}
+
 static int commit_if_needed(struct cache *cache)
 {
        int r = 0;
 
        if ((cache->commit_requested || need_commit_due_to_time(cache)) &&
            dm_cache_changed_this_transaction(cache->cmd)) {
-               atomic_inc(&cache->stats.commit_count);
+               r = commit(cache, false);
                cache->commit_requested = false;
-               r = dm_cache_commit(cache->cmd, false);
                cache->last_commit_jiffies = jiffies;
        }
 
@@ -1988,8 +2091,10 @@ static void process_invalidation_request(struct cache *cache, struct invalidatio
                r = policy_remove_cblock(cache->policy, to_cblock(begin));
                if (!r) {
                        r = dm_cache_remove_mapping(cache->cmd, to_cblock(begin));
-                       if (r)
+                       if (r) {
+                               metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
                                break;
+                       }
 
                } else if (r == -ENODATA) {
                        /* harmless, already unmapped */
@@ -2133,12 +2238,6 @@ static void do_worker(struct work_struct *ws)
                if (commit_if_needed(cache)) {
                        process_deferred_flush_bios(cache, false);
                        process_migrations(cache, &cache->need_commit_migrations, migration_failure);
-
-                       /*
-                        * FIXME: rollback metadata or just go into a
-                        * failure mode and error everything
-                        */
-
                } else {
                        process_deferred_flush_bios(cache, true);
                        process_migrations(cache, &cache->need_commit_migrations,
@@ -2711,6 +2810,12 @@ static int cache_create(struct cache_args *ca, struct cache **result)
                goto bad;
        }
        cache->cmd = cmd;
+       set_cache_mode(cache, CM_WRITE);
+       if (get_cache_mode(cache) != CM_WRITE) {
+               *error = "Unable to get write access to metadata, please check/repair metadata.";
+               r = -EINVAL;
+               goto bad;
+       }
 
        if (passthrough_mode(&cache->features)) {
                bool all_clean;
@@ -3043,11 +3148,16 @@ static int write_dirty_bitset(struct cache *cache)
 {
        unsigned i, r;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
        for (i = 0; i < from_cblock(cache->cache_size); i++) {
                r = dm_cache_set_dirty(cache->cmd, to_cblock(i),
                                       is_dirty(cache, to_cblock(i)));
-               if (r)
+               if (r) {
+                       metadata_operation_failed(cache, "dm_cache_set_dirty", r);
                        return r;
+               }
        }
 
        return 0;
@@ -3057,18 +3167,40 @@ static int write_discard_bitset(struct cache *cache)
 {
        unsigned i, r;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
        r = dm_cache_discard_bitset_resize(cache->cmd, cache->discard_block_size,
                                           cache->discard_nr_blocks);
        if (r) {
                DMERR("could not resize on-disk discard bitset");
+               metadata_operation_failed(cache, "dm_cache_discard_bitset_resize", r);
                return r;
        }
 
        for (i = 0; i < from_dblock(cache->discard_nr_blocks); i++) {
                r = dm_cache_set_discard(cache->cmd, to_dblock(i),
                                         is_discarded(cache, to_dblock(i)));
-               if (r)
+               if (r) {
+                       metadata_operation_failed(cache, "dm_cache_set_discard", r);
                        return r;
+               }
+       }
+
+       return 0;
+}
+
+static int write_hints(struct cache *cache)
+{
+       int r;
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
+       r = dm_cache_write_hints(cache->cmd, cache->policy);
+       if (r) {
+               metadata_operation_failed(cache, "dm_cache_write_hints", r);
+               return r;
        }
 
        return 0;
@@ -3091,7 +3223,7 @@ static bool sync_metadata(struct cache *cache)
 
        save_stats(cache);
 
-       r3 = dm_cache_write_hints(cache->cmd, cache->policy);
+       r3 = write_hints(cache);
        if (r3)
                DMERR("could not write hints");
 
@@ -3100,9 +3232,9 @@ static bool sync_metadata(struct cache *cache)
         * set the clean shutdown flag.  This will effectively force every
         * dirty bit to be set on reload.
         */
-       r4 = dm_cache_commit(cache->cmd, !r1 && !r2 && !r3);
+       r4 = commit(cache, !r1 && !r2 && !r3);
        if (r4)
-               DMERR("could not write cache metadata.  Data loss may occur.");
+               DMERR("could not write cache metadata.");
 
        return !r1 && !r2 && !r3 && !r4;
 }
@@ -3118,7 +3250,8 @@ static void cache_postsuspend(struct dm_target *ti)
        requeue_deferred_cells(cache);
        stop_quiescing(cache);
 
-       (void) sync_metadata(cache);
+       if (get_cache_mode(cache) == CM_WRITE)
+               (void) sync_metadata(cache);
 }
 
 static int load_mapping(void *context, dm_oblock_t oblock, dm_cblock_t cblock,
@@ -3257,6 +3390,7 @@ static int resize_cache_dev(struct cache *cache, dm_cblock_t new_size)
        r = dm_cache_resize(cache->cmd, new_size);
        if (r) {
                DMERR("could not resize cache metadata");
+               metadata_operation_failed(cache, "dm_cache_resize", r);
                return r;
        }
 
@@ -3295,6 +3429,7 @@ static int cache_preresume(struct dm_target *ti)
                                           load_mapping, cache);
                if (r) {
                        DMERR("could not load cache mappings");
+                       metadata_operation_failed(cache, "dm_cache_load_mappings", r);
                        return r;
                }
 
@@ -3315,6 +3450,7 @@ static int cache_preresume(struct dm_target *ti)
                r = dm_cache_load_discards(cache->cmd, load_discard, &li);
                if (r) {
                        DMERR("could not load origin discards");
+                       metadata_operation_failed(cache, "dm_cache_load_discards", r);
                        return r;
                }
                set_discard_range(&li);
@@ -3342,7 +3478,7 @@ static void cache_resume(struct dm_target *ti)
  * <#demotions> <#promotions> <#dirty>
  * <#features> <features>*
  * <#core args> <core args>
- * <policy name> <#policy args> <policy args>*
+ * <policy name> <#policy args> <policy args>* <cache metadata mode>
  */
 static void cache_status(struct dm_target *ti, status_type_t type,
                         unsigned status_flags, char *result, unsigned maxlen)
@@ -3358,13 +3494,15 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 
        switch (type) {
        case STATUSTYPE_INFO:
-               /* Commit to ensure statistics aren't out-of-date */
-               if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti)) {
-                       r = dm_cache_commit(cache->cmd, false);
-                       if (r)
-                               DMERR("could not commit metadata for accurate status");
+               if (get_cache_mode(cache) == CM_FAIL) {
+                       DMEMIT("Fail");
+                       break;
                }
 
+               /* Commit to ensure statistics aren't out-of-date */
+               if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti))
+                       (void) commit(cache, false);
+
                r = dm_cache_get_free_metadata_block_count(cache->cmd,
                                                           &nr_free_blocks_metadata);
                if (r) {
@@ -3413,11 +3551,16 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 
                DMEMIT("%s ", dm_cache_policy_get_name(cache->policy));
                if (sz < maxlen) {
-                       r = policy_emit_config_values(cache->policy, result + sz, maxlen - sz);
+                       r = policy_emit_config_values(cache->policy, result, maxlen, &sz);
                        if (r)
                                DMERR("policy_emit_config_values returned %d", r);
                }
 
+               if (get_cache_mode(cache) == CM_READ_ONLY)
+                       DMEMIT("ro ");
+               else
+                       DMEMIT("rw ");
+
                break;
 
        case STATUSTYPE_TABLE:
@@ -3573,6 +3716,11 @@ static int cache_message(struct dm_target *ti, unsigned argc, char **argv)
        if (!argc)
                return -EINVAL;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY) {
+               DMERR("unable to service cache target messages in READ_ONLY or FAIL mode");
+               return -EOPNOTSUPP;
+       }
+
        if (!strcasecmp(argv[0], "invalidate_cblocks"))
                return process_invalidate_cblocks_message(cache, argc - 1, (const char **) argv + 1);
 
@@ -3646,7 +3794,7 @@ static void cache_io_hints(struct dm_target *ti, struct queue_limits *limits)
 
 static struct target_type cache_target = {
        .name = "cache",
-       .version = {1, 6, 0},
+       .version = {1, 7, 0},
        .module = THIS_MODULE,
        .ctr = cache_ctr,
        .dtr = cache_dtr,