t/io_uring: change fatal map buffers condition with multiple files
[fio.git] / t / dedupe.c
1 /*
 * Small tool to check for dedupable blocks in a file or device. It reads
 * the target in extents of the given block size, checksums each extent,
 * and groups identical checksums to count duplicates.
5  */
6 #include <fcntl.h>
7 #include <inttypes.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <sys/stat.h>
12
13 #include "../fio.h"
14 #include "../flist.h"
15 #include "../log.h"
16 #include "../fio_sem.h"
17 #include "../smalloc.h"
18 #include "../minmax.h"
19 #include "../crc/md5.h"
20 #include "../os/os.h"
21 #include "../gettime.h"
22 #include "../fio_time.h"
23 #include "../lib/rbtree.h"
24
25 #include "../lib/bloom.h"
26 #include "debug.h"
27 #include "zlib.h"
28
/*
 * Per-thread deflate state used for -C (compressed unique capacity)
 * accounting in account_unique_capacity().
 */
struct zlib_ctrl {
	z_stream stream;
	unsigned char *buf_in;	/* one block read from the target (blocksize bytes) */
	unsigned char *buf_out;	/* deflate output, sized with deflateBound() */
};
34
/*
 * State for one worker thread. The main thread sums the counters after
 * joining the workers in run_dedupe_threads().
 */
struct worker_thread {
	struct zlib_ctrl zc;		/* deflate state for -C accounting */
	pthread_t thread;
	uint64_t cur_offset;		/* start of the chunk being processed (from get_work()) */
	uint64_t size;			/* length of that chunk (from get_work()) */
	unsigned long long unique_capacity;	/* compressed bytes of new unique blocks (-C) */
	unsigned long items;		/* blocks hashed so far */
	unsigned long dupes;		/* sum of bloom_set() results (bloom mode only) */
	int err;			/* error flag set by thread_fn(); not read by run_dedupe_threads() */
	int fd;				/* target file descriptor, shared by all workers */
	/*
	 * Set by thread_fn() when the worker finishes; polled by
	 * show_progress() from the main thread.
	 * NOTE(review): volatile gives no inter-thread ordering guarantees;
	 * consider C11 atomics or fio's atomic helpers.
	 */
	volatile int done;
};
47
/*
 * One occurrence of a chunk's content. These are only recorded when -c
 * (collision check) or -d (dump output) is enabled; see add_item().
 */
struct extent {
	struct flist_head list;		/* linked on chunk->extent_list[0] */
	uint64_t offset;		/* byte offset of the block in the target */
};
52
53 struct chunk {
54         struct fio_rb_node rb_node;
55         uint64_t count;
56         uint32_t hash[MD5_HASH_WORDS];
57         struct flist_head extent_list[0];
58 };
59
/* A hashed block, produced by do_work() and consumed by insert_chunks(). */
struct item {
	uint64_t offset;		/* byte offset of the block in the target */
	uint32_t hash[MD5_HASH_WORDS];	/* MD5 of the block, filled by crc_buf() */
};
64
/* Shared dedupe state: either the exact chunk tree or a bloom filter. */
static struct rb_root rb_root;
static struct bloom *bloom;		/* NULL unless the bloom filter is in use */
static struct fio_sem *rb_lock;		/* held by insert_chunks() around tree/bloom updates */

/* Command line options (see usage() and main()) */
static unsigned int blocksize = 4096;	/* extent size in bytes (-b) */
static unsigned int num_threads;	/* -t; 0 means use cpus_online() */
static unsigned int chunk_size = 1048576;	/* max bytes handed to a worker per get_work() */
static unsigned int dump_output;	/* -d */
static unsigned int odirect;		/* -o */
static unsigned int collision_check;	/* -c */
static unsigned int print_progress = 1;	/* -p */
static unsigned int use_bloom = 1;	/* -B; forced off by -c, -d or -C */
static unsigned int compression = 0;	/* -C */

/* Work distribution cursor, protected by size_lock (see get_work()) */
static uint64_t total_size;
static uint64_t cur_offset;
static struct fio_sem *size_lock;

/* The device or file being scanned, opened by dedupe_check() */
static struct fio_file file;
84
85 static uint64_t get_size(struct fio_file *f, struct stat *sb)
86 {
87         uint64_t ret;
88
89         if (S_ISBLK(sb->st_mode)) {
90                 unsigned long long bytes = 0;
91
92                 if (blockdev_size(f, &bytes)) {
93                         log_err("dedupe: failed getting bdev size\n");
94                         return 0;
95                 }
96                 ret = bytes;
97         } else {
98                 ret = sb->st_size;
99         }
100
101         return (ret & ~((uint64_t)blocksize - 1));
102 }
103
104 static int get_work(uint64_t *offset, uint64_t *size)
105 {
106         uint64_t this_chunk;
107         int ret = 1;
108
109         fio_sem_down(size_lock);
110
111         if (cur_offset < total_size) {
112                 *offset = cur_offset;
113                 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
114                 *size = this_chunk;
115                 cur_offset += this_chunk;
116                 ret = 0;
117         }
118
119         fio_sem_up(size_lock);
120         return ret;
121 }
122
/*
 * Read exactly @count bytes at @offset into @buf with a single pread().
 * Returns 0 on a full read and 1 otherwise. An error is reported with
 * perror(), a short read is logged, and EOF (0 bytes) fails silently.
 */
static int __read_block(int fd, void *buf, off_t offset, size_t count)
{
	const ssize_t got = pread(fd, buf, count, offset);

	if (got < 0) {
		perror("pread");
		return 1;
	}
	if (got == 0)
		return 1;
	if ((size_t)got != count) {
		log_err("dedupe: short read on block\n");
		return 1;
	}

	return 0;
}
140
141 static int read_block(int fd, void *buf, off_t offset)
142 {
143         return __read_block(fd, buf, offset, blocksize);
144 }
145
146 static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
147                                     struct zlib_ctrl *zc)
148 {
149         z_stream *stream = &zc->stream;
150         unsigned int compressed_len;
151         int ret;
152
153         if (read_block(file.fd, zc->buf_in, offset))
154                 return;
155
156         stream->next_in = zc->buf_in;
157         stream->avail_in = blocksize;
158         stream->avail_out = deflateBound(stream, blocksize);
159         stream->next_out = zc->buf_out;
160
161         ret = deflate(stream, Z_FINISH);
162         assert(ret != Z_STREAM_ERROR);
163         compressed_len = blocksize - stream->avail_out;
164
165         if (dump_output)
166                 printf("offset 0x%lx compressed to %d blocksize %d ratio %.2f \n",
167                                 (unsigned long) offset, compressed_len, blocksize,
168                                 (float)compressed_len / (float)blocksize);
169
170         *unique_capacity += compressed_len;
171         deflateReset(stream);
172 }
173
174 static void add_item(struct chunk *c, struct item *i)
175 {
176         /*      
177          * Save some memory and don't add extent items, if we don't
178          * use them.
179          */
180         if (dump_output || collision_check) {
181                 struct extent *e;
182
183                 e = malloc(sizeof(*e));
184                 e->offset = i->offset;
185                 flist_add_tail(&e->list, &c->extent_list[0]);
186         }
187
188         c->count++;
189 }
190
191 static int col_check(struct chunk *c, struct item *i)
192 {
193         struct extent *e;
194         char *cbuf, *ibuf;
195         int ret = 1;
196
197         cbuf = fio_memalign(blocksize, blocksize, false);
198         ibuf = fio_memalign(blocksize, blocksize, false);
199
200         e = flist_entry(c->extent_list[0].next, struct extent, list);
201         if (read_block(file.fd, cbuf, e->offset))
202                 goto out;
203
204         if (read_block(file.fd, ibuf, i->offset))
205                 goto out;
206
207         ret = memcmp(ibuf, cbuf, blocksize);
208 out:
209         fio_memfree(cbuf, blocksize, false);
210         fio_memfree(ibuf, blocksize, false);
211         return ret;
212 }
213
214 static struct chunk *alloc_chunk(void)
215 {
216         struct chunk *c;
217
218         if (collision_check || dump_output) {
219                 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
220                 INIT_FLIST_HEAD(&c->extent_list[0]);
221         } else {
222                 c = malloc(sizeof(struct chunk));
223         }
224
225         return c;
226 }
227
228 static void insert_chunk(struct item *i, uint64_t *unique_capacity,
229                          struct zlib_ctrl *zc)
230 {
231         struct fio_rb_node **p, *parent;
232         struct chunk *c;
233         int diff;
234
235         p = &rb_root.rb_node;
236         parent = NULL;
237         while (*p) {
238                 parent = *p;
239
240                 c = rb_entry(parent, struct chunk, rb_node);
241                 diff = memcmp(i->hash, c->hash, sizeof(i->hash));
242                 if (diff < 0) {
243                         p = &(*p)->rb_left;
244                 } else if (diff > 0) {
245                         p = &(*p)->rb_right;
246                 } else {
247                         int ret;
248
249                         if (!collision_check)
250                                 goto add;
251
252                         fio_sem_up(rb_lock);
253                         ret = col_check(c, i);
254                         fio_sem_down(rb_lock);
255
256                         if (!ret)
257                                 goto add;
258
259                         p = &(*p)->rb_right;
260                 }
261         }
262
263         c = alloc_chunk();
264         RB_CLEAR_NODE(&c->rb_node);
265         c->count = 0;
266         memcpy(c->hash, i->hash, sizeof(i->hash));
267         rb_link_node(&c->rb_node, parent, p);
268         rb_insert_color(&c->rb_node, &rb_root);
269         if (compression)
270                 account_unique_capacity(i->offset, unique_capacity, zc);
271 add:
272         add_item(c, i);
273 }
274
275 static void insert_chunks(struct item *items, unsigned int nitems,
276                           uint64_t *ndupes, uint64_t *unique_capacity,
277                           struct zlib_ctrl *zc)
278 {
279         int i;
280
281         fio_sem_down(rb_lock);
282
283         for (i = 0; i < nitems; i++) {
284                 if (bloom) {
285                         unsigned int s;
286                         int r;
287
288                         s = sizeof(items[i].hash) / sizeof(uint32_t);
289                         r = bloom_set(bloom, items[i].hash, s);
290                         *ndupes += r;
291                 } else
292                         insert_chunk(&items[i], unique_capacity, zc);
293         }
294
295         fio_sem_up(rb_lock);
296 }
297
298 static void crc_buf(void *buf, uint32_t *hash)
299 {
300         struct fio_md5_ctx ctx = { .hash = hash };
301
302         fio_md5_init(&ctx);
303         fio_md5_update(&ctx, buf, blocksize);
304         fio_md5_final(&ctx);
305 }
306
307 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
308 {
309         if (__read_block(fd, buf, offset, size))
310                 return 0;
311
312         return size / blocksize;
313 }
314
315 static int do_work(struct worker_thread *thread, void *buf)
316 {
317         unsigned int nblocks, i;
318         off_t offset;
319         int nitems = 0;
320         uint64_t ndupes = 0;
321         uint64_t unique_capacity = 0;
322         struct item *items;
323
324         offset = thread->cur_offset;
325
326         nblocks = read_blocks(thread->fd, buf, offset,
327                                 min(thread->size, (uint64_t) chunk_size));
328         if (!nblocks)
329                 return 1;
330
331         items = malloc(sizeof(*items) * nblocks);
332
333         for (i = 0; i < nblocks; i++) {
334                 void *thisptr = buf + (i * blocksize);
335
336                 items[i].offset = offset;
337                 crc_buf(thisptr, items[i].hash);
338                 offset += blocksize;
339                 nitems++;
340         }
341
342         insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
343
344         free(items);
345         thread->items += nitems;
346         thread->dupes += ndupes;
347         thread->unique_capacity += unique_capacity;
348         return 0;
349 }
350
351 static void thread_init_zlib_control(struct worker_thread *thread)
352 {
353         size_t sz;
354
355         z_stream *stream = &thread->zc.stream;
356         stream->zalloc = Z_NULL;
357         stream->zfree = Z_NULL;
358         stream->opaque = Z_NULL;
359
360         if (deflateInit(stream, Z_DEFAULT_COMPRESSION) != Z_OK)
361                 return;
362
363         thread->zc.buf_in = fio_memalign(blocksize, blocksize, false);
364         sz = deflateBound(stream, blocksize);
365         thread->zc.buf_out = fio_memalign(blocksize, sz, false);
366 }
367
368 static void *thread_fn(void *data)
369 {
370         struct worker_thread *thread = data;
371         void *buf;
372
373         buf = fio_memalign(blocksize, chunk_size, false);
374         thread_init_zlib_control(thread);
375
376         do {
377                 if (get_work(&thread->cur_offset, &thread->size)) {
378                         thread->err = 1;
379                         break;
380                 }
381                 if (do_work(thread, buf)) {
382                         thread->err = 1;
383                         break;
384                 }
385         } while (1);
386
387         thread->done = 1;
388         fio_memfree(buf, chunk_size, false);
389         return NULL;
390 }
391
392 static void show_progress(struct worker_thread *threads, unsigned long total)
393 {
394         unsigned long last_nitems = 0;
395         struct timespec last_tv;
396
397         fio_gettime(&last_tv, NULL);
398
399         while (print_progress) {
400                 unsigned long this_items;
401                 unsigned long nitems = 0;
402                 uint64_t tdiff;
403                 float perc;
404                 int some_done = 0;
405                 int i;
406
407                 for (i = 0; i < num_threads; i++) {
408                         nitems += threads[i].items;
409                         some_done = threads[i].done;
410                         if (some_done)
411                                 break;
412                 }
413
414                 if (some_done)
415                         break;
416
417                 perc = (float) nitems / (float) total;
418                 perc *= 100.0;
419                 this_items = nitems - last_nitems;
420                 this_items *= blocksize;
421                 tdiff = mtime_since_now(&last_tv);
422                 if (tdiff) {
423                         this_items = (this_items * 1000) / (tdiff * 1024);
424                         printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
425                         last_nitems = nitems;
426                         fio_gettime(&last_tv, NULL);
427                 } else {
428                         printf("%3.2f%% done\r", perc);
429                 }
430                 fflush(stdout);
431                 usleep(250000);
432         };
433 }
434
435 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
436                               uint64_t *nextents, uint64_t *nchunks,
437                               uint64_t *unique_capacity)
438 {
439         struct worker_thread *threads;
440         unsigned long nitems, total_items;
441         int i, err = 0;
442
443         total_size = dev_size;
444         total_items = dev_size / blocksize;
445         cur_offset = 0;
446         size_lock = fio_sem_init(FIO_SEM_UNLOCKED);
447
448         threads = malloc(num_threads * sizeof(struct worker_thread));
449         for (i = 0; i < num_threads; i++) {
450                 memset(&threads[i], 0, sizeof(struct worker_thread));
451                 threads[i].fd = f->fd;
452
453                 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
454                 if (err) {
455                         log_err("fio: thread startup failed\n");
456                         break;
457                 }
458         }
459
460         show_progress(threads, total_items);
461
462         nitems = 0;
463         *nextents = 0;
464         *nchunks = 1;
465         *unique_capacity = 0;
466         for (i = 0; i < num_threads; i++) {
467                 void *ret;
468                 pthread_join(threads[i].thread, &ret);
469                 nitems += threads[i].items;
470                 *nchunks += threads[i].dupes;
471                 *unique_capacity += threads[i].unique_capacity;
472         }
473
474         printf("Threads(%u): %lu items processed\n", num_threads, nitems);
475
476         *nextents = nitems;
477         *nchunks = nitems - *nchunks;
478
479         fio_sem_remove(size_lock);
480         free(threads);
481         return err;
482 }
483
484 static int dedupe_check(const char *filename, uint64_t *nextents,
485                         uint64_t *nchunks, uint64_t *unique_capacity)
486 {
487         uint64_t dev_size;
488         struct stat sb;
489         int flags;
490
491         flags = O_RDONLY;
492         if (odirect)
493                 flags |= OS_O_DIRECT;
494
495         memset(&file, 0, sizeof(file));
496         file.file_name = strdup(filename);
497
498         file.fd = open(filename, flags);
499         if (file.fd == -1) {
500                 perror("open");
501                 goto err;
502         }
503
504         if (fstat(file.fd, &sb) < 0) {
505                 perror("fstat");
506                 goto err;
507         }
508
509         dev_size = get_size(&file, &sb);
510         if (!dev_size)
511                 goto err;
512
513         if (use_bloom) {
514                 uint64_t bloom_entries;
515
516                 bloom_entries = 8 * (dev_size / blocksize);
517                 bloom = bloom_new(bloom_entries);
518         }
519
520         printf("Will check <%s>, size <%llu>, using %u threads\n", filename,
521                                 (unsigned long long) dev_size, num_threads);
522
523         return run_dedupe_threads(&file, dev_size, nextents, nchunks,
524                                         unique_capacity);
525 err:
526         if (file.fd != -1)
527                 close(file.fd);
528         free(file.file_name);
529         return 1;
530 }
531
532 static void show_chunk(struct chunk *c)
533 {
534         struct flist_head *n;
535         struct extent *e;
536
537         printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1],
538                         c->hash[2], c->hash[3], (unsigned long) c->count);
539         flist_for_each(n, &c->extent_list[0]) {
540                 e = flist_entry(n, struct extent, list);
541                 printf("\toffset %llu\n", (unsigned long long) e->offset);
542         }
543 }
544
/* Unit suffixes for successive powers of 1024, indexed by the exponent. */
static const char *capacity_unit[] = {"b","KB", "MB", "GB", "TB", "PB", "EB"};

/*
 * Divide @n by 1024 until it is below 1024, and return the truncated
 * result. *@unit_out receives the matching suffix. uint64_t needs at most
 * six divisions, which stays within capacity_unit[].
 */
static uint64_t bytes_to_human_readable_unit(uint64_t n, const char **unit_out)
{
	unsigned int exp;

	for (exp = 0; n >= 1024; exp++)
		n >>= 10;

	*unit_out = capacity_unit[exp];
	return n;
}
559
560 static void show_stat(uint64_t nextents, uint64_t nchunks, uint64_t ndupextents,
561                       uint64_t unique_capacity)
562 {
563         double perc, ratio;
564         const char *unit;
565         uint64_t uc_human;
566
567         printf("Extents=%lu, Unique extents=%lu", (unsigned long) nextents,
568                                                 (unsigned long) nchunks);
569         if (!bloom)
570                 printf(" Duplicated extents=%lu", (unsigned long) ndupextents);
571         printf("\n");
572
573         if (nchunks) {
574                 ratio = (double) nextents / (double) nchunks;
575                 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
576         } else {
577                 printf("De-dupe ratio: 1:infinite\n");
578         }
579
580         if (ndupextents) {
581                 printf("De-dupe working set at least: %3.2f%%\n",
582                         100.0 * (double) ndupextents / (double) nextents);
583         }
584
585         perc = 1.00 - ((double) nchunks / (double) nextents);
586         perc *= 100.0;
587         printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
588
589
590         if (compression) {
591                 uc_human = bytes_to_human_readable_unit(unique_capacity, &unit);
592                 printf("Unique capacity %lu%s\n", (unsigned long) uc_human, unit);
593         }
594 }
595
596 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks, uint64_t *ndupextents)
597 {
598         struct fio_rb_node *n;
599         *nchunks = *nextents = *ndupextents = 0;
600
601         n = rb_first(&rb_root);
602         if (!n)
603                 return;
604
605         do {
606                 struct chunk *c;
607
608                 c = rb_entry(n, struct chunk, rb_node);
609                 (*nchunks)++;
610                 *nextents += c->count;
611                 *ndupextents += (c->count > 1);
612
613                 if (dump_output)
614                         show_chunk(c);
615
616         } while ((n = rb_next(n)) != NULL);
617 }
618
/*
 * Print command line help to the error log and return 1 (used as the exit
 * status). Every option takes a value, as required by main()'s getopt
 * string.
 */
static int usage(char *argv[])
{
	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", argv[0]);
	log_err("\t-b\tBlock (extent) size in bytes, power of 2 (default 4096)\n");
	log_err("\t-t\tNumber of threads to use (default: online CPUs)\n");
	log_err("\t-d\tFull extent/chunk debug output (0/1)\n");
	log_err("\t-o\tUse O_DIRECT (0/1)\n");
	log_err("\t-c\tFull collision check (0/1)\n");
	log_err("\t-B\tUse probabilistic bloom filter (0/1, default 1; off with -c, -d or -C)\n");
	log_err("\t-p\tPrint progress indicator (0/1, default 1)\n");
	log_err("\t-C\tCalculate compressible size (0/1)\n");
	return 1;
}
633
634 int main(int argc, char *argv[])
635 {
636         uint64_t nextents = 0, nchunks = 0, ndupextents = 0, unique_capacity;
637         int c, ret;
638
639         arch_init(argv);
640         debug_init();
641
642         while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:C:")) != -1) {
643                 switch (c) {
644                 case 'b':
645                         blocksize = atoi(optarg);
646                         break;
647                 case 't':
648                         num_threads = atoi(optarg);
649                         break;
650                 case 'd':
651                         dump_output = atoi(optarg);
652                         break;
653                 case 'o':
654                         odirect = atoi(optarg);
655                         break;
656                 case 'c':
657                         collision_check = atoi(optarg);
658                         break;
659                 case 'p':
660                         print_progress = atoi(optarg);
661                         break;
662                 case 'B':
663                         use_bloom = atoi(optarg);
664                         break;
665                 case 'C':
666                         compression = atoi(optarg);
667                         break;
668                 case '?':
669                 default:
670                         return usage(argv);
671                 }
672         }
673
674         if (collision_check || dump_output || compression)
675                 use_bloom = 0;
676
677         if (!num_threads)
678                 num_threads = cpus_online();
679
680         if (argc == optind)
681                 return usage(argv);
682
683         sinit();
684
685         rb_root = RB_ROOT;
686         rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);
687
688         ret = dedupe_check(argv[optind], &nextents, &nchunks, &unique_capacity);
689
690         if (!ret) {
691                 if (!bloom)
692                         iter_rb_tree(&nextents, &nchunks, &ndupextents);
693
694                 show_stat(nextents, nchunks, ndupextents, unique_capacity);
695         }
696
697         fio_sem_remove(rb_lock);
698         if (bloom)
699                 bloom_free(bloom);
700         scleanup();
701         return ret;
702 }