configure: Add <linux/blkzoned.h> test
[fio.git] / t / dedupe.c
1 /*
2  * Small tool to check for dedupable blocks in a file or device. Basically
3  * just scans the filename for extents of the given size, checksums them,
4  * and orders them up.
5  */
6 #include <fcntl.h>
7 #include <inttypes.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <sys/stat.h>
12
13 #include "../flist.h"
14 #include "../log.h"
15 #include "../fio_sem.h"
16 #include "../smalloc.h"
17 #include "../minmax.h"
18 #include "../crc/md5.h"
19 #include "../lib/memalign.h"
20 #include "../os/os.h"
21 #include "../gettime.h"
22 #include "../fio_time.h"
23 #include "../lib/rbtree.h"
24
25 #include "../lib/bloom.h"
26 #include "debug.h"
27
28 struct worker_thread {
29         pthread_t thread;
30
31         volatile int done;
32
33         int fd;
34         uint64_t cur_offset;
35         uint64_t size;
36
37         unsigned long items;
38         unsigned long dupes;
39         int err;
40 };
41
42 struct extent {
43         struct flist_head list;
44         uint64_t offset;
45 };
46
47 struct chunk {
48         struct fio_rb_node rb_node;
49         uint64_t count;
50         uint32_t hash[MD5_HASH_WORDS];
51         struct flist_head extent_list[0];
52 };
53
54 struct item {
55         uint64_t offset;
56         uint32_t hash[MD5_HASH_WORDS];
57 };
58
59 static struct rb_root rb_root;
60 static struct bloom *bloom;
61 static struct fio_sem *rb_lock;
62
63 static unsigned int blocksize = 4096;
64 static unsigned int num_threads;
65 static unsigned int chunk_size = 1048576;
66 static unsigned int dump_output;
67 static unsigned int odirect;
68 static unsigned int collision_check;
69 static unsigned int print_progress = 1;
70 static unsigned int use_bloom = 1;
71
72 static uint64_t total_size;
73 static uint64_t cur_offset;
74 static struct fio_sem *size_lock;
75
76 static struct fio_file file;
77
78 static uint64_t get_size(struct fio_file *f, struct stat *sb)
79 {
80         uint64_t ret;
81
82         if (S_ISBLK(sb->st_mode)) {
83                 unsigned long long bytes = 0;
84
85                 if (blockdev_size(f, &bytes)) {
86                         log_err("dedupe: failed getting bdev size\n");
87                         return 0;
88                 }
89                 ret = bytes;
90         } else
91                 ret = sb->st_size;
92
93         return (ret & ~((uint64_t)blocksize - 1));
94 }
95
96 static int get_work(uint64_t *offset, uint64_t *size)
97 {
98         uint64_t this_chunk;
99         int ret = 1;
100
101         fio_sem_down(size_lock);
102
103         if (cur_offset < total_size) {
104                 *offset = cur_offset;
105                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
106                 *size = this_chunk;
107                 cur_offset += this_chunk;
108                 ret = 0;
109         }
110
111         fio_sem_up(size_lock);
112         return ret;
113 }
114
115 static int __read_block(int fd, void *buf, off_t offset, size_t count)
116 {
117         ssize_t ret;
118
119         ret = pread(fd, buf, count, offset);
120         if (ret < 0) {
121                 perror("pread");
122                 return 1;
123         } else if (!ret)
124                 return 1;
125         else if (ret != count) {
126                 log_err("dedupe: short read on block\n");
127                 return 1;
128         }
129
130         return 0;
131 }
132
133 static int read_block(int fd, void *buf, off_t offset)
134 {
135         return __read_block(fd, buf, offset, blocksize);
136 }
137
138 static void add_item(struct chunk *c, struct item *i)
139 {
140         /*      
141          * Save some memory and don't add extent items, if we don't
142          * use them.
143          */
144         if (dump_output || collision_check) {
145                 struct extent *e;
146
147                 e = malloc(sizeof(*e));
148                 e->offset = i->offset;
149                 flist_add_tail(&e->list, &c->extent_list[0]);
150         }
151
152         c->count++;
153 }
154
155 static int col_check(struct chunk *c, struct item *i)
156 {
157         struct extent *e;
158         char *cbuf, *ibuf;
159         int ret = 1;
160
161         cbuf = fio_memalign(blocksize, blocksize);
162         ibuf = fio_memalign(blocksize, blocksize);
163
164         e = flist_entry(c->extent_list[0].next, struct extent, list);
165         if (read_block(file.fd, cbuf, e->offset))
166                 goto out;
167
168         if (read_block(file.fd, ibuf, i->offset))
169                 goto out;
170
171         ret = memcmp(ibuf, cbuf, blocksize);
172 out:
173         fio_memfree(cbuf, blocksize);
174         fio_memfree(ibuf, blocksize);
175         return ret;
176 }
177
178 static struct chunk *alloc_chunk(void)
179 {
180         struct chunk *c;
181
182         if (collision_check || dump_output) {
183                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
184                 INIT_FLIST_HEAD(&c->extent_list[0]);
185         } else
186                 c = malloc(sizeof(struct chunk));
187
188         return c;
189 }
190
191 static void insert_chunk(struct item *i)
192 {
193         struct fio_rb_node **p, *parent;
194         struct chunk *c;
195         int diff;
196
197         p = &rb_root.rb_node;
198         parent = NULL;
199         while (*p) {
200                 parent = *p;
201
202                 c = rb_entry(parent, struct chunk, rb_node);
203                 diff = memcmp(i->hash, c->hash, sizeof(i->hash));
204                 if (diff < 0)
205                         p = &(*p)->rb_left;
206                 else if (diff > 0)
207                         p = &(*p)->rb_right;
208                 else {
209                         int ret;
210
211                         if (!collision_check)
212                                 goto add;
213
214                         fio_sem_up(rb_lock);
215                         ret = col_check(c, i);
216                         fio_sem_down(rb_lock);
217
218                         if (!ret)
219                                 goto add;
220
221                         p = &(*p)->rb_right;
222                 }
223         }
224
225         c = alloc_chunk();
226         RB_CLEAR_NODE(&c->rb_node);
227         c->count = 0;
228         memcpy(c->hash, i->hash, sizeof(i->hash));
229         rb_link_node(&c->rb_node, parent, p);
230         rb_insert_color(&c->rb_node, &rb_root);
231 add:
232         add_item(c, i);
233 }
234
235 static void insert_chunks(struct item *items, unsigned int nitems,
236                           uint64_t *ndupes)
237 {
238         int i;
239
240         fio_sem_down(rb_lock);
241
242         for (i = 0; i < nitems; i++) {
243                 if (bloom) {
244                         unsigned int s;
245                         int r;
246
247                         s = sizeof(items[i].hash) / sizeof(uint32_t);
248                         r = bloom_set(bloom, items[i].hash, s);
249                         *ndupes += r;
250                 } else
251                         insert_chunk(&items[i]);
252         }
253
254         fio_sem_up(rb_lock);
255 }
256
257 static void crc_buf(void *buf, uint32_t *hash)
258 {
259         struct fio_md5_ctx ctx = { .hash = hash };
260
261         fio_md5_init(&ctx);
262         fio_md5_update(&ctx, buf, blocksize);
263         fio_md5_final(&ctx);
264 }
265
266 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
267 {
268         if (__read_block(fd, buf, offset, size))
269                 return 0;
270
271         return size / blocksize;
272 }
273
274 static int do_work(struct worker_thread *thread, void *buf)
275 {
276         unsigned int nblocks, i;
277         off_t offset;
278         int nitems = 0;
279         uint64_t ndupes = 0;
280         struct item *items;
281
282         offset = thread->cur_offset;
283
284         nblocks = read_blocks(thread->fd, buf, offset, min(thread->size, (uint64_t)chunk_size));
285         if (!nblocks)
286                 return 1;
287
288         items = malloc(sizeof(*items) * nblocks);
289
290         for (i = 0; i < nblocks; i++) {
291                 void *thisptr = buf + (i * blocksize);
292
293                 items[i].offset = offset;
294                 crc_buf(thisptr, items[i].hash);
295                 offset += blocksize;
296                 nitems++;
297         }
298
299         insert_chunks(items, nitems, &ndupes);
300
301         free(items);
302         thread->items += nitems;
303         thread->dupes += ndupes;
304         return 0;
305 }
306
307 static void *thread_fn(void *data)
308 {
309         struct worker_thread *thread = data;
310         void *buf;
311
312         buf = fio_memalign(blocksize, chunk_size);
313
314         do {
315                 if (get_work(&thread->cur_offset, &thread->size)) {
316                         thread->err = 1;
317                         break;
318                 }
319                 if (do_work(thread, buf)) {
320                         thread->err = 1;
321                         break;
322                 }
323         } while (1);
324
325         thread->done = 1;
326         fio_memfree(buf, chunk_size);
327         return NULL;
328 }
329
330 static void show_progress(struct worker_thread *threads, unsigned long total)
331 {
332         unsigned long last_nitems = 0;
333         struct timespec last_tv;
334
335         fio_gettime(&last_tv, NULL);
336
337         while (print_progress) {
338                 unsigned long this_items;
339                 unsigned long nitems = 0;
340                 uint64_t tdiff;
341                 float perc;
342                 int some_done = 0;
343                 int i;
344
345                 for (i = 0; i < num_threads; i++) {
346                         nitems += threads[i].items;
347                         some_done = threads[i].done;
348                         if (some_done)
349                                 break;
350                 }
351
352                 if (some_done)
353                         break;
354
355                 perc = (float) nitems / (float) total;
356                 perc *= 100.0;
357                 this_items = nitems - last_nitems;
358                 this_items *= blocksize;
359                 tdiff = mtime_since_now(&last_tv);
360                 if (tdiff) {
361                         this_items = (this_items * 1000) / (tdiff * 1024);
362                         printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
363                         last_nitems = nitems;
364                         fio_gettime(&last_tv, NULL);
365                 } else
366                         printf("%3.2f%% done\r", perc);
367                 fflush(stdout);
368                 usleep(250000);
369         };
370 }
371
372 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
373                               uint64_t *nextents, uint64_t *nchunks)
374 {
375         struct worker_thread *threads;
376         unsigned long nitems, total_items;
377         int i, err = 0;
378
379         total_size = dev_size;
380         total_items = dev_size / blocksize;
381         cur_offset = 0;
382         size_lock = fio_sem_init(FIO_SEM_UNLOCKED);
383
384         threads = malloc(num_threads * sizeof(struct worker_thread));
385         for (i = 0; i < num_threads; i++) {
386                 memset(&threads[i], 0, sizeof(struct worker_thread));
387                 threads[i].fd = f->fd;
388
389                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
390                 if (err) {
391                         log_err("fio: thread startup failed\n");
392                         break;
393                 }
394         }
395
396         show_progress(threads, total_items);
397
398         nitems = 0;
399         *nextents = 0;
400         *nchunks = 1;
401         for (i = 0; i < num_threads; i++) {
402                 void *ret;
403                 pthread_join(threads[i].thread, &ret);
404                 nitems += threads[i].items;
405                 *nchunks += threads[i].dupes;
406         }
407
408         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
409
410         *nextents = nitems;
411         *nchunks = nitems - *nchunks;
412
413         fio_sem_remove(size_lock);
414         free(threads);
415         return err;
416 }
417
418 static int dedupe_check(const char *filename, uint64_t *nextents,
419                         uint64_t *nchunks)
420 {
421         uint64_t dev_size;
422         struct stat sb;
423         int flags;
424
425         flags = O_RDONLY;
426         if (odirect)
427                 flags |= OS_O_DIRECT;
428
429         memset(&file, 0, sizeof(file));
430         file.file_name = strdup(filename);
431
432         file.fd = open(filename, flags);
433         if (file.fd == -1) {
434                 perror("open");
435                 goto err;
436         }
437
438         if (fstat(file.fd, &sb) < 0) {
439                 perror("fstat");
440                 goto err;
441         }
442
443         dev_size = get_size(&file, &sb);
444         if (!dev_size)
445                 goto err;
446
447         if (use_bloom) {
448                 uint64_t bloom_entries;
449
450                 bloom_entries = 8 * (dev_size / blocksize);
451                 bloom = bloom_new(bloom_entries);
452         }
453
454         printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
455
456         return run_dedupe_threads(&file, dev_size, nextents, nchunks);
457 err:
458         if (file.fd != -1)
459                 close(file.fd);
460         free(file.file_name);
461         return 1;
462 }
463
464 static void show_chunk(struct chunk *c)
465 {
466         struct flist_head *n;
467         struct extent *e;
468
469         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
470         flist_for_each(n, &c->extent_list[0]) {
471                 e = flist_entry(n, struct extent, list);
472                 printf("\toffset %llu\n", (unsigned long long) e->offset);
473         }
474 }
475
476 static void show_stat(uint64_t nextents, uint64_t nchunks)
477 {
478         double perc, ratio;
479
480         printf("Extents=%lu, Unique extents=%lu\n", (unsigned long) nextents, (unsigned long) nchunks);
481
482         if (nchunks) {
483                 ratio = (double) nextents / (double) nchunks;
484                 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
485         } else
486                 printf("De-dupe ratio: 1:infinite\n");
487
488         perc = 1.00 - ((double) nchunks / (double) nextents);
489         perc *= 100.0;
490         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
491
492 }
493
494 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks)
495 {
496         struct fio_rb_node *n;
497
498         *nchunks = *nextents = 0;
499
500         n = rb_first(&rb_root);
501         if (!n)
502                 return;
503
504         do {
505                 struct chunk *c;
506
507                 c = rb_entry(n, struct chunk, rb_node);
508                 (*nchunks)++;
509                 *nextents += c->count;
510
511                 if (dump_output)
512                         show_chunk(c);
513
514         } while ((n = rb_next(n)) != NULL);
515 }
516
517 static int usage(char *argv[])
518 {
519         log_err("Check for dedupable blocks on a device/file\n\n");
520         log_err("%s: [options] <device or file>\n", argv[0]);
521         log_err("\t-b\tChunk size to use\n");
522         log_err("\t-t\tNumber of threads to use\n");
523         log_err("\t-d\tFull extent/chunk debug output\n");
524         log_err("\t-o\tUse O_DIRECT\n");
525         log_err("\t-c\tFull collision check\n");
526         log_err("\t-B\tUse probabilistic bloom filter\n");
527         log_err("\t-p\tPrint progress indicator\n");
528         return 1;
529 }
530
531 int main(int argc, char *argv[])
532 {
533         uint64_t nextents = 0, nchunks = 0;
534         int c, ret;
535
536         arch_init(argv);
537         debug_init();
538
539         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:")) != -1) {
540                 switch (c) {
541                 case 'b':
542                         blocksize = atoi(optarg);
543                         break;
544                 case 't':
545                         num_threads = atoi(optarg);
546                         break;
547                 case 'd':
548                         dump_output = atoi(optarg);
549                         break;
550                 case 'o':
551                         odirect = atoi(optarg);
552                         break;
553                 case 'c':
554                         collision_check = atoi(optarg);
555                         break;
556                 case 'p':
557                         print_progress = atoi(optarg);
558                         break;
559                 case 'B':
560                         use_bloom = atoi(optarg);
561                         break;
562                 case '?':
563                 default:
564                         return usage(argv);
565                 }
566         }
567
568         if (collision_check || dump_output)
569                 use_bloom = 0;
570
571         if (!num_threads)
572                 num_threads = cpus_online();
573
574         if (argc == optind)
575                 return usage(argv);
576
577         sinit();
578
579         rb_root = RB_ROOT;
580         rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);
581
582         ret = dedupe_check(argv[optind], &nextents, &nchunks);
583
584         if (!ret) {
585                 if (!bloom)
586                         iter_rb_tree(&nextents, &nchunks);
587
588                 show_stat(nextents, nchunks);
589         }
590
591         fio_sem_remove(rb_lock);
592         if (bloom)
593                 bloom_free(bloom);
594         scleanup();
595         return ret;
596 }