Include engines in dependency check
[fio.git] / io_u.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <string.h>
4 #include <signal.h>
5 #include <time.h>
6 #include <assert.h>
7
8 #include "fio.h"
9 #include "os.h"
10
11 struct io_completion_data {
12         int nr;                         /* input */
13         endio_handler *handler;         /* input */
14
15         int error;                      /* output */
16         unsigned long bytes_done[2];    /* output */
17         struct timeval time;            /* output */
18 };
19
20 /*
21  * The ->file_map[] contains a map of blocks we have or have not done io
22  * to yet. Used to make sure we cover the entire range in a fair fashion.
23  */
24 static int random_map_free(struct thread_data *td, struct fio_file *f,
25                            unsigned long long block)
26 {
27         unsigned int idx = RAND_MAP_IDX(td, f, block);
28         unsigned int bit = RAND_MAP_BIT(td, f, block);
29
30         return (f->file_map[idx] & (1UL << bit)) == 0;
31 }
32
33 /*
34  * Mark a given offset as used in the map.
35  */
36 static void mark_random_map(struct thread_data *td, struct fio_file *f,
37                             struct io_u *io_u)
38 {
39         unsigned int min_bs = td->rw_min_bs;
40         unsigned long long block;
41         unsigned int blocks;
42         unsigned int nr_blocks;
43
44         block = io_u->offset / (unsigned long long) min_bs;
45         blocks = 0;
46         nr_blocks = (io_u->buflen + min_bs - 1) / min_bs;
47
48         while (blocks < nr_blocks) {
49                 unsigned int idx, bit;
50
51                 if (!random_map_free(td, f, block))
52                         break;
53
54                 idx = RAND_MAP_IDX(td, f, block);
55                 bit = RAND_MAP_BIT(td, f, block);
56
57                 fio_assert(td, idx < f->num_maps);
58
59                 f->file_map[idx] |= (1UL << bit);
60                 block++;
61                 blocks++;
62         }
63
64         if ((blocks * min_bs) < io_u->buflen)
65                 io_u->buflen = blocks * min_bs;
66 }
67
68 /*
69  * Return the next free block in the map.
70  */
71 static int get_next_free_block(struct thread_data *td, struct fio_file *f,
72                                unsigned long long *b)
73 {
74         int i;
75
76         i = f->last_free_lookup;
77         *b = (i * BLOCKS_PER_MAP);
78         while ((*b) * td->rw_min_bs < f->real_file_size) {
79                 if (f->file_map[i] != -1UL) {
80                         *b += ffz(f->file_map[i]);
81                         f->last_free_lookup = i;
82                         return 0;
83                 }
84
85                 *b += BLOCKS_PER_MAP;
86                 i++;
87         }
88
89         return 1;
90 }
91
92 /*
93  * For random io, generate a random new block and see if it's used. Repeat
94  * until we find a free one. For sequential io, just return the end of
95  * the last io issued.
96  */
97 static int get_next_offset(struct thread_data *td, struct fio_file *f,
98                            struct io_u *io_u)
99 {
100         const int ddir = io_u->ddir;
101         unsigned long long b, rb;
102         long r;
103
104         if (!td->sequential) {
105                 unsigned long long max_blocks = f->file_size / td->min_bs[ddir];
106                 int loops = 5;
107
108                 do {
109                         r = os_random_long(&td->random_state);
110                         b = ((max_blocks - 1) * r / (unsigned long long) (RAND_MAX+1.0));
111                         if (td->norandommap)
112                                 break;
113                         rb = b + (f->file_offset / td->min_bs[ddir]);
114                         loops--;
115                 } while (!random_map_free(td, f, rb) && loops);
116
117                 /*
118                  * if we failed to retrieve a truly random offset within
119                  * the loops assigned, see if there are free ones left at all
120                  */
121                 if (!loops && get_next_free_block(td, f, &b))
122                         return 1;
123         } else
124                 b = f->last_pos / td->min_bs[ddir];
125
126         io_u->offset = (b * td->min_bs[ddir]) + f->file_offset;
127         if (io_u->offset >= f->real_file_size)
128                 return 1;
129
130         return 0;
131 }
132
133 static unsigned int get_next_buflen(struct thread_data *td, struct fio_file *f,
134                                     struct io_u *io_u)
135 {
136         const int ddir = io_u->ddir;
137         unsigned int buflen;
138         long r;
139
140         if (td->min_bs[ddir] == td->max_bs[ddir])
141                 buflen = td->min_bs[ddir];
142         else {
143                 r = os_random_long(&td->bsrange_state);
144                 buflen = (unsigned int) (1 + (double) (td->max_bs[ddir] - 1) * r / (RAND_MAX + 1.0));
145                 if (!td->bs_unaligned)
146                         buflen = (buflen + td->min_bs[ddir] - 1) & ~(td->min_bs[ddir] - 1);
147         }
148
149         while (buflen + io_u->offset > f->real_file_size) {
150                 if (buflen == td->min_bs[ddir])
151                         return 0;
152
153                 buflen = td->min_bs[ddir];
154         }
155
156         return buflen;
157 }
158
159 /*
160  * Return the data direction for the next io_u. If the job is a
161  * mixed read/write workload, check the rwmix cycle and switch if
162  * necessary.
163  */
164 static enum fio_ddir get_rw_ddir(struct thread_data *td)
165 {
166         if (td_rw(td)) {
167                 struct timeval now;
168                 unsigned long elapsed;
169
170                 fio_gettime(&now, NULL);
171                 elapsed = mtime_since_now(&td->rwmix_switch);
172
173                 /*
174                  * Check if it's time to seed a new data direction.
175                  */
176                 if (elapsed >= td->rwmixcycle) {
177                         unsigned int v;
178                         long r;
179
180                         r = os_random_long(&td->rwmix_state);
181                         v = 1 + (int) (100.0 * (r / (RAND_MAX + 1.0)));
182                         if (v < td->rwmixread)
183                                 td->rwmix_ddir = DDIR_READ;
184                         else
185                                 td->rwmix_ddir = DDIR_WRITE;
186                         memcpy(&td->rwmix_switch, &now, sizeof(now));
187                 }
188                 return td->rwmix_ddir;
189         } else if (td_read(td))
190                 return DDIR_READ;
191         else
192                 return DDIR_WRITE;
193 }
194
195 void put_io_u(struct thread_data *td, struct io_u *io_u)
196 {
197         assert((io_u->flags & IO_U_F_FREE) == 0);
198         io_u->flags |= IO_U_F_FREE;
199
200         io_u->file = NULL;
201         list_del(&io_u->list);
202         list_add(&io_u->list, &td->io_u_freelist);
203         td->cur_depth--;
204 }
205
206 void requeue_io_u(struct thread_data *td, struct io_u **io_u)
207 {
208         struct io_u *__io_u = *io_u;
209
210         list_del(&__io_u->list);
211         list_add_tail(&__io_u->list, &td->io_u_requeues);
212         td->cur_depth--;
213         *io_u = NULL;
214 }
215
216 static int fill_io_u(struct thread_data *td, struct fio_file *f,
217                      struct io_u *io_u)
218 {
219         /*
220          * If using an iolog, grab next piece if any available.
221          */
222         if (td->read_iolog)
223                 return read_iolog_get(td, io_u);
224
225         /*
226          * see if it's time to sync
227          */
228         if (td->fsync_blocks && !(td->io_issues[DDIR_WRITE] % td->fsync_blocks)
229             && td->io_issues[DDIR_WRITE] && should_fsync(td)) {
230                 io_u->ddir = DDIR_SYNC;
231                 io_u->file = f;
232                 return 0;
233         }
234
235         io_u->ddir = get_rw_ddir(td);
236
237         /*
238          * No log, let the seq/rand engine retrieve the next buflen and
239          * position.
240          */
241         if (get_next_offset(td, f, io_u))
242                 return 1;
243
244         io_u->buflen = get_next_buflen(td, f, io_u);
245         if (!io_u->buflen)
246                 return 1;
247
248         /*
249          * mark entry before potentially trimming io_u
250          */
251         if (!td->read_iolog && !td->sequential && !td->norandommap)
252                 mark_random_map(td, f, io_u);
253
254         /*
255          * If using a write iolog, store this entry.
256          */
257         if (td->write_iolog_file)
258                 write_iolog_put(td, io_u);
259
260         io_u->file = f;
261         return 0;
262 }
263
264 static void io_u_mark_depth(struct thread_data *td)
265 {
266         int index = 0;
267
268         switch (td->cur_depth) {
269         default:
270                 index++;
271         case 32 ... 63:
272                 index++;
273         case 16 ... 31:
274                 index++;
275         case 8 ... 15:
276                 index++;
277         case 4 ... 7:
278                 index++;
279         case 2 ... 3:
280                 index++;
281         case 1:
282                 break;
283         }
284
285         td->io_u_map[index]++;
286         td->total_io_u++;
287 }
288
289 static void io_u_mark_latency(struct thread_data *td, unsigned long msec)
290 {
291         int index = 0;
292
293         switch (msec) {
294         default:
295                 index++;
296         case 1000 ... 1999:
297                 index++;
298         case 750 ... 999:
299                 index++;
300         case 500 ... 749:
301                 index++;
302         case 250 ... 499:
303                 index++;
304         case 100 ... 249:
305                 index++;
306         case 50 ... 99:
307                 index++;
308         case 20 ... 49:
309                 index++;
310         case 10 ... 19:
311                 index++;
312         case 4 ... 9:
313                 index++;
314         case 2 ... 3:
315                 index++;
316         case 0 ... 1:
317                 break;
318         }
319
320         td->io_u_lat[index]++;
321 }
322
323 static struct fio_file *get_next_file(struct thread_data *td)
324 {
325         unsigned int old_next_file = td->next_file;
326         struct fio_file *f;
327
328         do {
329                 f = &td->files[td->next_file];
330
331                 td->next_file++;
332                 if (td->next_file >= td->nr_files)
333                         td->next_file = 0;
334
335                 if (f->fd != -1)
336                         break;
337
338                 f = NULL;
339         } while (td->next_file != old_next_file);
340
341         return f;
342 }
343
344 struct io_u *__get_io_u(struct thread_data *td)
345 {
346         struct io_u *io_u = NULL;
347
348         if (!list_empty(&td->io_u_requeues))
349                 io_u = list_entry(td->io_u_requeues.next, struct io_u, list);
350         else if (!queue_full(td)) {
351                 io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
352
353                 io_u->buflen = 0;
354                 io_u->resid = 0;
355                 io_u->file = NULL;
356         }
357
358         if (io_u) {
359                 assert(io_u->flags & IO_U_F_FREE);
360                 io_u->flags &= ~IO_U_F_FREE;
361
362                 io_u->error = 0;
363                 list_del(&io_u->list);
364                 list_add(&io_u->list, &td->io_u_busylist);
365                 td->cur_depth++;
366                 io_u_mark_depth(td);
367         }
368
369         return io_u;
370 }
371
372 /*
373  * Return an io_u to be processed. Gets a buflen and offset, sets direction,
374  * etc. The returned io_u is fully ready to be prepped and submitted.
375  */
376 struct io_u *get_io_u(struct thread_data *td)
377 {
378         struct fio_file *f;
379         struct io_u *io_u;
380
381         io_u = __get_io_u(td);
382         if (!io_u)
383                 return NULL;
384
385         /*
386          * from a requeue, io_u already setup
387          */
388         if (io_u->file)
389                 goto out;
390
391         f = get_next_file(td);
392         if (!f) {
393                 put_io_u(td, io_u);
394                 return NULL;
395         }
396
397         io_u->file = f;
398
399         if (td->zone_bytes >= td->zone_size) {
400                 td->zone_bytes = 0;
401                 f->last_pos += td->zone_skip;
402         }
403
404         if (fill_io_u(td, f, io_u)) {
405                 put_io_u(td, io_u);
406                 return NULL;
407         }
408
409         if (io_u->buflen + io_u->offset > f->real_file_size) {
410                 if (td->io_ops->flags & FIO_RAWIO) {
411                         put_io_u(td, io_u);
412                         return NULL;
413                 }
414
415                 io_u->buflen = f->real_file_size - io_u->offset;
416         }
417
418         if (io_u->ddir != DDIR_SYNC) {
419                 if (!io_u->buflen) {
420                         put_io_u(td, io_u);
421                         return NULL;
422                 }
423
424                 f->last_pos = io_u->offset + io_u->buflen;
425
426                 if (td->verify != VERIFY_NONE)
427                         populate_verify_io_u(td, io_u);
428         }
429
430         /*
431          * Set io data pointers.
432          */
433 out:
434         io_u->xfer_buf = io_u->buf;
435         io_u->xfer_buflen = io_u->buflen;
436
437         if (td_io_prep(td, io_u)) {
438                 put_io_u(td, io_u);
439                 return NULL;
440         }
441
442         fio_gettime(&io_u->start_time, NULL);
443         return io_u;
444 }
445
446 static void io_completed(struct thread_data *td, struct io_u *io_u,
447                          struct io_completion_data *icd)
448 {
449         unsigned long msec;
450
451         assert(io_u->flags & IO_U_F_FLIGHT);
452         io_u->flags &= ~IO_U_F_FLIGHT;
453
454         if (io_u->ddir == DDIR_SYNC) {
455                 td->last_was_sync = 1;
456                 return;
457         }
458
459         td->last_was_sync = 0;
460
461         if (!io_u->error) {
462                 unsigned int bytes = io_u->buflen - io_u->resid;
463                 const enum fio_ddir idx = io_u->ddir;
464                 int ret;
465
466                 td->io_blocks[idx]++;
467                 td->io_bytes[idx] += bytes;
468                 td->zone_bytes += bytes;
469                 td->this_io_bytes[idx] += bytes;
470
471                 io_u->file->last_completed_pos = io_u->offset + io_u->buflen;
472
473                 msec = mtime_since(&io_u->issue_time, &icd->time);
474
475                 add_clat_sample(td, idx, msec);
476                 add_bw_sample(td, idx, &icd->time);
477                 io_u_mark_latency(td, msec);
478
479                 if ((td_rw(td) || td_write(td)) && idx == DDIR_WRITE)
480                         log_io_piece(td, io_u);
481
482                 icd->bytes_done[idx] += bytes;
483
484                 if (icd->handler) {
485                         ret = icd->handler(io_u);
486                         if (ret && !icd->error)
487                                 icd->error = ret;
488                 }
489         } else
490                 icd->error = io_u->error;
491 }
492
493 static void init_icd(struct io_completion_data *icd, endio_handler *handler,
494                      int nr)
495 {
496         fio_gettime(&icd->time, NULL);
497
498         icd->handler = handler;
499         icd->nr = nr;
500
501         icd->error = 0;
502         icd->bytes_done[0] = icd->bytes_done[1] = 0;
503 }
504
505 static void ios_completed(struct thread_data *td,
506                           struct io_completion_data *icd)
507 {
508         struct io_u *io_u;
509         int i;
510
511         for (i = 0; i < icd->nr; i++) {
512                 io_u = td->io_ops->event(td, i);
513
514                 io_completed(td, io_u, icd);
515                 put_io_u(td, io_u);
516         }
517 }
518
519 /*
520  * Complete a single io_u for the sync engines.
521  */
522 long io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
523                         endio_handler *handler)
524 {
525         struct io_completion_data icd;
526
527         init_icd(&icd, handler, 1);
528         io_completed(td, io_u, &icd);
529         put_io_u(td, io_u);
530
531         if (!icd.error)
532                 return icd.bytes_done[0] + icd.bytes_done[1];
533
534         return -1;
535 }
536
537 /*
538  * Called to complete min_events number of io for the async engines.
539  */
540 long io_u_queued_complete(struct thread_data *td, int min_events,
541                           endio_handler *handler)
542
543 {
544         struct io_completion_data icd;
545         struct timespec *tvp = NULL;
546         int ret;
547
548         if (min_events > 0) {
549                 ret = td_io_commit(td);
550                 if (ret < 0) {
551                         td_verror(td, -ret);
552                         return ret;
553                 }
554         } else {
555                 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };
556
557                 tvp = &ts;
558         }
559
560         ret = td_io_getevents(td, min_events, td->cur_depth, tvp);
561         if (ret < 0) {
562                 td_verror(td, -ret);
563                 return ret;
564         } else if (!ret)
565                 return ret;
566
567         init_icd(&icd, handler, ret);
568         ios_completed(td, &icd);
569         if (!icd.error)
570                 return icd.bytes_done[0] + icd.bytes_done[1];
571
572         return -1;
573 }
574
575 /*
576  * Call when io_u is really queued, to update the submission latency.
577  */
578 void io_u_queued(struct thread_data *td, struct io_u *io_u)
579 {
580         unsigned long slat_time;
581
582         slat_time = mtime_since(&io_u->start_time, &io_u->issue_time);
583         add_slat_sample(td, io_u->ddir, slat_time);
584 }
585
586 void io_u_set_timeout(struct thread_data *td)
587 {
588         assert(td->cur_depth);
589
590         td->timer.it_interval.tv_sec = 0;
591         td->timer.it_interval.tv_usec = 0;
592         td->timer.it_value.tv_sec = IO_U_TIMEOUT + IO_U_TIMEOUT_INC;
593         td->timer.it_value.tv_usec = 0;
594         setitimer(ITIMER_REAL, &td->timer, NULL);
595         fio_gettime(&td->timeout_end, NULL);
596 }
597
598 static void io_u_timeout_handler(int fio_unused sig)
599 {
600         struct thread_data *td, *__td;
601         pid_t pid = getpid();
602         int i;
603
604         log_err("fio: io_u timeout\n");
605
606         /*
607          * TLS would be nice...
608          */
609         td = NULL;
610         for_each_td(__td, i) {
611                 if (__td->pid == pid) {
612                         td = __td;
613                         break;
614                 }
615         }
616
617         if (!td) {
618                 log_err("fio: io_u timeout, can't find job\n");
619                 exit(1);
620         }
621
622         if (!td->cur_depth) {
623                 log_err("fio: timeout without pending work?\n");
624                 return;
625         }
626
627         log_err("fio: io_u timeout: job=%s, pid=%d\n", td->name, td->pid);
628         td->error = ETIMEDOUT;
629         exit(1);
630 }
631
632 void io_u_init_timeout(void)
633 {
634         signal(SIGALRM, io_u_timeout_handler);
635 }