t/dedupe: handle errors more gracefully
authorJens Axboe <axboe@kernel.dk>
Fri, 11 Mar 2022 12:09:20 +0000 (05:09 -0700)
committerJens Axboe <axboe@kernel.dk>
Fri, 11 Mar 2022 12:09:20 +0000 (05:09 -0700)
Don't assert for a deflate error, properly check for it and pass it
back up the stack so we can abort the thread.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
t/dedupe.c

index 109ea1af49a888b13ef6285fd450859446be22bf..561aa08d3a432776cbdee695df9e747b2359989e 100644 (file)
@@ -143,15 +143,15 @@ static int read_block(int fd, void *buf, off_t offset)
        return __read_block(fd, buf, offset, blocksize);
 }
 
-static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
-                                   struct zlib_ctrl *zc)
+static int account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
+                                  struct zlib_ctrl *zc)
 {
        z_stream *stream = &zc->stream;
        unsigned int compressed_len;
        int ret;
 
        if (read_block(file.fd, zc->buf_in, offset))
-               return;
+               return 1;
 
        stream->next_in = zc->buf_in;
        stream->avail_in = blocksize;
@@ -159,7 +159,8 @@ static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
        stream->next_out = zc->buf_out;
 
        ret = deflate(stream, Z_FINISH);
-       assert(ret != Z_STREAM_ERROR);
+       if (ret == Z_STREAM_ERROR)
+               return 1;
        compressed_len = blocksize - stream->avail_out;
 
        if (dump_output)
@@ -169,6 +170,7 @@ static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
 
        *unique_capacity += compressed_len;
        deflateReset(stream);
+       return 0;
 }
 
 static void add_item(struct chunk *c, struct item *i)
@@ -225,12 +227,12 @@ static struct chunk *alloc_chunk(void)
        return c;
 }
 
-static void insert_chunk(struct item *i, uint64_t *unique_capacity,
-                        struct zlib_ctrl *zc)
+static int insert_chunk(struct item *i, uint64_t *unique_capacity,
+                       struct zlib_ctrl *zc)
 {
        struct fio_rb_node **p, *parent;
        struct chunk *c;
-       int diff;
+       int ret, diff;
 
        p = &rb_root.rb_node;
        parent = NULL;
@@ -244,8 +246,6 @@ static void insert_chunk(struct item *i, uint64_t *unique_capacity,
                } else if (diff > 0) {
                        p = &(*p)->rb_right;
                } else {
-                       int ret;
-
                        if (!collision_check)
                                goto add;
 
@@ -266,17 +266,21 @@ static void insert_chunk(struct item *i, uint64_t *unique_capacity,
        memcpy(c->hash, i->hash, sizeof(i->hash));
        rb_link_node(&c->rb_node, parent, p);
        rb_insert_color(&c->rb_node, &rb_root);
-       if (compression)
-               account_unique_capacity(i->offset, unique_capacity, zc);
+       if (compression) {
+               ret = account_unique_capacity(i->offset, unique_capacity, zc);
+               if (ret)
+                       return ret;
+       }
 add:
        add_item(c, i);
+       return 0;
 }
 
-static void insert_chunks(struct item *items, unsigned int nitems,
-                         uint64_t *ndupes, uint64_t *unique_capacity,
-                         struct zlib_ctrl *zc)
+static int insert_chunks(struct item *items, unsigned int nitems,
+                        uint64_t *ndupes, uint64_t *unique_capacity,
+                        struct zlib_ctrl *zc)
 {
-       int i;
+       int i, ret;
 
        fio_sem_down(rb_lock);
 
@@ -288,11 +292,15 @@ static void insert_chunks(struct item *items, unsigned int nitems,
                        s = sizeof(items[i].hash) / sizeof(uint32_t);
                        r = bloom_set(bloom, items[i].hash, s);
                        *ndupes += r;
-               } else
-                       insert_chunk(&items[i], unique_capacity, zc);
+               } else {
+                       ret = insert_chunk(&items[i], unique_capacity, zc);
+                       if (ret)
+                               break;
+               }
        }
 
        fio_sem_up(rb_lock);
+       return ret;
 }
 
 static void crc_buf(void *buf, uint32_t *hash)
@@ -320,6 +328,7 @@ static int do_work(struct worker_thread *thread, void *buf)
        uint64_t ndupes = 0;
        uint64_t unique_capacity = 0;
        struct item *items;
+       int ret;
 
        offset = thread->cur_offset;
 
@@ -339,13 +348,17 @@ static int do_work(struct worker_thread *thread, void *buf)
                nitems++;
        }
 
-       insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
+       ret = insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
 
        free(items);
-       thread->items += nitems;
-       thread->dupes += ndupes;
-       thread->unique_capacity += unique_capacity;
-       return 0;
+       if (!ret) {
+               thread->items += nitems;
+               thread->dupes += ndupes;
+               thread->unique_capacity += unique_capacity;
+               return 0;
+       }
+
+       return ret;
 }
 
 static void thread_init_zlib_control(struct worker_thread *thread)