5998138b507435e580b5305eaa5e41025d6a727f
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <stdio.h>
7 #include <stdio.h>
8 #include <unistd.h>
9 #include <inttypes.h>
10 #include <assert.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/ioctl.h>
14 #include <linux/fs.h>
15 #include <fcntl.h>
16 #include <string.h>
17
18 #include "../lib/rbtree.h"
19 #include "../flist.h"
20 #include "../log.h"
21 #include "../mutex.h"
22 #include "../smalloc.h"
23 #include "../minmax.h"
24 #include "../crc/md5.h"
25 #include "../memalign.h"
26 #include "../os/os.h"
27 #include "../gettime.h"
28 #include "../fio_time.h"
29
30 #include "../lib/bloom.h"
31 #include "debug.h"
32
33 struct worker_thread {
34         pthread_t thread;
35
36         volatile int done;
37
38         int fd;
39         uint64_t cur_offset;
40         uint64_t size;
41
42         unsigned long items;
43         unsigned long dupes;
44         int err;
45 };
46
47 struct extent {
48         struct flist_head list;
49         uint64_t offset;
50 };
51
52 struct chunk {
53         struct rb_node rb_node;
54         uint64_t count;
55         uint32_t hash[MD5_HASH_WORDS];
56         struct flist_head extent_list[0];
57 };
58
59 struct item {
60         uint64_t offset;
61         uint32_t hash[MD5_HASH_WORDS];
62 };
63
64 static struct rb_root rb_root;
65 static struct bloom *bloom;
66 static struct fio_mutex *rb_lock;
67
68 static unsigned int blocksize = 4096;
69 static unsigned int num_threads;
70 static unsigned int chunk_size = 1048576;
71 static unsigned int dump_output;
72 static unsigned int odirect;
73 static unsigned int collision_check;
74 static unsigned int print_progress = 1;
75 static unsigned int use_bloom = 1;
76
77 static uint64_t total_size;
78 static uint64_t cur_offset;
79 static struct fio_mutex *size_lock;
80
81 static int dev_fd;
82
83 static uint64_t get_size(int fd, struct stat *sb)
84 {
85         uint64_t ret;
86
87         if (S_ISBLK(sb->st_mode)) {
88                 if (ioctl(fd, BLKGETSIZE64, &ret) < 0) {
89                         perror("ioctl");
90                         return 0;
91                 }
92         } else
93                 ret = sb->st_size;
94
95         return (ret & ~((uint64_t)blocksize - 1));
96 }
97
98 static int get_work(uint64_t *offset, uint64_t *size)
99 {
100         uint64_t this_chunk;
101         int ret = 1;
102
103         fio_mutex_down(size_lock);
104
105         if (cur_offset < total_size) {
106                 *offset = cur_offset;
107                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
108                 *size = this_chunk;
109                 cur_offset += this_chunk;
110                 ret = 0;
111         }
112
113         fio_mutex_up(size_lock);
114         return ret;
115 }
116
117 static int __read_block(int fd, void *buf, off_t offset, size_t count)
118 {
119         ssize_t ret;
120
121         ret = pread(fd, buf, count, offset);
122         if (ret < 0) {
123                 perror("pread");
124                 return 1;
125         } else if (!ret)
126                 return 1;
127         else if (ret != count) {
128                 log_err("dedupe: short read on block\n");
129                 return 1;
130         }
131
132         return 0;
133 }
134
135 static int read_block(int fd, void *buf, off_t offset)
136 {
137         return __read_block(fd, buf, offset, blocksize);
138 }
139
140 static void add_item(struct chunk *c, struct item *i)
141 {
142         /*      
143          * Save some memory and don't add extent items, if we don't
144          * use them.
145          */
146         if (dump_output || collision_check) {
147                 struct extent *e;
148
149                 e = malloc(sizeof(*e));
150                 e->offset = i->offset;
151                 flist_add_tail(&e->list, &c->extent_list[0]);
152         }
153
154         c->count++;
155 }
156
157 static int col_check(struct chunk *c, struct item *i)
158 {
159         struct extent *e;
160         char *cbuf, *ibuf;
161         int ret = 1;
162
163         cbuf = fio_memalign(blocksize, blocksize);
164         ibuf = fio_memalign(blocksize, blocksize);
165
166         e = flist_entry(c->extent_list[0].next, struct extent, list);
167         if (read_block(dev_fd, cbuf, e->offset))
168                 goto out;
169
170         if (read_block(dev_fd, ibuf, i->offset))
171                 goto out;
172
173         ret = memcmp(ibuf, cbuf, blocksize);
174 out:
175         fio_memfree(cbuf, blocksize);
176         fio_memfree(ibuf, blocksize);
177         return ret;
178 }
179
180 static struct chunk *alloc_chunk(void)
181 {
182         struct chunk *c;
183
184         if (collision_check || dump_output) {
185                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
186                 INIT_FLIST_HEAD(&c->extent_list[0]);
187         } else
188                 c = malloc(sizeof(struct chunk));
189
190         return c;
191 }
192
193 static void insert_chunk(struct item *i)
194 {
195         struct rb_node **p, *parent;
196         struct chunk *c;
197         int diff;
198
199         p = &rb_root.rb_node;
200         parent = NULL;
201         while (*p) {
202                 parent = *p;
203
204                 c = rb_entry(parent, struct chunk, rb_node);
205                 diff = memcmp(i->hash, c->hash, sizeof(i->hash));
206                 if (diff < 0)
207                         p = &(*p)->rb_left;
208                 else if (diff > 0)
209                         p = &(*p)->rb_right;
210                 else {
211                         int ret;
212
213                         if (!collision_check)
214                                 goto add;
215
216                         fio_mutex_up(rb_lock);
217                         ret = col_check(c, i);
218                         fio_mutex_down(rb_lock);
219
220                         if (!ret)
221                                 goto add;
222
223                         p = &(*p)->rb_right;
224                 }
225         }
226
227         c = alloc_chunk();
228         RB_CLEAR_NODE(&c->rb_node);
229         c->count = 0;
230         memcpy(c->hash, i->hash, sizeof(i->hash));
231         rb_link_node(&c->rb_node, parent, p);
232         rb_insert_color(&c->rb_node, &rb_root);
233 add:
234         add_item(c, i);
235 }
236
237 static void insert_chunks(struct item *items, unsigned int nitems,
238                           uint64_t *ndupes)
239 {
240         int i;
241
242         fio_mutex_down(rb_lock);
243
244         for (i = 0; i < nitems; i++) {
245                 if (bloom) {
246                         unsigned int s;
247                         int r;
248
249                         s = sizeof(items[i].hash) / sizeof(uint32_t);
250                         r = bloom_set(bloom, items[i].hash, s);
251                         *ndupes += r;
252                 } else
253                         insert_chunk(&items[i]);
254         }
255
256         fio_mutex_up(rb_lock);
257 }
258
259 static void crc_buf(void *buf, uint32_t *hash)
260 {
261         struct fio_md5_ctx ctx = { .hash = hash };
262
263         fio_md5_init(&ctx);
264         fio_md5_update(&ctx, buf, blocksize);
265         fio_md5_final(&ctx);
266 }
267
268 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
269 {
270         if (__read_block(fd, buf, offset, size))
271                 return 0;
272
273         return size / blocksize;
274 }
275
276 static int do_work(struct worker_thread *thread, void *buf)
277 {
278         unsigned int nblocks, i;
279         off_t offset;
280         int nitems = 0;
281         uint64_t ndupes = 0;
282         struct item *items;
283
284         offset = thread->cur_offset;
285
286         nblocks = read_blocks(thread->fd, buf, offset, min(thread->size, (uint64_t)chunk_size));
287         if (!nblocks)
288                 return 1;
289
290         items = malloc(sizeof(*items) * nblocks);
291
292         for (i = 0; i < nblocks; i++) {
293                 void *thisptr = buf + (i * blocksize);
294
295                 if (items)
296                         items[i].offset = offset;
297                 crc_buf(thisptr, items[i].hash);
298                 offset += blocksize;
299                 nitems++;
300         }
301
302         insert_chunks(items, nitems, &ndupes);
303
304         free(items);
305         thread->items += nitems;
306         thread->dupes += ndupes;
307         return 0;
308 }
309
310 static void *thread_fn(void *data)
311 {
312         struct worker_thread *thread = data;
313         void *buf;
314
315         buf = fio_memalign(blocksize, chunk_size);
316
317         do {
318                 if (get_work(&thread->cur_offset, &thread->size)) {
319                         thread->err = 1;
320                         break;
321                 }
322                 if (do_work(thread, buf)) {
323                         thread->err = 1;
324                         break;
325                 }
326         } while (1);
327
328         thread->done = 1;
329         fio_memfree(buf, chunk_size);
330         return NULL;
331 }
332
333 static void show_progress(struct worker_thread *threads, unsigned long total)
334 {
335         unsigned long last_nitems = 0;
336         struct timeval last_tv;
337
338         fio_gettime(&last_tv, NULL);
339
340         while (print_progress) {
341                 unsigned long this_items;
342                 unsigned long nitems = 0;
343                 uint64_t tdiff;
344                 float perc;
345                 int some_done;
346                 int i;
347
348                 for (i = 0; i < num_threads; i++) {
349                         nitems += threads[i].items;
350                         some_done = threads[i].done;
351                         if (some_done)
352                                 break;
353                 }
354
355                 if (some_done)
356                         break;
357
358                 perc = (float) nitems / (float) total;
359                 perc *= 100.0;
360                 this_items = nitems - last_nitems;
361                 this_items *= blocksize;
362                 tdiff = mtime_since_now(&last_tv);
363                 if (tdiff) {
364                         this_items /= tdiff;
365                         printf("%3.2f%% done (%luKB/sec)\r", perc, this_items);
366                         last_nitems = nitems;
367                         fio_gettime(&last_tv, NULL);
368                 } else
369                         printf("%3.2f%% done\r", perc);
370                 fflush(stdout);
371                 usleep(250000);
372         };
373 }
374
375 static int run_dedupe_threads(int fd, uint64_t dev_size, uint64_t *nextents,
376                                 uint64_t *nchunks)
377 {
378         struct worker_thread *threads;
379         unsigned long nitems, total_items;
380         int i, err = 0;
381
382         total_size = dev_size;
383         total_items = dev_size / blocksize;
384         cur_offset = 0;
385         size_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
386
387         threads = malloc(num_threads * sizeof(struct worker_thread));
388         for (i = 0; i < num_threads; i++) {
389                 threads[i].fd = fd;
390                 threads[i].items = 0;
391                 threads[i].err = 0;
392                 threads[i].done = 0;
393
394                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
395                 if (err) {
396                         log_err("fio: thread startup failed\n");
397                         break;
398                 }
399         }
400
401         show_progress(threads, total_items);
402
403         nitems = 0;
404         *nextents = 0;
405         *nchunks = 1;
406         for (i = 0; i < num_threads; i++) {
407                 void *ret;
408                 pthread_join(threads[i].thread, &ret);
409                 nitems += threads[i].items;
410                 *nchunks += threads[i].dupes;
411         }
412
413         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
414
415         *nextents = nitems;
416         *nchunks = nitems - *nchunks;
417
418         fio_mutex_remove(size_lock);
419         free(threads);
420         return err;
421 }
422
423 static int dedupe_check(const char *filename, uint64_t *nextents,
424                         uint64_t *nchunks)
425 {
426         uint64_t dev_size;
427         struct stat sb;
428         int flags;
429
430         flags = O_RDONLY;
431         if (odirect)
432                 flags |= O_DIRECT;
433
434         dev_fd = open(filename, flags);
435         if (dev_fd == -1) {
436                 perror("open");
437                 return 1;
438         }
439
440         if (fstat(dev_fd, &sb) < 0) {
441                 perror("fstat");
442                 close(dev_fd);
443                 return 1;
444         }
445
446         dev_size = get_size(dev_fd, &sb);
447         if (!dev_size) {
448                 close(dev_fd);
449                 return 1;
450         }
451
452         if (use_bloom) {
453                 uint64_t bloom_entries;
454
455                 bloom_entries = (3 * dev_size ) / (blocksize * 2);
456                 bloom = bloom_new(bloom_entries);
457         }
458
459         printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
460
461         return run_dedupe_threads(dev_fd, dev_size, nextents, nchunks);
462 }
463
464 static void show_chunk(struct chunk *c)
465 {
466         struct flist_head *n;
467         struct extent *e;
468
469         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
470         flist_for_each(n, &c->extent_list[0]) {
471                 e = flist_entry(n, struct extent, list);
472                 printf("\toffset %llu\n", (unsigned long long) e->offset);
473         }
474 }
475
476 static void show_stat(uint64_t nextents, uint64_t nchunks)
477 {
478         double perc;
479
480         printf("Extents=%lu, Unique extents=%lu\n", (unsigned long) nextents, (unsigned long) nchunks);
481         printf("De-dupe factor: %3.2f\n", (double) nextents / (double) nchunks);
482
483         perc = 1.00 - ((double) nchunks / (double) nextents);
484         perc *= 100.0;
485         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
486
487 }
488
489 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks)
490 {
491         struct rb_node *n;
492
493         *nchunks = *nextents = 0;
494
495         n = rb_first(&rb_root);
496         if (!n)
497                 return;
498
499         do {
500                 struct chunk *c;
501
502                 c = rb_entry(n, struct chunk, rb_node);
503                 (*nchunks)++;
504                 *nextents += c->count;
505
506                 if (dump_output)
507                         show_chunk(c);
508
509         } while ((n = rb_next(n)) != NULL);
510 }
511
512 static int usage(char *argv[])
513 {
514         log_err("Check for dedupable blocks on a device/file\n\n");
515         log_err("%s: [options] <device or file>\n", argv[0]);
516         log_err("\t-b\tChunk size to use\n");
517         log_err("\t-t\tNumber of threads to use\n");
518         log_err("\t-d\tFull extent/chunk debug output\n");
519         log_err("\t-o\tUse O_DIRECT\n");
520         log_err("\t-c\tFull collision check\n");
521         log_err("\t-B\tUse probabilistic bloom filter\n");
522         log_err("\t-p\tPrint progress indicator\n");
523         return 1;
524 }
525
526 int main(int argc, char *argv[])
527 {
528         uint64_t nextents = 0, nchunks = 0;
529         int c, ret;
530
531         debug_init();
532
533         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:")) != -1) {
534                 switch (c) {
535                 case 'b':
536                         blocksize = atoi(optarg);
537                         break;
538                 case 't':
539                         num_threads = atoi(optarg);
540                         break;
541                 case 'd':
542                         dump_output = atoi(optarg);
543                         break;
544                 case 'o':
545                         odirect = atoi(optarg);
546                         break;
547                 case 'c':
548                         collision_check = atoi(optarg);
549                         break;
550                 case 'p':
551                         print_progress = atoi(optarg);
552                         break;
553                 case 'B':
554                         use_bloom = atoi(optarg);
555                         break;
556                 case '?':
557                 default:
558                         return usage(argv);
559                 }
560         }
561
562         if (collision_check || dump_output)
563                 use_bloom = 0;
564
565         if (!num_threads)
566                 num_threads = cpus_online();
567
568         if (argc == optind)
569                 return usage(argv);
570
571         sinit();
572
573         rb_root = RB_ROOT;
574         rb_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
575
576         ret = dedupe_check(argv[optind], &nextents, &nchunks);
577
578         if (!bloom)
579                 iter_rb_tree(&nextents, &nchunks);
580
581         show_stat(nextents, nchunks);
582
583         fio_mutex_remove(rb_lock);
584         if (bloom)
585                 bloom_free(bloom);
586         scleanup();
587         return ret;
588 }