return __read_block(fd, buf, offset, blocksize);
}
-static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
- struct zlib_ctrl *zc)
+static int account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
+ struct zlib_ctrl *zc)
{
z_stream *stream = &zc->stream;
unsigned int compressed_len;
int ret;
if (read_block(file.fd, zc->buf_in, offset))
- return;
+ return 1;
stream->next_in = zc->buf_in;
stream->avail_in = blocksize;
stream->next_out = zc->buf_out;
ret = deflate(stream, Z_FINISH);
- assert(ret != Z_STREAM_ERROR);
+ if (ret == Z_STREAM_ERROR)
+ return 1;
compressed_len = blocksize - stream->avail_out;
if (dump_output)
*unique_capacity += compressed_len;
deflateReset(stream);
+ return 0;
}
static void add_item(struct chunk *c, struct item *i)
return c;
}
-static void insert_chunk(struct item *i, uint64_t *unique_capacity,
- struct zlib_ctrl *zc)
+static int insert_chunk(struct item *i, uint64_t *unique_capacity,
+ struct zlib_ctrl *zc)
{
struct fio_rb_node **p, *parent;
struct chunk *c;
- int diff;
+ int ret, diff;
p = &rb_root.rb_node;
parent = NULL;
} else if (diff > 0) {
p = &(*p)->rb_right;
} else {
- int ret;
-
if (!collision_check)
goto add;
memcpy(c->hash, i->hash, sizeof(i->hash));
rb_link_node(&c->rb_node, parent, p);
rb_insert_color(&c->rb_node, &rb_root);
- if (compression)
- account_unique_capacity(i->offset, unique_capacity, zc);
+ if (compression) {
+ ret = account_unique_capacity(i->offset, unique_capacity, zc);
+ if (ret)
+ return ret;
+ }
add:
add_item(c, i);
+ return 0;
}
-static void insert_chunks(struct item *items, unsigned int nitems,
- uint64_t *ndupes, uint64_t *unique_capacity,
- struct zlib_ctrl *zc)
+static int insert_chunks(struct item *items, unsigned int nitems,
+ uint64_t *ndupes, uint64_t *unique_capacity,
+ struct zlib_ctrl *zc)
{
- int i;
+ int i, ret;
fio_sem_down(rb_lock);
s = sizeof(items[i].hash) / sizeof(uint32_t);
r = bloom_set(bloom, items[i].hash, s);
*ndupes += r;
- } else
- insert_chunk(&items[i], unique_capacity, zc);
+ } else {
+ ret = insert_chunk(&items[i], unique_capacity, zc);
+ if (ret)
+ break;
+ }
}
fio_sem_up(rb_lock);
+ return ret;
}
static void crc_buf(void *buf, uint32_t *hash)
uint64_t ndupes = 0;
uint64_t unique_capacity = 0;
struct item *items;
+ int ret;
offset = thread->cur_offset;
nitems++;
}
- insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
+ ret = insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
free(items);
- thread->items += nitems;
- thread->dupes += ndupes;
- thread->unique_capacity += unique_capacity;
- return 0;
+ if (!ret) {
+ thread->items += nitems;
+ thread->dupes += ndupes;
+ thread->unique_capacity += unique_capacity;
+ return 0;
+ }
+
+ return ret;
}
static void thread_init_zlib_control(struct worker_thread *thread)