Merge branch 'directory-operation' of https://github.com/friendy-su/fio
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <fcntl.h>
7 #include <inttypes.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <sys/stat.h>
12
13 #include "../fio.h"
14 #include "../flist.h"
15 #include "../log.h"
16 #include "../fio_sem.h"
17 #include "../smalloc.h"
18 #include "../minmax.h"
19 #include "../crc/md5.h"
20 #include "../os/os.h"
21 #include "../gettime.h"
22 #include "../fio_time.h"
23 #include "../lib/rbtree.h"
24
25 #include "../lib/bloom.h"
26 #include "debug.h"
27 #include "zlib.h"
28
/*
 * Per-thread zlib state: one deflate stream plus the scratch buffers
 * blocks are compressed through for -C (compressible size) accounting.
 */
struct zlib_ctrl {
	z_stream stream;
	unsigned char *buf_in;	/* one raw block, blocksize bytes */
	unsigned char *buf_out;	/* deflate output, deflateBound(blocksize) bytes */
};
34
/*
 * Per-worker scanning state; one of these per thread.
 */
struct worker_thread {
	struct zlib_ctrl zc;	/* compression state, used with -C */
	pthread_t thread;
	uint64_t cur_offset;	/* current work unit: start offset */
	uint64_t size;		/* current work unit: size in bytes */
	unsigned long long unique_capacity;	/* compressed bytes of unique chunks */
	unsigned long items;	/* blocks hashed so far */
	unsigned long dupes;	/* duplicates seen (bloom mode) */
	int err;		/* set when the thread stops (work exhausted or error) */
	int fd;
	volatile int done;	/* polled by show_progress() */
};
47
/* One occurrence of a chunk's contents, identified by file offset */
struct extent {
	struct flist_head list;	/* linked on chunk->extent_list[0] */
	uint64_t offset;
};
52
53 struct chunk {
54         struct fio_rb_node rb_node;
55         uint64_t count;
56         uint32_t hash[MD5_HASH_WORDS];
57         struct flist_head extent_list[0];
58 };
59
/* A scanned block awaiting insertion: its offset and MD5 hash */
struct item {
	uint64_t offset;
	uint32_t hash[MD5_HASH_WORDS];
};
64
static struct rb_root rb_root;		/* tree of unique chunks, keyed by MD5 */
static struct bloom *bloom;		/* duplicate filter, used when use_bloom set */
static struct fio_sem *rb_lock;		/* protects rb_root/bloom across threads */

static unsigned int blocksize = 4096;	/* -b: dedupe block size, bytes */
static unsigned int num_threads;	/* -t: 0 means one per configured CPU */
static unsigned int chunk_size = 1048576;	/* per-thread work unit, bytes */
static unsigned int dump_output;	/* -d: dump every chunk/extent */
static unsigned int odirect;		/* -o: open with O_DIRECT */
static unsigned int collision_check;	/* -c: byte-compare blocks on hash match */
static unsigned int print_progress = 1;	/* -p: progress indicator */
static unsigned int use_bloom = 1;	/* -B: bloom filter instead of rb-tree */
static unsigned int compression = 0;	/* -C: estimate compressed size */

/* Scan state handed out to workers in chunk_size pieces */
static uint64_t total_size;
static uint64_t cur_offset;		/* next offset to hand out; under size_lock */
static struct fio_sem *size_lock;

static struct fio_file file;		/* the file/device being scanned */
84
85 static uint64_t get_size(struct fio_file *f, struct stat *sb)
86 {
87         uint64_t ret;
88
89         if (S_ISBLK(sb->st_mode)) {
90                 unsigned long long bytes = 0;
91
92                 if (blockdev_size(f, &bytes)) {
93                         log_err("dedupe: failed getting bdev size\n");
94                         return 0;
95                 }
96                 ret = bytes;
97         } else {
98                 ret = sb->st_size;
99         }
100
101         return (ret & ~((uint64_t)blocksize - 1));
102 }
103
104 static int get_work(uint64_t *offset, uint64_t *size)
105 {
106         uint64_t this_chunk;
107         int ret = 1;
108
109         fio_sem_down(size_lock);
110
111         if (cur_offset < total_size) {
112                 *offset = cur_offset;
113                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
114                 *size = this_chunk;
115                 cur_offset += this_chunk;
116                 ret = 0;
117         }
118
119         fio_sem_up(size_lock);
120         return ret;
121 }
122
123 static int __read_block(int fd, void *buf, off_t offset, size_t count)
124 {
125         ssize_t ret;
126
127         ret = pread(fd, buf, count, offset);
128         if (ret < 0) {
129                 perror("pread");
130                 return 1;
131         } else if (!ret) {
132                 return 1;
133         } else if (ret != count) {
134                 log_err("dedupe: short read on block\n");
135                 return 1;
136         }
137
138         return 0;
139 }
140
/* Read one full block of 'blocksize' bytes at 'offset'; 0 on success */
static int read_block(int fd, void *buf, off_t offset)
{
	return __read_block(fd, buf, offset, blocksize);
}
145
146 static int account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
147                                    struct zlib_ctrl *zc)
148 {
149         z_stream *stream = &zc->stream;
150         unsigned int compressed_len;
151         int ret;
152
153         if (read_block(file.fd, zc->buf_in, offset))
154                 return 1;
155
156         stream->next_in = zc->buf_in;
157         stream->avail_in = blocksize;
158         stream->avail_out = deflateBound(stream, blocksize);
159         stream->next_out = zc->buf_out;
160
161         ret = deflate(stream, Z_FINISH);
162         if (ret == Z_STREAM_ERROR)
163                 return 1;
164         compressed_len = blocksize - stream->avail_out;
165
166         if (dump_output)
167                 printf("offset 0x%lx compressed to %d blocksize %d ratio %.2f \n",
168                                 (unsigned long) offset, compressed_len, blocksize,
169                                 (float)compressed_len / (float)blocksize);
170
171         *unique_capacity += compressed_len;
172         deflateReset(stream);
173         return 0;
174 }
175
176 static void add_item(struct chunk *c, struct item *i)
177 {
178         /*      
179          * Save some memory and don't add extent items, if we don't
180          * use them.
181          */
182         if (dump_output || collision_check) {
183                 struct extent *e;
184
185                 e = malloc(sizeof(*e));
186                 e->offset = i->offset;
187                 flist_add_tail(&e->list, &c->extent_list[0]);
188         }
189
190         c->count++;
191 }
192
193 static int col_check(struct chunk *c, struct item *i)
194 {
195         struct extent *e;
196         char *cbuf, *ibuf;
197         int ret = 1;
198
199         cbuf = fio_memalign(blocksize, blocksize, false);
200         ibuf = fio_memalign(blocksize, blocksize, false);
201
202         e = flist_entry(c->extent_list[0].next, struct extent, list);
203         if (read_block(file.fd, cbuf, e->offset))
204                 goto out;
205
206         if (read_block(file.fd, ibuf, i->offset))
207                 goto out;
208
209         ret = memcmp(ibuf, cbuf, blocksize);
210 out:
211         fio_memfree(cbuf, blocksize, false);
212         fio_memfree(ibuf, blocksize, false);
213         return ret;
214 }
215
216 static struct chunk *alloc_chunk(void)
217 {
218         struct chunk *c;
219
220         if (collision_check || dump_output) {
221                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
222                 INIT_FLIST_HEAD(&c->extent_list[0]);
223         } else {
224                 c = malloc(sizeof(struct chunk));
225         }
226
227         return c;
228 }
229
/*
 * Insert one hashed block into the global rb-tree of unique chunks, or
 * account it against an existing chunk with the same MD5. Called with
 * rb_lock held; may drop and re-take it around col_check(). Returns 0
 * on success, non-zero on compression accounting failure.
 */
static int insert_chunk(struct item *i, uint64_t *unique_capacity,
			struct zlib_ctrl *zc)
{
	struct fio_rb_node **p, *parent;
	struct chunk *c;
	int ret, diff;

	p = &rb_root.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;

		c = rb_entry(parent, struct chunk, rb_node);
		diff = memcmp(i->hash, c->hash, sizeof(i->hash));
		if (diff < 0) {
			p = &(*p)->rb_left;
		} else if (diff > 0) {
			p = &(*p)->rb_right;
		} else {
			if (!collision_check)
				goto add;

			/*
			 * Verify full block contents, not just the hash.
			 * The tree lock is dropped for the reads;
			 * NOTE(review): other threads can modify the tree
			 * meanwhile, so 'p'/'parent' may be stale if we
			 * fall through to the insert below -- confirm
			 * this is acceptable.
			 */
			fio_sem_up(rb_lock);
			ret = col_check(c, i);
			fio_sem_down(rb_lock);

			if (!ret)
				goto add;

			/* same hash, different data: keep walking right */
			p = &(*p)->rb_right;
		}
	}

	c = alloc_chunk();
	RB_CLEAR_NODE(&c->rb_node);
	c->count = 0;
	memcpy(c->hash, i->hash, sizeof(i->hash));
	rb_link_node(&c->rb_node, parent, p);
	rb_insert_color(&c->rb_node, &rb_root);
	if (compression) {
		/* new unique chunk: charge its compressed size */
		ret = account_unique_capacity(i->offset, unique_capacity, zc);
		if (ret)
			return ret;
	}
add:
	add_item(c, i);
	return 0;
}
278
279 static int insert_chunks(struct item *items, unsigned int nitems,
280                          uint64_t *ndupes, uint64_t *unique_capacity,
281                          struct zlib_ctrl *zc)
282 {
283         int i, ret = 0;
284
285         fio_sem_down(rb_lock);
286
287         for (i = 0; i < nitems; i++) {
288                 if (bloom) {
289                         unsigned int s;
290                         int r;
291
292                         s = sizeof(items[i].hash) / sizeof(uint32_t);
293                         r = bloom_set(bloom, items[i].hash, s);
294                         *ndupes += r;
295                 } else {
296                         ret = insert_chunk(&items[i], unique_capacity, zc);
297                         if (ret)
298                                 break;
299                 }
300         }
301
302         fio_sem_up(rb_lock);
303         return ret;
304 }
305
/* MD5 one block of 'blocksize' bytes from 'buf' into 'hash' */
static void crc_buf(void *buf, uint32_t *hash)
{
	struct fio_md5_ctx ctx = { .hash = hash };

	fio_md5_init(&ctx);
	fio_md5_update(&ctx, buf, blocksize);
	fio_md5_final(&ctx);
}
314
/*
 * Read 'size' bytes at 'offset' and return the number of whole blocks
 * obtained, or 0 on read error/EOF.
 */
static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
{
	if (__read_block(fd, buf, offset, size))
		return 0;

	return size / blocksize;
}
322
323 static int do_work(struct worker_thread *thread, void *buf)
324 {
325         unsigned int nblocks, i;
326         off_t offset;
327         int nitems = 0;
328         uint64_t ndupes = 0;
329         uint64_t unique_capacity = 0;
330         struct item *items;
331         int ret;
332
333         offset = thread->cur_offset;
334
335         nblocks = read_blocks(thread->fd, buf, offset,
336                                 min(thread->size, (uint64_t) chunk_size));
337         if (!nblocks)
338                 return 1;
339
340         items = malloc(sizeof(*items) * nblocks);
341
342         for (i = 0; i < nblocks; i++) {
343                 void *thisptr = buf + (i * blocksize);
344
345                 items[i].offset = offset;
346                 crc_buf(thisptr, items[i].hash);
347                 offset += blocksize;
348                 nitems++;
349         }
350
351         ret = insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
352
353         free(items);
354         if (!ret) {
355                 thread->items += nitems;
356                 thread->dupes += ndupes;
357                 thread->unique_capacity += unique_capacity;
358                 return 0;
359         }
360
361         return ret;
362 }
363
364 static void thread_init_zlib_control(struct worker_thread *thread)
365 {
366         size_t sz;
367
368         z_stream *stream = &thread->zc.stream;
369         stream->zalloc = Z_NULL;
370         stream->zfree = Z_NULL;
371         stream->opaque = Z_NULL;
372
373         if (deflateInit(stream, Z_DEFAULT_COMPRESSION) != Z_OK)
374                 return;
375
376         thread->zc.buf_in = fio_memalign(blocksize, blocksize, false);
377         sz = deflateBound(stream, blocksize);
378         thread->zc.buf_out = fio_memalign(blocksize, sz, false);
379 }
380
381 static void *thread_fn(void *data)
382 {
383         struct worker_thread *thread = data;
384         void *buf;
385
386         buf = fio_memalign(blocksize, chunk_size, false);
387         thread_init_zlib_control(thread);
388
389         do {
390                 if (get_work(&thread->cur_offset, &thread->size)) {
391                         thread->err = 1;
392                         break;
393                 }
394                 if (do_work(thread, buf)) {
395                         thread->err = 1;
396                         break;
397                 }
398         } while (1);
399
400         thread->done = 1;
401         fio_memfree(buf, chunk_size, false);
402         return NULL;
403 }
404
405 static void show_progress(struct worker_thread *threads, unsigned long total)
406 {
407         unsigned long last_nitems = 0;
408         struct timespec last_tv;
409
410         fio_gettime(&last_tv, NULL);
411
412         while (print_progress) {
413                 unsigned long this_items;
414                 unsigned long nitems = 0;
415                 uint64_t tdiff;
416                 float perc;
417                 int some_done = 0;
418                 int i;
419
420                 for (i = 0; i < num_threads; i++) {
421                         nitems += threads[i].items;
422                         some_done = threads[i].done;
423                         if (some_done)
424                                 break;
425                 }
426
427                 if (some_done)
428                         break;
429
430                 perc = (float) nitems / (float) total;
431                 perc *= 100.0;
432                 this_items = nitems - last_nitems;
433                 this_items *= blocksize;
434                 tdiff = mtime_since_now(&last_tv);
435                 if (tdiff) {
436                         this_items = (this_items * 1000) / (tdiff * 1024);
437                         printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
438                         last_nitems = nitems;
439                         fio_gettime(&last_tv, NULL);
440                 } else {
441                         printf("%3.2f%% done\r", perc);
442                 }
443                 fflush(stdout);
444                 usleep(250000);
445         };
446 }
447
448 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
449                               uint64_t *nextents, uint64_t *nchunks,
450                               uint64_t *unique_capacity)
451 {
452         struct worker_thread *threads;
453         unsigned long nitems, total_items;
454         int i, err = 0;
455
456         total_size = dev_size;
457         total_items = dev_size / blocksize;
458         cur_offset = 0;
459         size_lock = fio_sem_init(FIO_SEM_UNLOCKED);
460
461         threads = malloc(num_threads * sizeof(struct worker_thread));
462         for (i = 0; i < num_threads; i++) {
463                 memset(&threads[i], 0, sizeof(struct worker_thread));
464                 threads[i].fd = f->fd;
465
466                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
467                 if (err) {
468                         log_err("fio: thread startup failed\n");
469                         break;
470                 }
471         }
472
473         show_progress(threads, total_items);
474
475         nitems = 0;
476         *nextents = 0;
477         *nchunks = 1;
478         *unique_capacity = 0;
479         for (i = 0; i < num_threads; i++) {
480                 void *ret;
481                 pthread_join(threads[i].thread, &ret);
482                 nitems += threads[i].items;
483                 *nchunks += threads[i].dupes;
484                 *unique_capacity += threads[i].unique_capacity;
485         }
486
487         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
488
489         *nextents = nitems;
490         *nchunks = nitems - *nchunks;
491
492         fio_sem_remove(size_lock);
493         free(threads);
494         return err;
495 }
496
497 static int dedupe_check(const char *filename, uint64_t *nextents,
498                         uint64_t *nchunks, uint64_t *unique_capacity)
499 {
500         uint64_t dev_size;
501         struct stat sb;
502         int flags;
503
504         flags = O_RDONLY;
505         if (odirect)
506                 flags |= OS_O_DIRECT;
507
508         memset(&file, 0, sizeof(file));
509         file.file_name = strdup(filename);
510
511         file.fd = open(filename, flags);
512         if (file.fd == -1) {
513                 perror("open");
514                 goto err;
515         }
516
517         if (fstat(file.fd, &sb) < 0) {
518                 perror("fstat");
519                 goto err;
520         }
521
522         dev_size = get_size(&file, &sb);
523         if (!dev_size)
524                 goto err;
525
526         if (use_bloom) {
527                 uint64_t bloom_entries;
528
529                 bloom_entries = 8 * (dev_size / blocksize);
530                 bloom = bloom_new(bloom_entries);
531         }
532
533         printf("Will check <%s>, size <%llu>, using %u threads\n", filename,
534                                 (unsigned long long) dev_size, num_threads);
535
536         return run_dedupe_threads(&file, dev_size, nextents, nchunks,
537                                         unique_capacity);
538 err:
539         if (file.fd != -1)
540                 close(file.fd);
541         free(file.file_name);
542         return 1;
543 }
544
545 static void show_chunk(struct chunk *c)
546 {
547         struct flist_head *n;
548         struct extent *e;
549
550         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1],
551                         c->hash[2], c->hash[3], (unsigned long) c->count);
552         flist_for_each(n, &c->extent_list[0]) {
553                 e = flist_entry(n, struct extent, list);
554                 printf("\toffset %llu\n", (unsigned long long) e->offset);
555         }
556 }
557
/* Unit suffixes for bytes_to_human_readable_unit(), one per power of 1024 */
static const char *capacity_unit[] = {"b","KB", "MB", "GB", "TB", "PB", "EB"};
559
560 static uint64_t bytes_to_human_readable_unit(uint64_t n, const char **unit_out)
561 {
562         uint8_t i = 0;
563
564         while (n >= 1024) {
565                 i++;
566                 n /= 1024;
567         }
568
569         *unit_out = capacity_unit[i];
570         return n;
571 }
572
573 static void show_stat(uint64_t nextents, uint64_t nchunks, uint64_t ndupextents,
574                       uint64_t unique_capacity)
575 {
576         double perc, ratio;
577         const char *unit;
578         uint64_t uc_human;
579
580         printf("Extents=%lu, Unique extents=%lu", (unsigned long) nextents,
581                                                 (unsigned long) nchunks);
582         if (!bloom)
583                 printf(" Duplicated extents=%lu", (unsigned long) ndupextents);
584         printf("\n");
585
586         if (nchunks) {
587                 ratio = (double) nextents / (double) nchunks;
588                 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
589         } else {
590                 printf("De-dupe ratio: 1:infinite\n");
591         }
592
593         if (ndupextents) {
594                 printf("De-dupe working set at least: %3.2f%%\n",
595                         100.0 * (double) ndupextents / (double) nextents);
596         }
597
598         perc = 1.00 - ((double) nchunks / (double) nextents);
599         perc *= 100.0;
600         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
601
602
603         if (compression) {
604                 uc_human = bytes_to_human_readable_unit(unique_capacity, &unit);
605                 printf("Unique capacity %lu%s\n", (unsigned long) uc_human, unit);
606         }
607 }
608
609 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks, uint64_t *ndupextents)
610 {
611         struct fio_rb_node *n;
612         *nchunks = *nextents = *ndupextents = 0;
613
614         n = rb_first(&rb_root);
615         if (!n)
616                 return;
617
618         do {
619                 struct chunk *c;
620
621                 c = rb_entry(n, struct chunk, rb_node);
622                 (*nchunks)++;
623                 *nextents += c->count;
624                 *ndupextents += (c->count > 1);
625
626                 if (dump_output)
627                         show_chunk(c);
628
629         } while ((n = rb_next(n)) != NULL);
630 }
631
/* Print command-line help and return 1 for main() to exit with */
static int usage(char *argv[])
{
	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", argv[0]);
	log_err("\t-b\tChunk size to use\n"
		"\t-t\tNumber of threads to use\n"
		"\t-d\tFull extent/chunk debug output\n"
		"\t-o\tUse O_DIRECT\n"
		"\t-c\tFull collision check\n"
		"\t-B\tUse probabilistic bloom filter\n"
		"\t-p\tPrint progress indicator\n"
		"\t-C\tCalculate compressible size\n");
	return 1;
}
646
647 int main(int argc, char *argv[])
648 {
649         uint64_t nextents = 0, nchunks = 0, ndupextents = 0, unique_capacity;
650         int c, ret;
651
652         arch_init(argv);
653         debug_init();
654
655         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:C:")) != -1) {
656                 switch (c) {
657                 case 'b':
658                         blocksize = atoi(optarg);
659                         break;
660                 case 't':
661                         num_threads = atoi(optarg);
662                         break;
663                 case 'd':
664                         dump_output = atoi(optarg);
665                         break;
666                 case 'o':
667                         odirect = atoi(optarg);
668                         break;
669                 case 'c':
670                         collision_check = atoi(optarg);
671                         break;
672                 case 'p':
673                         print_progress = atoi(optarg);
674                         break;
675                 case 'B':
676                         use_bloom = atoi(optarg);
677                         break;
678                 case 'C':
679                         compression = atoi(optarg);
680                         break;
681                 case '?':
682                 default:
683                         return usage(argv);
684                 }
685         }
686
687         if (collision_check || dump_output || compression)
688                 use_bloom = 0;
689
690         if (!num_threads)
691                 num_threads = cpus_configured();
692
693         if (argc == optind)
694                 return usage(argv);
695
696         sinit();
697
698         rb_root = RB_ROOT;
699         rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);
700
701         ret = dedupe_check(argv[optind], &nextents, &nchunks, &unique_capacity);
702
703         if (!ret) {
704                 if (!bloom)
705                         iter_rb_tree(&nextents, &nchunks, &ndupextents);
706
707                 show_stat(nextents, nchunks, ndupextents, unique_capacity);
708         }
709
710         fio_sem_remove(rb_lock);
711         if (bloom)
712                 bloom_free(bloom);
713         scleanup();
714         return ret;
715 }