3ae17242f4a741bf2ccefd818f4fc8aeea3e6bc4
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <stdio.h>
7 #include <stdio.h>
8 #include <unistd.h>
9 #include <inttypes.h>
10 #include <assert.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/ioctl.h>
14 #include <linux/fs.h>
15 #include <fcntl.h>
16 #include <string.h>
17
18 #include "../lib/rbtree.h"
19 #include "../flist.h"
20 #include "../log.h"
21 #include "../mutex.h"
22 #include "../smalloc.h"
23 #include "../minmax.h"
24 #include "../crc/md5.h"
25 #include "../memalign.h"
26 #include "../os/os.h"
27 #include "../gettime.h"
28 #include "../fio_time.h"
29
30 #include "../lib/bloom.h"
31 #include "debug.h"
32
33 struct worker_thread {
34         pthread_t thread;
35
36         volatile int done;
37
38         int fd;
39         uint64_t cur_offset;
40         uint64_t size;
41
42         unsigned long items;
43         unsigned long dupes;
44         int err;
45 };
46
47 struct extent {
48         struct flist_head list;
49         uint64_t offset;
50 };
51
52 struct chunk {
53         struct rb_node rb_node;
54         uint64_t count;
55         uint32_t hash[MD5_HASH_WORDS];
56         struct flist_head extent_list[0];
57 };
58
59 struct item {
60         uint64_t offset;
61         uint32_t hash[MD5_HASH_WORDS];
62 };
63
64 static struct rb_root rb_root;
65 static struct bloom *bloom;
66 static struct fio_mutex *rb_lock;
67
68 static unsigned int blocksize = 4096;
69 static unsigned int num_threads;
70 static unsigned int chunk_size = 1048576;
71 static unsigned int dump_output;
72 static unsigned int odirect;
73 static unsigned int collision_check;
74 static unsigned int print_progress = 1;
75 static unsigned int use_bloom = 1;
76
77 static uint64_t total_size;
78 static uint64_t cur_offset;
79 static struct fio_mutex *size_lock;
80
81 static int dev_fd;
82
83 static uint64_t get_size(int fd, struct stat *sb)
84 {
85         uint64_t ret;
86
87         if (S_ISBLK(sb->st_mode)) {
88                 if (ioctl(fd, BLKGETSIZE64, &ret) < 0) {
89                         perror("ioctl");
90                         return 0;
91                 }
92         } else
93                 ret = sb->st_size;
94
95         return (ret & ~((uint64_t)blocksize - 1));
96 }
97
98 static int get_work(uint64_t *offset, uint64_t *size)
99 {
100         uint64_t this_chunk;
101         int ret = 1;
102
103         fio_mutex_down(size_lock);
104
105         if (cur_offset < total_size) {
106                 *offset = cur_offset;
107                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
108                 *size = this_chunk;
109                 cur_offset += this_chunk;
110                 ret = 0;
111         }
112
113         fio_mutex_up(size_lock);
114         return ret;
115 }
116
117 static int read_block(int fd, void *buf, off_t offset)
118 {
119         ssize_t ret;
120
121         ret = pread(fd, buf, blocksize, offset);
122         if (ret < 0) {
123                 perror("pread");
124                 return 1;
125         } else if (!ret)
126                 return 1;
127         else if (ret != blocksize) {
128                 log_err("dedupe: short read on block\n");
129                 return 1;
130         }
131
132         return 0;
133 }
134
135 static void add_item(struct chunk *c, struct item *i)
136 {
137         /*      
138          * Save some memory and don't add extent items, if we don't
139          * use them.
140          */
141         if (dump_output || collision_check) {
142                 struct extent *e;
143
144                 e = malloc(sizeof(*e));
145                 e->offset = i->offset;
146                 flist_add_tail(&e->list, &c->extent_list[0]);
147         }
148
149         c->count++;
150 }
151
152 static int col_check(struct chunk *c, struct item *i)
153 {
154         struct extent *e;
155         char *cbuf, *ibuf;
156         int ret = 1;
157
158         cbuf = fio_memalign(blocksize, blocksize);
159         ibuf = fio_memalign(blocksize, blocksize);
160
161         e = flist_entry(c->extent_list[0].next, struct extent, list);
162         if (read_block(dev_fd, cbuf, e->offset))
163                 goto out;
164
165         if (read_block(dev_fd, ibuf, i->offset))
166                 goto out;
167
168         ret = memcmp(ibuf, cbuf, blocksize);
169 out:
170         fio_memfree(cbuf, blocksize);
171         fio_memfree(ibuf, blocksize);
172         return ret;
173 }
174
175 static struct chunk *alloc_chunk(void)
176 {
177         struct chunk *c;
178
179         if (collision_check || dump_output) {
180                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
181                 INIT_FLIST_HEAD(&c->extent_list[0]);
182         } else
183                 c = malloc(sizeof(struct chunk));
184
185         return c;
186 }
187
/*
 * Insert item 'i' into the global rb tree of unique chunks, keyed by
 * MD5 hash. Called with rb_lock held (see insert_chunks). If a chunk
 * with the same hash exists (and, when enabled, passes the full data
 * collision check), the item is accounted to it; otherwise a fresh
 * chunk is linked into the tree.
 */
static void insert_chunk(struct item *i)
{
	struct rb_node **p, *parent;
	struct chunk *c;
	int diff;

	p = &rb_root.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;

		c = rb_entry(parent, struct chunk, rb_node);
		diff = memcmp(i->hash, c->hash, sizeof(i->hash));
		if (diff < 0)
			p = &(*p)->rb_left;
		else if (diff > 0)
			p = &(*p)->rb_right;
		else {
			int ret;

			if (!collision_check)
				goto add;

			/*
			 * Drop the tree lock for the slow data compare —
			 * col_check does block reads. NOTE(review): other
			 * threads may modify the tree while the lock is
			 * released, so 'p'/'parent' could be stale when we
			 * reacquire it; confirm this is acceptable.
			 */
			fio_mutex_up(rb_lock);
			ret = col_check(c, i);
			fio_mutex_down(rb_lock);

			if (!ret)
				goto add;

			/* same hash, different data: keep walking right */
			p = &(*p)->rb_right;
		}
	}

	/* no matching chunk found: create and link a new one */
	c = alloc_chunk();
	RB_CLEAR_NODE(&c->rb_node);
	c->count = 0;
	memcpy(c->hash, i->hash, sizeof(i->hash));
	rb_link_node(&c->rb_node, parent, p);
	rb_insert_color(&c->rb_node, &rb_root);
add:
	add_item(c, i);
}
231
232 static void insert_chunks(struct item *items, unsigned int nitems,
233                           uint64_t *ndupes)
234 {
235         int i;
236
237         fio_mutex_down(rb_lock);
238
239         for (i = 0; i < nitems; i++) {
240                 if (bloom) {
241                         unsigned int s;
242                         int r;
243
244                         s = sizeof(items[i].hash) / sizeof(uint32_t);
245                         r = bloom_set(bloom, items[i].hash, s);
246                         *ndupes += r;
247                 } else
248                         insert_chunk(&items[i]);
249         }
250
251         fio_mutex_up(rb_lock);
252 }
253
254 static void crc_buf(void *buf, uint32_t *hash)
255 {
256         struct fio_md5_ctx ctx = { .hash = hash };
257
258         fio_md5_init(&ctx);
259         fio_md5_update(&ctx, buf, blocksize);
260         fio_md5_final(&ctx);
261 }
262
263 static int do_work(struct worker_thread *thread, void *buf)
264 {
265         unsigned int nblocks, i;
266         off_t offset;
267         int err = 0, nitems = 0;
268         uint64_t ndupes = 0;
269         struct item *items;
270
271         nblocks = thread->size / blocksize;
272         offset = thread->cur_offset;
273         items = malloc(sizeof(*items) * nblocks);
274
275         for (i = 0; i < nblocks; i++) {
276                 if (read_block(thread->fd, buf, offset))
277                         break;
278                 if (items)
279                         items[i].offset = offset;
280                 crc_buf(buf, items[i].hash);
281                 offset += blocksize;
282                 nitems++;
283         }
284
285         insert_chunks(items, nitems, &ndupes);
286
287         free(items);
288         thread->items += nitems;
289         thread->dupes += ndupes;
290         return err;
291 }
292
293 static void *thread_fn(void *data)
294 {
295         struct worker_thread *thread = data;
296         void *buf;
297
298         buf = fio_memalign(blocksize, blocksize);
299
300         do {
301                 if (get_work(&thread->cur_offset, &thread->size)) {
302                         thread->err = 1;
303                         break;
304                 }
305                 if (do_work(thread, buf)) {
306                         thread->err = 1;
307                         break;
308                 }
309         } while (1);
310
311         thread->done = 1;
312         fio_memfree(buf, blocksize);
313         return NULL;
314 }
315
316 static void show_progress(struct worker_thread *threads, unsigned long total)
317 {
318         unsigned long last_nitems = 0;
319         struct timeval last_tv;
320
321         fio_gettime(&last_tv, NULL);
322
323         while (print_progress) {
324                 unsigned long this_items;
325                 unsigned long nitems = 0;
326                 uint64_t tdiff;
327                 float perc;
328                 int some_done;
329                 int i;
330
331                 for (i = 0; i < num_threads; i++) {
332                         nitems += threads[i].items;
333                         some_done = threads[i].done;
334                         if (some_done)
335                                 break;
336                 }
337
338                 if (some_done)
339                         break;
340
341                 perc = (float) nitems / (float) total;
342                 perc *= 100.0;
343                 this_items = nitems - last_nitems;
344                 this_items *= blocksize;
345                 tdiff = mtime_since_now(&last_tv);
346                 if (tdiff) {
347                         this_items /= tdiff;
348                         printf("%3.2f%% done (%luKB/sec)\r", perc, this_items);
349                         last_nitems = nitems;
350                         fio_gettime(&last_tv, NULL);
351                 } else
352                         printf("%3.2f%% done\r", perc);
353                 fflush(stdout);
354                 usleep(250000);
355         };
356 }
357
358 static int run_dedupe_threads(int fd, uint64_t dev_size, uint64_t *nextents,
359                                 uint64_t *nchunks)
360 {
361         struct worker_thread *threads;
362         unsigned long nitems, total_items;
363         int i, err = 0;
364
365         total_size = dev_size;
366         total_items = dev_size / blocksize;
367         cur_offset = 0;
368         size_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
369
370         threads = malloc(num_threads * sizeof(struct worker_thread));
371         for (i = 0; i < num_threads; i++) {
372                 threads[i].fd = fd;
373                 threads[i].items = 0;
374                 threads[i].err = 0;
375                 threads[i].done = 0;
376
377                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
378                 if (err) {
379                         log_err("fio: thread startup failed\n");
380                         break;
381                 }
382         }
383
384         show_progress(threads, total_items);
385
386         nitems = 0;
387         *nextents = 0;
388         *nchunks = 1;
389         for (i = 0; i < num_threads; i++) {
390                 void *ret;
391                 pthread_join(threads[i].thread, &ret);
392                 nitems += threads[i].items;
393                 *nchunks += threads[i].dupes;
394         }
395
396         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
397
398         *nextents = nitems;
399         *nchunks = nitems - *nchunks;
400
401         fio_mutex_remove(size_lock);
402         free(threads);
403         return err;
404 }
405
406 static int dedupe_check(const char *filename, uint64_t *nextents,
407                         uint64_t *nchunks)
408 {
409         uint64_t dev_size;
410         struct stat sb;
411         int flags;
412
413         flags = O_RDONLY;
414         if (odirect)
415                 flags |= O_DIRECT;
416
417         dev_fd = open(filename, flags);
418         if (dev_fd == -1) {
419                 perror("open");
420                 return 1;
421         }
422
423         if (fstat(dev_fd, &sb) < 0) {
424                 perror("fstat");
425                 close(dev_fd);
426                 return 1;
427         }
428
429         dev_size = get_size(dev_fd, &sb);
430         if (!dev_size) {
431                 close(dev_fd);
432                 return 1;
433         }
434
435         if (use_bloom) {
436                 uint64_t bloom_entries;
437
438                 bloom_entries = (3 * dev_size ) / (blocksize * 2);
439                 bloom = bloom_new(bloom_entries);
440         }
441
442         printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
443
444         return run_dedupe_threads(dev_fd, dev_size, nextents, nchunks);
445 }
446
447 static void show_chunk(struct chunk *c)
448 {
449         struct flist_head *n;
450         struct extent *e;
451
452         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
453         flist_for_each(n, &c->extent_list[0]) {
454                 e = flist_entry(n, struct extent, list);
455                 printf("\toffset %llu\n", (unsigned long long) e->offset);
456         }
457 }
458
/*
 * Print the final dedupe summary: extent/chunk counts, the dedupe
 * factor, and the equivalent fio dedupe_percentage setting. Guards
 * against an empty scan, which previously divided by zero.
 */
static void show_stat(uint64_t nextents, uint64_t nchunks)
{
	double perc;

	printf("Extents=%lu, Unique extents=%lu\n", (unsigned long) nextents, (unsigned long) nchunks);

	/* nothing scanned (or no unique data): ratios are undefined */
	if (!nextents || !nchunks)
		return;

	printf("De-dupe factor: %3.2f\n", (double) nextents / (double) nchunks);

	perc = 1.00 - ((double) nchunks / (double) nextents);
	perc *= 100.0;
	printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
}
471
472 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks)
473 {
474         struct rb_node *n;
475
476         *nchunks = *nextents = 0;
477
478         n = rb_first(&rb_root);
479         if (!n)
480                 return;
481
482         do {
483                 struct chunk *c;
484
485                 c = rb_entry(n, struct chunk, rb_node);
486                 (*nchunks)++;
487                 *nextents += c->count;
488
489                 if (dump_output)
490                         show_chunk(c);
491
492         } while ((n = rb_next(n)) != NULL);
493 }
494
/*
 * Print usage information. Always returns 1 so callers can write
 * 'return usage(argv);' on bad invocation.
 */
static int usage(char *argv[])
{
	static const char *opt_help[] = {
		"\t-b\tChunk size to use\n",
		"\t-t\tNumber of threads to use\n",
		"\t-d\tFull extent/chunk debug output\n",
		"\t-o\tUse O_DIRECT\n",
		"\t-c\tFull collision check\n",
		"\t-B\tUse probabilistic bloom filter\n",
		"\t-p\tPrint progress indicator\n",
	};
	unsigned int i;

	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", argv[0]);
	for (i = 0; i < sizeof(opt_help) / sizeof(opt_help[0]); i++)
		log_err("%s", opt_help[i]);
	return 1;
}
508
509 int main(int argc, char *argv[])
510 {
511         uint64_t nextents, nchunks;
512         int c, ret;
513
514         debug_init();
515
516         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:")) != -1) {
517                 switch (c) {
518                 case 'b':
519                         blocksize = atoi(optarg);
520                         break;
521                 case 't':
522                         num_threads = atoi(optarg);
523                         break;
524                 case 'd':
525                         dump_output = atoi(optarg);
526                         break;
527                 case 'o':
528                         odirect = atoi(optarg);
529                         break;
530                 case 'c':
531                         collision_check = atoi(optarg);
532                         break;
533                 case 'p':
534                         print_progress = atoi(optarg);
535                         break;
536                 case 'B':
537                         use_bloom = atoi(optarg);
538                         break;
539                 case '?':
540                 default:
541                         return usage(argv);
542                 }
543         }
544
545         if (collision_check || dump_output)
546                 use_bloom = 0;
547
548         if (!num_threads)
549                 num_threads = cpus_online();
550
551         if (argc == optind)
552                 return usage(argv);
553
554         sinit();
555
556         rb_root = RB_ROOT;
557         rb_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
558
559         ret = dedupe_check(argv[optind], &nextents, &nchunks);
560
561         if (!bloom)
562                 iter_rb_tree(&nextents, &nchunks);
563
564         show_stat(nextents, nchunks);
565
566         fio_mutex_remove(rb_lock);
567         bloom_free(bloom);
568         scleanup();
569         return ret;
570 }