5f72ed76cf27a561a5f52483156b14460ca4c766
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <fcntl.h>
7 #include <inttypes.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <sys/stat.h>
12
13 #include "../fio.h"
14 #include "../flist.h"
15 #include "../log.h"
16 #include "../fio_sem.h"
17 #include "../smalloc.h"
18 #include "../minmax.h"
19 #include "../crc/md5.h"
20 #include "../os/os.h"
21 #include "../gettime.h"
22 #include "../fio_time.h"
23 #include "../lib/rbtree.h"
24
25 #include "../lib/bloom.h"
26 #include "debug.h"
27 #include "zlib.h"
28
29 struct zlib_ctrl {
30         z_stream stream;
31         unsigned char *buf_in;
32         unsigned char *buf_out;
33 };
34
35 struct worker_thread {
36         struct zlib_ctrl zc;
37         pthread_t thread;
38         uint64_t cur_offset;
39         uint64_t size;
40         unsigned long long unique_capacity;
41         unsigned long items;
42         unsigned long dupes;
43         int err;
44         int fd;
45         volatile int done;
46 };
47
48 struct extent {
49         struct flist_head list;
50         uint64_t offset;
51 };
52
53 struct chunk {
54         struct fio_rb_node rb_node;
55         uint64_t count;
56         uint32_t hash[MD5_HASH_WORDS];
57         struct flist_head extent_list[0];
58 };
59
60 struct item {
61         uint64_t offset;
62         uint32_t hash[MD5_HASH_WORDS];
63 };
64
65 static struct rb_root rb_root;
66 static struct bloom *bloom;
67 static struct fio_sem *rb_lock;
68
69 static unsigned int blocksize = 4096;
70 static unsigned int num_threads;
71 static unsigned int chunk_size = 1048576;
72 static unsigned int dump_output;
73 static unsigned int odirect;
74 static unsigned int collision_check;
75 static unsigned int print_progress = 1;
76 static unsigned int use_bloom = 1;
77 static unsigned int compression = 0;
78
79 static uint64_t total_size;
80 static uint64_t cur_offset;
81 static struct fio_sem *size_lock;
82
83 static struct fio_file file;
84
85 static uint64_t get_size(struct fio_file *f, struct stat *sb)
86 {
87         uint64_t ret;
88
89         if (S_ISBLK(sb->st_mode)) {
90                 unsigned long long bytes = 0;
91
92                 if (blockdev_size(f, &bytes)) {
93                         log_err("dedupe: failed getting bdev size\n");
94                         return 0;
95                 }
96                 ret = bytes;
97         } else
98                 ret = sb->st_size;
99
100         return (ret & ~((uint64_t)blocksize - 1));
101 }
102
103 static int get_work(uint64_t *offset, uint64_t *size)
104 {
105         uint64_t this_chunk;
106         int ret = 1;
107
108         fio_sem_down(size_lock);
109
110         if (cur_offset < total_size) {
111                 *offset = cur_offset;
112                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
113                 *size = this_chunk;
114                 cur_offset += this_chunk;
115                 ret = 0;
116         }
117
118         fio_sem_up(size_lock);
119         return ret;
120 }
121
122 static int __read_block(int fd, void *buf, off_t offset, size_t count)
123 {
124         ssize_t ret;
125
126         ret = pread(fd, buf, count, offset);
127         if (ret < 0) {
128                 perror("pread");
129                 return 1;
130         } else if (!ret)
131                 return 1;
132         else if (ret != count) {
133                 log_err("dedupe: short read on block\n");
134                 return 1;
135         }
136
137         return 0;
138 }
139
140 static int read_block(int fd, void *buf, off_t offset)
141 {
142         return __read_block(fd, buf, offset, blocksize);
143 }
144
145 static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity, struct zlib_ctrl *zc)
146 {
147         z_stream *stream = &zc->stream;
148         unsigned int compressed_len;
149         int ret;
150
151         if (read_block(file.fd, zc->buf_in, offset))
152                 return;
153
154         stream->next_in = zc->buf_in;
155         stream->avail_in = blocksize;
156         stream->avail_out = deflateBound(stream, blocksize);
157         stream->next_out = zc->buf_out;
158
159         ret = deflate(stream, Z_FINISH);
160         assert(ret != Z_STREAM_ERROR);
161         compressed_len = blocksize - stream->avail_out;
162
163         if (dump_output)
164                 printf("offset 0x%lx compressed to %d blocksize %d ratio %.2f \n",
165                                 (unsigned long) offset, compressed_len, blocksize,
166                                 (float)compressed_len / (float)blocksize);
167
168         *unique_capacity += compressed_len;
169
170         deflateReset(stream);
171 }
172
173 static void add_item(struct chunk *c, struct item *i)
174 {
175         /*      
176          * Save some memory and don't add extent items, if we don't
177          * use them.
178          */
179         if (dump_output || collision_check) {
180                 struct extent *e;
181
182                 e = malloc(sizeof(*e));
183                 e->offset = i->offset;
184                 flist_add_tail(&e->list, &c->extent_list[0]);
185         }
186
187         c->count++;
188 }
189
190 static int col_check(struct chunk *c, struct item *i)
191 {
192         struct extent *e;
193         char *cbuf, *ibuf;
194         int ret = 1;
195
196         cbuf = fio_memalign(blocksize, blocksize, false);
197         ibuf = fio_memalign(blocksize, blocksize, false);
198
199         e = flist_entry(c->extent_list[0].next, struct extent, list);
200         if (read_block(file.fd, cbuf, e->offset))
201                 goto out;
202
203         if (read_block(file.fd, ibuf, i->offset))
204                 goto out;
205
206         ret = memcmp(ibuf, cbuf, blocksize);
207 out:
208         fio_memfree(cbuf, blocksize, false);
209         fio_memfree(ibuf, blocksize, false);
210         return ret;
211 }
212
213 static struct chunk *alloc_chunk(void)
214 {
215         struct chunk *c;
216
217         if (collision_check || dump_output) {
218                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
219                 INIT_FLIST_HEAD(&c->extent_list[0]);
220         } else
221                 c = malloc(sizeof(struct chunk));
222
223         return c;
224 }
225
226 static void insert_chunk(struct item *i, uint64_t *unique_capacity, struct zlib_ctrl *zc)
227 {
228         struct fio_rb_node **p, *parent;
229         struct chunk *c;
230         int diff;
231
232         p = &rb_root.rb_node;
233         parent = NULL;
234         while (*p) {
235                 parent = *p;
236
237                 c = rb_entry(parent, struct chunk, rb_node);
238                 diff = memcmp(i->hash, c->hash, sizeof(i->hash));
239                 if (diff < 0)
240                         p = &(*p)->rb_left;
241                 else if (diff > 0)
242                         p = &(*p)->rb_right;
243                 else {
244                         int ret;
245
246                         if (!collision_check)
247                                 goto add;
248
249                         fio_sem_up(rb_lock);
250                         ret = col_check(c, i);
251                         fio_sem_down(rb_lock);
252
253                         if (!ret)
254                                 goto add;
255
256                         p = &(*p)->rb_right;
257                 }
258         }
259
260         c = alloc_chunk();
261         RB_CLEAR_NODE(&c->rb_node);
262         c->count = 0;
263         memcpy(c->hash, i->hash, sizeof(i->hash));
264         rb_link_node(&c->rb_node, parent, p);
265         rb_insert_color(&c->rb_node, &rb_root);
266         if (compression)
267                 account_unique_capacity(i->offset, unique_capacity, zc);
268 add:
269         add_item(c, i);
270 }
271
272 static void insert_chunks(struct item *items, unsigned int nitems,
273                           uint64_t *ndupes, uint64_t *unique_capacity, struct zlib_ctrl *zc)
274 {
275         int i;
276
277         fio_sem_down(rb_lock);
278
279         for (i = 0; i < nitems; i++) {
280                 if (bloom) {
281                         unsigned int s;
282                         int r;
283
284                         s = sizeof(items[i].hash) / sizeof(uint32_t);
285                         r = bloom_set(bloom, items[i].hash, s);
286                         *ndupes += r;
287                 } else
288                         insert_chunk(&items[i], unique_capacity, zc);
289         }
290
291         fio_sem_up(rb_lock);
292 }
293
294 static void crc_buf(void *buf, uint32_t *hash)
295 {
296         struct fio_md5_ctx ctx = { .hash = hash };
297
298         fio_md5_init(&ctx);
299         fio_md5_update(&ctx, buf, blocksize);
300         fio_md5_final(&ctx);
301 }
302
303 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
304 {
305         if (__read_block(fd, buf, offset, size))
306                 return 0;
307
308         return size / blocksize;
309 }
310
311 static int do_work(struct worker_thread *thread, void *buf)
312 {
313         unsigned int nblocks, i;
314         off_t offset;
315         int nitems = 0;
316         uint64_t ndupes = 0;
317         uint64_t unique_capacity = 0;
318         struct item *items;
319
320         offset = thread->cur_offset;
321
322         nblocks = read_blocks(thread->fd, buf, offset, min(thread->size, (uint64_t)chunk_size));
323         if (!nblocks)
324                 return 1;
325
326         items = malloc(sizeof(*items) * nblocks);
327
328         for (i = 0; i < nblocks; i++) {
329                 void *thisptr = buf + (i * blocksize);
330
331                 items[i].offset = offset;
332                 crc_buf(thisptr, items[i].hash);
333                 offset += blocksize;
334                 nitems++;
335         }
336
337         insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
338
339         free(items);
340         thread->items += nitems;
341         thread->dupes += ndupes;
342         thread->unique_capacity += unique_capacity;
343         return 0;
344 }
345
346 static void thread_init_zlib_control(struct worker_thread *thread)
347 {
348         z_stream *stream = &thread->zc.stream;
349         stream->zalloc = Z_NULL;
350         stream->zfree = Z_NULL;
351         stream->opaque = Z_NULL;
352
353         if (deflateInit(stream, Z_DEFAULT_COMPRESSION) != Z_OK)
354                 return;
355
356         thread->zc.buf_in = fio_memalign(blocksize, blocksize, false);
357         thread->zc.buf_out = fio_memalign(blocksize, deflateBound(stream, blocksize), false);
358 }
359
360 static void *thread_fn(void *data)
361 {
362         struct worker_thread *thread = data;
363         void *buf;
364
365         buf = fio_memalign(blocksize, chunk_size, false);
366
367         thread_init_zlib_control(thread);
368
369         do {
370                 if (get_work(&thread->cur_offset, &thread->size)) {
371                         thread->err = 1;
372                         break;
373                 }
374                 if (do_work(thread, buf)) {
375                         thread->err = 1;
376                         break;
377                 }
378         } while (1);
379
380         thread->done = 1;
381         fio_memfree(buf, chunk_size, false);
382         return NULL;
383 }
384
385 static void show_progress(struct worker_thread *threads, unsigned long total)
386 {
387         unsigned long last_nitems = 0;
388         struct timespec last_tv;
389
390         fio_gettime(&last_tv, NULL);
391
392         while (print_progress) {
393                 unsigned long this_items;
394                 unsigned long nitems = 0;
395                 uint64_t tdiff;
396                 float perc;
397                 int some_done = 0;
398                 int i;
399
400                 for (i = 0; i < num_threads; i++) {
401                         nitems += threads[i].items;
402                         some_done = threads[i].done;
403                         if (some_done)
404                                 break;
405                 }
406
407                 if (some_done)
408                         break;
409
410                 perc = (float) nitems / (float) total;
411                 perc *= 100.0;
412                 this_items = nitems - last_nitems;
413                 this_items *= blocksize;
414                 tdiff = mtime_since_now(&last_tv);
415                 if (tdiff) {
416                         this_items = (this_items * 1000) / (tdiff * 1024);
417                         printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
418                         last_nitems = nitems;
419                         fio_gettime(&last_tv, NULL);
420                 } else
421                         printf("%3.2f%% done\r", perc);
422                 fflush(stdout);
423                 usleep(250000);
424         };
425 }
426
427 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
428                               uint64_t *nextents, uint64_t *nchunks, uint64_t *unique_capacity)
429 {
430         struct worker_thread *threads;
431         unsigned long nitems, total_items;
432         int i, err = 0;
433
434         total_size = dev_size;
435         total_items = dev_size / blocksize;
436         cur_offset = 0;
437         size_lock = fio_sem_init(FIO_SEM_UNLOCKED);
438
439         threads = malloc(num_threads * sizeof(struct worker_thread));
440         for (i = 0; i < num_threads; i++) {
441                 memset(&threads[i], 0, sizeof(struct worker_thread));
442                 threads[i].fd = f->fd;
443
444                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
445                 if (err) {
446                         log_err("fio: thread startup failed\n");
447                         break;
448                 }
449         }
450
451         show_progress(threads, total_items);
452
453         nitems = 0;
454         *nextents = 0;
455         *nchunks = 1;
456         *unique_capacity = 0;
457         for (i = 0; i < num_threads; i++) {
458                 void *ret;
459                 pthread_join(threads[i].thread, &ret);
460                 nitems += threads[i].items;
461                 *nchunks += threads[i].dupes;
462                 *unique_capacity += threads[i].unique_capacity;
463         }
464
465         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
466
467         *nextents = nitems;
468         *nchunks = nitems - *nchunks;
469
470         fio_sem_remove(size_lock);
471         free(threads);
472         return err;
473 }
474
475 static int dedupe_check(const char *filename, uint64_t *nextents,
476                         uint64_t *nchunks, uint64_t *unique_capacity)
477 {
478         uint64_t dev_size;
479         struct stat sb;
480         int flags;
481
482         flags = O_RDONLY;
483         if (odirect)
484                 flags |= OS_O_DIRECT;
485
486         memset(&file, 0, sizeof(file));
487         file.file_name = strdup(filename);
488
489         file.fd = open(filename, flags);
490         if (file.fd == -1) {
491                 perror("open");
492                 goto err;
493         }
494
495         if (fstat(file.fd, &sb) < 0) {
496                 perror("fstat");
497                 goto err;
498         }
499
500         dev_size = get_size(&file, &sb);
501         if (!dev_size)
502                 goto err;
503
504         if (use_bloom) {
505                 uint64_t bloom_entries;
506
507                 bloom_entries = 8 * (dev_size / blocksize);
508                 bloom = bloom_new(bloom_entries);
509         }
510
511         printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
512
513         return run_dedupe_threads(&file, dev_size, nextents, nchunks, unique_capacity);
514 err:
515         if (file.fd != -1)
516                 close(file.fd);
517         free(file.file_name);
518         return 1;
519 }
520
521 static void show_chunk(struct chunk *c)
522 {
523         struct flist_head *n;
524         struct extent *e;
525
526         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
527         flist_for_each(n, &c->extent_list[0]) {
528                 e = flist_entry(n, struct extent, list);
529                 printf("\toffset %llu\n", (unsigned long long) e->offset);
530         }
531 }
532
533 static const char *capacity_unit[] = {"b","KB", "MB", "GB", "TB", "PB", "EB"};
534
535 static uint64_t bytes_to_human_readable_unit(uint64_t n, const char **unit_out)
536 {
537         uint8_t i = 0;
538
539         while (n >= 1024) {
540                 i++;
541                 n /= 1024;
542         }
543
544         *unit_out = capacity_unit[i];
545
546         return n;
547 }
548
549 static void show_stat(uint64_t nextents, uint64_t nchunks, uint64_t ndupextents, uint64_t unique_capacity)
550 {
551         double perc, ratio;
552         const char *unit;
553         uint64_t uc_human;
554
555         printf("Extents=%lu, Unique extents=%lu", (unsigned long) nextents, (unsigned long) nchunks);
556         if (!bloom)
557                 printf(" Duplicated extents=%lu", (unsigned long) ndupextents);
558         printf("\n");
559
560         if (nchunks) {
561                 ratio = (double) nextents / (double) nchunks;
562                 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
563         } else
564                 printf("De-dupe ratio: 1:infinite\n");
565
566         if (ndupextents)
567                 printf("De-dupe working set at least: %3.2f%%\n", 100.0 * (double) ndupextents / (double) nextents);
568
569         perc = 1.00 - ((double) nchunks / (double) nextents);
570         perc *= 100.0;
571         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
572
573
574         if (compression) {
575                 uc_human = bytes_to_human_readable_unit(unique_capacity, &unit);
576                 printf("Unique capacity %lu%s\n", (unsigned long) uc_human, unit);
577         }
578 }
579
580 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks, uint64_t *ndupextents)
581 {
582         struct fio_rb_node *n;
583         *nchunks = *nextents = *ndupextents = 0;
584
585         n = rb_first(&rb_root);
586         if (!n)
587                 return;
588
589         do {
590                 struct chunk *c;
591
592                 c = rb_entry(n, struct chunk, rb_node);
593                 (*nchunks)++;
594                 *nextents += c->count;
595                 *ndupextents += (c->count > 1);
596
597                 if (dump_output)
598                         show_chunk(c);
599
600         } while ((n = rb_next(n)) != NULL);
601 }
602
603 static int usage(char *argv[])
604 {
605         log_err("Check for dedupable blocks on a device/file\n\n");
606         log_err("%s: [options] <device or file>\n", argv[0]);
607         log_err("\t-b\tChunk size to use\n");
608         log_err("\t-t\tNumber of threads to use\n");
609         log_err("\t-d\tFull extent/chunk debug output\n");
610         log_err("\t-o\tUse O_DIRECT\n");
611         log_err("\t-c\tFull collision check\n");
612         log_err("\t-B\tUse probabilistic bloom filter\n");
613         log_err("\t-p\tPrint progress indicator\n");
614         log_err("\t-C\tCalculate compressible size\n");
615         return 1;
616 }
617
618 int main(int argc, char *argv[])
619 {
620         uint64_t nextents = 0, nchunks = 0, ndupextents = 0, unique_capacity;
621         int c, ret;
622
623         arch_init(argv);
624         debug_init();
625
626         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:C:")) != -1) {
627                 switch (c) {
628                 case 'b':
629                         blocksize = atoi(optarg);
630                         break;
631                 case 't':
632                         num_threads = atoi(optarg);
633                         break;
634                 case 'd':
635                         dump_output = atoi(optarg);
636                         break;
637                 case 'o':
638                         odirect = atoi(optarg);
639                         break;
640                 case 'c':
641                         collision_check = atoi(optarg);
642                         break;
643                 case 'p':
644                         print_progress = atoi(optarg);
645                         break;
646                 case 'B':
647                         use_bloom = atoi(optarg);
648                         break;
649                 case 'C':
650                         compression = atoi(optarg);
651                         break;
652                 case '?':
653                 default:
654                         return usage(argv);
655                 }
656         }
657
658         if (collision_check || dump_output || compression)
659                 use_bloom = 0;
660
661         if (!num_threads)
662                 num_threads = cpus_online();
663
664         if (argc == optind)
665                 return usage(argv);
666
667         sinit();
668
669         rb_root = RB_ROOT;
670         rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);
671
672         ret = dedupe_check(argv[optind], &nextents, &nchunks, &unique_capacity);
673
674         if (!ret) {
675                 if (!bloom)
676                         iter_rb_tree(&nextents, &nchunks, &ndupextents);
677
678                 show_stat(nextents, nchunks, ndupextents, unique_capacity);
679         }
680
681         fio_sem_remove(rb_lock);
682         if (bloom)
683                 bloom_free(bloom);
684         scleanup();
685         return ret;
686 }