0a033a4c91e48e77b6a0790b0cf5c45bee4d6134
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <stdio.h>
7 #include <stdio.h>
8 #include <unistd.h>
9 #include <inttypes.h>
10 #include <assert.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/ioctl.h>
14 #include <linux/fs.h>
15 #include <fcntl.h>
16 #include <string.h>
17
18 #include "../lib/rbtree.h"
19 #include "../flist.h"
20 #include "../log.h"
21 #include "../mutex.h"
22 #include "../smalloc.h"
23 #include "../minmax.h"
24 #include "../crc/md5.h"
25 #include "../memalign.h"
26 #include "../os/os.h"
27 #include "../gettime.h"
28 #include "../fio_time.h"
29
30 #include "../lib/bloom.h"
31 #include "debug.h"
32
33 struct worker_thread {
34         pthread_t thread;
35
36         volatile int done;
37
38         int fd;
39         uint64_t cur_offset;
40         uint64_t size;
41
42         unsigned long items;
43         unsigned long dupes;
44         int err;
45 };
46
47 struct extent {
48         struct flist_head list;
49         uint64_t offset;
50 };
51
52 struct chunk {
53         struct rb_node rb_node;
54         uint64_t count;
55         uint32_t hash[MD5_HASH_WORDS];
56         struct flist_head extent_list[0];
57 };
58
59 struct item {
60         uint64_t offset;
61         uint32_t hash[MD5_HASH_WORDS];
62 };
63
64 static struct rb_root rb_root;
65 static struct bloom *bloom;
66 static struct fio_mutex *rb_lock;
67
68 static unsigned int blocksize = 4096;
69 static unsigned int num_threads;
70 static unsigned int chunk_size = 1048576;
71 static unsigned int dump_output;
72 static unsigned int odirect;
73 static unsigned int collision_check;
74 static unsigned int print_progress = 1;
75 static unsigned int use_bloom = 1;
76
77 static uint64_t total_size;
78 static uint64_t cur_offset;
79 static struct fio_mutex *size_lock;
80
81 static struct fio_file file;
82
83 static uint64_t get_size(struct fio_file *f, struct stat *sb)
84 {
85         uint64_t ret;
86
87         if (S_ISBLK(sb->st_mode)) {
88                 unsigned long long bytes;
89
90                 if (blockdev_size(f, &bytes)) {
91                         log_err("dedupe: failed getting bdev size\n");
92                         return 0;
93                 }
94                 ret = bytes;
95         } else
96                 ret = sb->st_size;
97
98         return (ret & ~((uint64_t)blocksize - 1));
99 }
100
101 static int get_work(uint64_t *offset, uint64_t *size)
102 {
103         uint64_t this_chunk;
104         int ret = 1;
105
106         fio_mutex_down(size_lock);
107
108         if (cur_offset < total_size) {
109                 *offset = cur_offset;
110                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
111                 *size = this_chunk;
112                 cur_offset += this_chunk;
113                 ret = 0;
114         }
115
116         fio_mutex_up(size_lock);
117         return ret;
118 }
119
120 static int __read_block(int fd, void *buf, off_t offset, size_t count)
121 {
122         ssize_t ret;
123
124         ret = pread(fd, buf, count, offset);
125         if (ret < 0) {
126                 perror("pread");
127                 return 1;
128         } else if (!ret)
129                 return 1;
130         else if (ret != count) {
131                 log_err("dedupe: short read on block\n");
132                 return 1;
133         }
134
135         return 0;
136 }
137
138 static int read_block(int fd, void *buf, off_t offset)
139 {
140         return __read_block(fd, buf, offset, blocksize);
141 }
142
143 static void add_item(struct chunk *c, struct item *i)
144 {
145         /*      
146          * Save some memory and don't add extent items, if we don't
147          * use them.
148          */
149         if (dump_output || collision_check) {
150                 struct extent *e;
151
152                 e = malloc(sizeof(*e));
153                 e->offset = i->offset;
154                 flist_add_tail(&e->list, &c->extent_list[0]);
155         }
156
157         c->count++;
158 }
159
160 static int col_check(struct chunk *c, struct item *i)
161 {
162         struct extent *e;
163         char *cbuf, *ibuf;
164         int ret = 1;
165
166         cbuf = fio_memalign(blocksize, blocksize);
167         ibuf = fio_memalign(blocksize, blocksize);
168
169         e = flist_entry(c->extent_list[0].next, struct extent, list);
170         if (read_block(file.fd, cbuf, e->offset))
171                 goto out;
172
173         if (read_block(file.fd, ibuf, i->offset))
174                 goto out;
175
176         ret = memcmp(ibuf, cbuf, blocksize);
177 out:
178         fio_memfree(cbuf, blocksize);
179         fio_memfree(ibuf, blocksize);
180         return ret;
181 }
182
183 static struct chunk *alloc_chunk(void)
184 {
185         struct chunk *c;
186
187         if (collision_check || dump_output) {
188                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
189                 INIT_FLIST_HEAD(&c->extent_list[0]);
190         } else
191                 c = malloc(sizeof(struct chunk));
192
193         return c;
194 }
195
196 static void insert_chunk(struct item *i)
197 {
198         struct rb_node **p, *parent;
199         struct chunk *c;
200         int diff;
201
202         p = &rb_root.rb_node;
203         parent = NULL;
204         while (*p) {
205                 parent = *p;
206
207                 c = rb_entry(parent, struct chunk, rb_node);
208                 diff = memcmp(i->hash, c->hash, sizeof(i->hash));
209                 if (diff < 0)
210                         p = &(*p)->rb_left;
211                 else if (diff > 0)
212                         p = &(*p)->rb_right;
213                 else {
214                         int ret;
215
216                         if (!collision_check)
217                                 goto add;
218
219                         fio_mutex_up(rb_lock);
220                         ret = col_check(c, i);
221                         fio_mutex_down(rb_lock);
222
223                         if (!ret)
224                                 goto add;
225
226                         p = &(*p)->rb_right;
227                 }
228         }
229
230         c = alloc_chunk();
231         RB_CLEAR_NODE(&c->rb_node);
232         c->count = 0;
233         memcpy(c->hash, i->hash, sizeof(i->hash));
234         rb_link_node(&c->rb_node, parent, p);
235         rb_insert_color(&c->rb_node, &rb_root);
236 add:
237         add_item(c, i);
238 }
239
240 static void insert_chunks(struct item *items, unsigned int nitems,
241                           uint64_t *ndupes)
242 {
243         int i;
244
245         fio_mutex_down(rb_lock);
246
247         for (i = 0; i < nitems; i++) {
248                 if (bloom) {
249                         unsigned int s;
250                         int r;
251
252                         s = sizeof(items[i].hash) / sizeof(uint32_t);
253                         r = bloom_set(bloom, items[i].hash, s);
254                         *ndupes += r;
255                 } else
256                         insert_chunk(&items[i]);
257         }
258
259         fio_mutex_up(rb_lock);
260 }
261
262 static void crc_buf(void *buf, uint32_t *hash)
263 {
264         struct fio_md5_ctx ctx = { .hash = hash };
265
266         fio_md5_init(&ctx);
267         fio_md5_update(&ctx, buf, blocksize);
268         fio_md5_final(&ctx);
269 }
270
271 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
272 {
273         if (__read_block(fd, buf, offset, size))
274                 return 0;
275
276         return size / blocksize;
277 }
278
279 static int do_work(struct worker_thread *thread, void *buf)
280 {
281         unsigned int nblocks, i;
282         off_t offset;
283         int nitems = 0;
284         uint64_t ndupes = 0;
285         struct item *items;
286
287         offset = thread->cur_offset;
288
289         nblocks = read_blocks(thread->fd, buf, offset, min(thread->size, (uint64_t)chunk_size));
290         if (!nblocks)
291                 return 1;
292
293         items = malloc(sizeof(*items) * nblocks);
294
295         for (i = 0; i < nblocks; i++) {
296                 void *thisptr = buf + (i * blocksize);
297
298                 if (items)
299                         items[i].offset = offset;
300                 crc_buf(thisptr, items[i].hash);
301                 offset += blocksize;
302                 nitems++;
303         }
304
305         insert_chunks(items, nitems, &ndupes);
306
307         free(items);
308         thread->items += nitems;
309         thread->dupes += ndupes;
310         return 0;
311 }
312
313 static void *thread_fn(void *data)
314 {
315         struct worker_thread *thread = data;
316         void *buf;
317
318         buf = fio_memalign(blocksize, chunk_size);
319
320         do {
321                 if (get_work(&thread->cur_offset, &thread->size)) {
322                         thread->err = 1;
323                         break;
324                 }
325                 if (do_work(thread, buf)) {
326                         thread->err = 1;
327                         break;
328                 }
329         } while (1);
330
331         thread->done = 1;
332         fio_memfree(buf, chunk_size);
333         return NULL;
334 }
335
336 static void show_progress(struct worker_thread *threads, unsigned long total)
337 {
338         unsigned long last_nitems = 0;
339         struct timeval last_tv;
340
341         fio_gettime(&last_tv, NULL);
342
343         while (print_progress) {
344                 unsigned long this_items;
345                 unsigned long nitems = 0;
346                 uint64_t tdiff;
347                 float perc;
348                 int some_done = 0;
349                 int i;
350
351                 for (i = 0; i < num_threads; i++) {
352                         nitems += threads[i].items;
353                         some_done = threads[i].done;
354                         if (some_done)
355                                 break;
356                 }
357
358                 if (some_done)
359                         break;
360
361                 perc = (float) nitems / (float) total;
362                 perc *= 100.0;
363                 this_items = nitems - last_nitems;
364                 this_items *= blocksize;
365                 tdiff = mtime_since_now(&last_tv);
366                 if (tdiff) {
367                         this_items = (this_items * 1000) / (tdiff * 1024);
368                         printf("%3.2f%% done (%luKB/sec)\r", perc, this_items);
369                         last_nitems = nitems;
370                         fio_gettime(&last_tv, NULL);
371                 } else
372                         printf("%3.2f%% done\r", perc);
373                 fflush(stdout);
374                 usleep(250000);
375         };
376 }
377
378 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
379                               uint64_t *nextents, uint64_t *nchunks)
380 {
381         struct worker_thread *threads;
382         unsigned long nitems, total_items;
383         int i, err = 0;
384
385         total_size = dev_size;
386         total_items = dev_size / blocksize;
387         cur_offset = 0;
388         size_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
389
390         threads = malloc(num_threads * sizeof(struct worker_thread));
391         for (i = 0; i < num_threads; i++) {
392                 threads[i].fd = f->fd;
393                 threads[i].items = 0;
394                 threads[i].err = 0;
395                 threads[i].done = 0;
396
397                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
398                 if (err) {
399                         log_err("fio: thread startup failed\n");
400                         break;
401                 }
402         }
403
404         show_progress(threads, total_items);
405
406         nitems = 0;
407         *nextents = 0;
408         *nchunks = 1;
409         for (i = 0; i < num_threads; i++) {
410                 void *ret;
411                 pthread_join(threads[i].thread, &ret);
412                 nitems += threads[i].items;
413                 *nchunks += threads[i].dupes;
414         }
415
416         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
417
418         *nextents = nitems;
419         *nchunks = nitems - *nchunks;
420
421         fio_mutex_remove(size_lock);
422         free(threads);
423         return err;
424 }
425
426 static int dedupe_check(const char *filename, uint64_t *nextents,
427                         uint64_t *nchunks)
428 {
429         uint64_t dev_size;
430         struct stat sb;
431         int flags;
432
433         flags = O_RDONLY;
434         if (odirect)
435                 flags |= O_DIRECT;
436
437         memset(&file, 0, sizeof(file));
438         file.file_name = strdup(filename);
439
440         file.fd = open(filename, flags);
441         if (file.fd == -1) {
442                 perror("open");
443                 goto err;
444         }
445
446         if (fstat(file.fd, &sb) < 0) {
447                 perror("fstat");
448                 goto err;
449         }
450
451         dev_size = get_size(&file, &sb);
452         if (!dev_size)
453                 goto err;
454
455         if (use_bloom) {
456                 uint64_t bloom_entries;
457
458                 bloom_entries = 8 * (dev_size / blocksize);
459                 bloom = bloom_new(bloom_entries);
460         }
461
462         printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
463
464         return run_dedupe_threads(&file, dev_size, nextents, nchunks);
465 err:
466         if (file.fd != -1)
467                 close(file.fd);
468         free(file.file_name);
469         return 1;
470 }
471
472 static void show_chunk(struct chunk *c)
473 {
474         struct flist_head *n;
475         struct extent *e;
476
477         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
478         flist_for_each(n, &c->extent_list[0]) {
479                 e = flist_entry(n, struct extent, list);
480                 printf("\toffset %llu\n", (unsigned long long) e->offset);
481         }
482 }
483
484 static void show_stat(uint64_t nextents, uint64_t nchunks)
485 {
486         double perc, ratio;
487
488         printf("Extents=%lu, Unique extents=%lu\n", (unsigned long) nextents, (unsigned long) nchunks);
489
490         if (nchunks) {
491                 ratio = (double) nextents / (double) nchunks;
492                 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
493         } else
494                 printf("De-dupe ratio: 1:infinite\n");
495
496         perc = 1.00 - ((double) nchunks / (double) nextents);
497         perc *= 100.0;
498         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
499
500 }
501
502 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks)
503 {
504         struct rb_node *n;
505
506         *nchunks = *nextents = 0;
507
508         n = rb_first(&rb_root);
509         if (!n)
510                 return;
511
512         do {
513                 struct chunk *c;
514
515                 c = rb_entry(n, struct chunk, rb_node);
516                 (*nchunks)++;
517                 *nextents += c->count;
518
519                 if (dump_output)
520                         show_chunk(c);
521
522         } while ((n = rb_next(n)) != NULL);
523 }
524
525 static int usage(char *argv[])
526 {
527         log_err("Check for dedupable blocks on a device/file\n\n");
528         log_err("%s: [options] <device or file>\n", argv[0]);
529         log_err("\t-b\tChunk size to use\n");
530         log_err("\t-t\tNumber of threads to use\n");
531         log_err("\t-d\tFull extent/chunk debug output\n");
532         log_err("\t-o\tUse O_DIRECT\n");
533         log_err("\t-c\tFull collision check\n");
534         log_err("\t-B\tUse probabilistic bloom filter\n");
535         log_err("\t-p\tPrint progress indicator\n");
536         return 1;
537 }
538
539 int main(int argc, char *argv[])
540 {
541         uint64_t nextents = 0, nchunks = 0;
542         int c, ret;
543
544         debug_init();
545
546         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:")) != -1) {
547                 switch (c) {
548                 case 'b':
549                         blocksize = atoi(optarg);
550                         break;
551                 case 't':
552                         num_threads = atoi(optarg);
553                         break;
554                 case 'd':
555                         dump_output = atoi(optarg);
556                         break;
557                 case 'o':
558                         odirect = atoi(optarg);
559                         break;
560                 case 'c':
561                         collision_check = atoi(optarg);
562                         break;
563                 case 'p':
564                         print_progress = atoi(optarg);
565                         break;
566                 case 'B':
567                         use_bloom = atoi(optarg);
568                         break;
569                 case '?':
570                 default:
571                         return usage(argv);
572                 }
573         }
574
575         if (collision_check || dump_output)
576                 use_bloom = 0;
577
578         if (!num_threads)
579                 num_threads = cpus_online();
580
581         if (argc == optind)
582                 return usage(argv);
583
584         sinit();
585
586         rb_root = RB_ROOT;
587         rb_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
588
589         ret = dedupe_check(argv[optind], &nextents, &nchunks);
590
591         if (!ret) {
592                 if (!bloom)
593                         iter_rb_tree(&nextents, &nchunks);
594
595                 show_stat(nextents, nchunks);
596         }
597
598         fio_mutex_remove(rb_lock);
599         if (bloom)
600                 bloom_free(bloom);
601         scleanup();
602         return ret;
603 }