Fix wrong index bug
[fio.git] / backend.c
1 /*
2  * fio - the flexible io tester
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6  *
7  * The license below covers all files distributed with fio unless otherwise
8  * noted in the file itself.
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License version 2 as
12  *  published by the Free Software Foundation.
13  *
14  *  This program is distributed in the hope that it will be useful,
15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *  GNU General Public License for more details.
18  *
19  *  You should have received a copy of the GNU General Public License
20  *  along with this program; if not, write to the Free Software
21  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  *
23  */
24 #include <unistd.h>
25 #include <fcntl.h>
26 #include <string.h>
27 #include <limits.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <locale.h>
31 #include <assert.h>
32 #include <time.h>
33 #include <inttypes.h>
34 #include <sys/stat.h>
35 #include <sys/wait.h>
36 #include <sys/ipc.h>
37 #include <sys/mman.h>
38
39 #include "fio.h"
40 #ifndef FIO_NO_HAVE_SHM_H
41 #include <sys/shm.h>
42 #endif
43 #include "hash.h"
44 #include "smalloc.h"
45 #include "verify.h"
46 #include "trim.h"
47 #include "diskutil.h"
48 #include "cgroup.h"
49 #include "profile.h"
50 #include "lib/rand.h"
51 #include "memalign.h"
52 #include "server.h"
53 #include "lib/getrusage.h"
54 #include "idletime.h"
55 #include "err.h"
56 #include "lib/tp.h"
57
58 static pthread_t helper_thread;
59 static pthread_mutex_t helper_lock;
60 pthread_cond_t helper_cond;
61 int helper_do_stat = 0;
62
63 static struct fio_mutex *startup_mutex;
64 static struct flist_head *cgroup_list;
65 static char *cgroup_mnt;
66 static int exit_value;
67 static volatile int fio_abort;
68 static unsigned int nr_process = 0;
69 static unsigned int nr_thread = 0;
70
71 struct io_log *agg_io_log[DDIR_RWDIR_CNT];
72
73 int groupid = 0;
74 unsigned int thread_number = 0;
75 unsigned int stat_number = 0;
76 int shm_id = 0;
77 int temp_stall_ts;
78 unsigned long done_secs = 0;
79 volatile int helper_exit = 0;
80
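/*
 * Round a buffer address up to the next page boundary. For example, with
 * 4096-byte pages (page_mask == 0xfff), an address of 0x1234 aligns to 0x2000.
 */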
81 #define PAGE_ALIGN(buf) \
82         (char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
83
84 #define JOB_START_TIMEOUT       (5 * 1000)
85
86 static void sig_int(int sig)
87 {
88         if (threads) {
89                 if (is_backend)
90                         fio_server_got_signal(sig);
91                 else {
92                         log_info("\nfio: terminating on signal %d\n", sig);
93                         log_info_flush();
94                         exit_value = 128;
95                 }
96
97                 fio_terminate_threads(TERMINATE_ALL);
98         }
99 }
100
101 static void sig_show_status(int sig)
102 {
103         show_running_run_stats();
104 }
105
106 static void set_sig_handlers(void)
107 {
108         struct sigaction act;
109
110         memset(&act, 0, sizeof(act));
111         act.sa_handler = sig_int;
112         act.sa_flags = SA_RESTART;
113         sigaction(SIGINT, &act, NULL);
114
115         memset(&act, 0, sizeof(act));
116         act.sa_handler = sig_int;
117         act.sa_flags = SA_RESTART;
118         sigaction(SIGTERM, &act, NULL);
119
120 /* Windows uses SIGBREAK as a quit signal from other applications */
121 #ifdef WIN32
122         memset(&act, 0, sizeof(act));
123         act.sa_handler = sig_int;
124         act.sa_flags = SA_RESTART;
125         sigaction(SIGBREAK, &act, NULL);
126 #endif
127
128         memset(&act, 0, sizeof(act));
129         act.sa_handler = sig_show_status;
130         act.sa_flags = SA_RESTART;
131         sigaction(SIGUSR1, &act, NULL);
132
133         if (is_backend) {
134                 memset(&act, 0, sizeof(act));
135                 act.sa_handler = sig_int;
136                 act.sa_flags = SA_RESTART;
137                 sigaction(SIGPIPE, &act, NULL);
138         }
139 }
140
141 /*
142  * Check if we are above the minimum rate given.
143  */
144 static int __check_min_rate(struct thread_data *td, struct timeval *now,
145                             enum fio_ddir ddir)
146 {
147         unsigned long long bytes = 0;
148         unsigned long iops = 0;
149         unsigned long spent;
150         unsigned long rate;
151         unsigned int ratemin = 0;
152         unsigned int rate_iops = 0;
153         unsigned int rate_iops_min = 0;
154
155         assert(ddir_rw(ddir));
156
157         if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
158                 return 0;
159
160         /*
161          * allow a 2 second settle period in the beginning
162          */
163         if (mtime_since(&td->start, now) < 2000)
164                 return 0;
165
166         iops += td->this_io_blocks[ddir];
167         bytes += td->this_io_bytes[ddir];
168         ratemin += td->o.ratemin[ddir];
169         rate_iops += td->o.rate_iops[ddir];
170         rate_iops_min += td->o.rate_iops_min[ddir];
171
172         /*
173          * if rate blocks is set, sample is running
174          */
175         if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
176                 spent = mtime_since(&td->lastrate[ddir], now);
177                 if (spent < td->o.ratecycle)
178                         return 0;
179
180                 if (td->o.rate[ddir]) {
181                         /*
182                          * check the specified bandwidth rate
183                          */
184                         if (bytes < td->rate_bytes[ddir]) {
185                                 log_err("%s: min rate %u not met\n", td->o.name,
186                                                                 ratemin);
187                                 return 1;
188                         } else {
189                                 if (spent)
190                                         rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
191                                 else
192                                         rate = 0;
193
194                                 if (rate < ratemin ||
195                                     bytes < td->rate_bytes[ddir]) {
196                                         log_err("%s: min rate %u not met, got"
197                                                 " %luKB/sec\n", td->o.name,
198                                                         ratemin, rate);
199                                         return 1;
200                                 }
201                         }
202                 } else {
203                         /*
204                          * check the specified iops rate
205                          */
206                         if (iops < rate_iops) {
207                                 log_err("%s: min iops rate %u not met\n",
208                                                 td->o.name, rate_iops);
209                                 return 1;
210                         } else {
211                                 if (spent)
212                                         rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
213                                 else
214                                         rate = 0;
215
216                                 if (rate < rate_iops_min ||
217                                     iops < td->rate_blocks[ddir]) {
218                                         log_err("%s: min iops rate %u not met,"
219                                                 " got %lu\n", td->o.name,
220                                                         rate_iops_min, rate);
                                            return 1;
221                                 }
222                         }
223                 }
224         }
225
226         td->rate_bytes[ddir] = bytes;
227         td->rate_blocks[ddir] = iops;
228         memcpy(&td->lastrate[ddir], now, sizeof(*now));
229         return 0;
230 }
231
232 static int check_min_rate(struct thread_data *td, struct timeval *now,
233                           uint64_t *bytes_done)
234 {
235         int ret = 0;
236
237         if (bytes_done[DDIR_READ])
238                 ret |= __check_min_rate(td, now, DDIR_READ);
239         if (bytes_done[DDIR_WRITE])
240                 ret |= __check_min_rate(td, now, DDIR_WRITE);
241         if (bytes_done[DDIR_TRIM])
242                 ret |= __check_min_rate(td, now, DDIR_TRIM);
243
244         return ret;
245 }
246
247 /*
248  * When job exits, we can cancel the in-flight IO if we are using async
249  * io. Attempt to do so.
250  */
251 static void cleanup_pending_aio(struct thread_data *td)
252 {
253         int r;
254
255         /*
256          * get immediately available events, if any
257          */
258         r = io_u_queued_complete(td, 0, NULL);
259         if (r < 0)
260                 return;
261
262         /*
263          * now cancel remaining active events
264          */
265         if (td->io_ops->cancel) {
266                 struct io_u *io_u;
267                 int i;
268
269                 io_u_qiter(&td->io_u_all, io_u, i) {
270                         if (io_u->flags & IO_U_F_FLIGHT) {
271                                 r = td->io_ops->cancel(td, io_u);
272                                 if (!r)
273                                         put_io_u(td, io_u);
274                         }
275                 }
276         }
277
278         if (td->cur_depth)
279                 r = io_u_queued_complete(td, td->cur_depth, NULL);
280 }
281
282 /*
283  * Helper to handle the final sync of a file. Works just like the normal
284  * io path, just does everything sync.
285  */
286 static int fio_io_sync(struct thread_data *td, struct fio_file *f)
287 {
288         struct io_u *io_u = __get_io_u(td);
289         int ret;
290
291         if (!io_u)
292                 return 1;
293
294         io_u->ddir = DDIR_SYNC;
295         io_u->file = f;
296
297         if (td_io_prep(td, io_u)) {
298                 put_io_u(td, io_u);
299                 return 1;
300         }
301
302 requeue:
303         ret = td_io_queue(td, io_u);
304         if (ret < 0) {
305                 td_verror(td, io_u->error, "td_io_queue");
306                 put_io_u(td, io_u);
307                 return 1;
308         } else if (ret == FIO_Q_QUEUED) {
309                 if (io_u_queued_complete(td, 1, NULL) < 0)
310                         return 1;
311         } else if (ret == FIO_Q_COMPLETED) {
312                 if (io_u->error) {
313                         td_verror(td, io_u->error, "td_io_queue");
314                         return 1;
315                 }
316
317                 if (io_u_sync_complete(td, io_u, NULL) < 0)
318                         return 1;
319         } else if (ret == FIO_Q_BUSY) {
320                 if (td_io_commit(td))
321                         return 1;
322                 goto requeue;
323         }
324
325         return 0;
326 }
327
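/*
 * fsync a file: if it is already open just issue the sync, otherwise
 * open it, sync it, and close it again.
 */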
328 static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
329 {
330         int ret;
331
332         if (fio_file_open(f))
333                 return fio_io_sync(td, f);
334
335         if (td_io_open_file(td, f))
336                 return 1;
337
338         ret = fio_io_sync(td, f);
339         td_io_close_file(td, f);
340         return ret;
341 }
342
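/*
 * Cached-time helpers: __update_tv_cache() reads the clock into td->tv_cache,
 * while update_tv_cache() only refreshes it once every tv_cache_mask + 1
 * calls (e.g. a mask of 15 means one clock read per 16 calls), keeping
 * clock queries off the hot path.
 */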
343 static inline void __update_tv_cache(struct thread_data *td)
344 {
345         fio_gettime(&td->tv_cache, NULL);
346 }
347
348 static inline void update_tv_cache(struct thread_data *td)
349 {
350         if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
351                 __update_tv_cache(td);
352 }
353
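/*
 * Return 1 if the job has been running longer than the configured timeout.
 * Always 0 while still in the ramp period or if no timeout is set.
 */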
354 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
355 {
356         if (in_ramp_time(td))
357                 return 0;
358         if (!td->o.timeout)
359                 return 0;
360         if (utime_since(&td->epoch, t) >= td->o.timeout)
361                 return 1;
362
363         return 0;
364 }
365
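/*
 * Decide whether an I/O error should end the job. Errors covered by
 * continue_on_error are counted and cleared so the job keeps going;
 * ENOSPC with fill_device set is treated as normal completion; anything
 * else is fatal. Returns 1 if the job should stop.
 */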
366 static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
367                                int *retptr)
368 {
369         int ret = *retptr;
370
371         if (ret < 0 || td->error) {
372                 int err = td->error;
373                 enum error_type_bit eb;
374
375                 if (ret < 0)
376                         err = -ret;
377
378                 eb = td_error_type(ddir, err);
379                 if (!(td->o.continue_on_error & (1 << eb)))
380                         return 1;
381
382                 if (td_non_fatal_error(td, eb, err)) {
383                         /*
384                          * Continue with the I/Os in case of
385                          * a non-fatal error.
386                          */
387                         update_error_count(td, err);
388                         td_clear_error(td);
389                         *retptr = 0;
390                         return 0;
391                 } else if (td->o.fill_device && err == ENOSPC) {
392                         /*
393                          * We expect to hit this error if
394                          * fill_device option is set.
395                          */
396                         td_clear_error(td);
397                         fio_mark_td_terminate(td);
398                         return 1;
399                 } else {
400                         /*
401                          * Stop the I/O in case of a fatal
402                          * error.
403                          */
404                         update_error_count(td, err);
405                         return 1;
406                 }
407         }
408
409         return 0;
410 }
411
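/*
 * If the stat code has asked for updated rusage numbers, refresh them
 * here in job context and release rusage_sem so the waiter can continue.
 */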
412 static void check_update_rusage(struct thread_data *td)
413 {
414         if (td->update_rusage) {
415                 td->update_rusage = 0;
416                 update_rusage_stat(td);
417                 fio_mutex_up(td->rusage_sem);
418         }
419 }
420
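/*
 * Reap completed I/Os: wait for at least the batch-complete count (or one
 * event if the queue is full), and keep reaping while the queue stays
 * above iodepth_low. Optionally samples the current time for rate checks.
 */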
421 static int wait_for_completions(struct thread_data *td, struct timeval *time,
422                                 uint64_t *bytes_done)
423 {
424         const int full = queue_full(td);
425         int min_evts = 0;
426         int ret;
427
428         /*
429          * if the queue is full, we MUST reap at least 1 event
430          */
431         min_evts = min(td->o.iodepth_batch_complete, td->cur_depth);
432         if (full && !min_evts)
433                 min_evts = 1;
434
435         if (time && (__should_check_rate(td, DDIR_READ) ||
436             __should_check_rate(td, DDIR_WRITE) ||
437             __should_check_rate(td, DDIR_TRIM)))
438                 fio_gettime(time, NULL);
439
440         do {
441                 ret = io_u_queued_complete(td, min_evts, bytes_done);
442                 if (ret < 0)
443                         break;
444         } while (full && (td->cur_depth > td->o.iodepth_low));
445
446         return ret;
447 }
448
449 /*
450  * The main verify engine. Runs over the writes we previously submitted,
451  * reads the blocks back in, and checks the crc/md5 of the data.
452  */
453 static void do_verify(struct thread_data *td, uint64_t verify_bytes)
454 {
455         uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
456         struct fio_file *f;
457         struct io_u *io_u;
458         int ret, min_events;
459         unsigned int i;
460
461         dprint(FD_VERIFY, "starting loop\n");
462
463         /*
464          * sync io first and invalidate cache, to make sure we really
465          * read from disk.
466          */
467         for_each_file(td, f, i) {
468                 if (!fio_file_open(f))
469                         continue;
470                 if (fio_io_sync(td, f))
471                         break;
472                 if (file_invalidate_cache(td, f))
473                         break;
474         }
475
476         check_update_rusage(td);
477
478         if (td->error)
479                 return;
480
481         td_set_runstate(td, TD_VERIFYING);
482
483         io_u = NULL;
484         while (!td->terminate) {
485                 enum fio_ddir ddir;
486                 int ret2, full;
487
488                 update_tv_cache(td);
489                 check_update_rusage(td);
490
491                 if (runtime_exceeded(td, &td->tv_cache)) {
492                         __update_tv_cache(td);
493                         if (runtime_exceeded(td, &td->tv_cache)) {
494                                 fio_mark_td_terminate(td);
495                                 break;
496                         }
497                 }
498
499                 if (flow_threshold_exceeded(td))
500                         continue;
501
502                 if (!td->o.experimental_verify) {
503                         io_u = __get_io_u(td);
504                         if (!io_u)
505                                 break;
506
507                         if (get_next_verify(td, io_u)) {
508                                 put_io_u(td, io_u);
509                                 break;
510                         }
511
512                         if (td_io_prep(td, io_u)) {
513                                 put_io_u(td, io_u);
514                                 break;
515                         }
516                 } else {
517                         if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
518                                 break;
519
520                         while ((io_u = get_io_u(td)) != NULL) {
521                                 if (IS_ERR(io_u)) {
522                                         io_u = NULL;
523                                         ret = FIO_Q_BUSY;
524                                         goto reap;
525                                 }
526
527                                 /*
528                                  * We are only interested in the places where
529                                  * we wrote or trimmed IOs. Turn those into
530                                  * reads for verification purposes.
531                                  */
532                                 if (io_u->ddir == DDIR_READ) {
533                                         /*
534                                          * Pretend we issued it for rwmix
535                                          * accounting
536                                          */
537                                         td->io_issues[DDIR_READ]++;
538                                         put_io_u(td, io_u);
539                                         continue;
540                                 } else if (io_u->ddir == DDIR_TRIM) {
541                                         io_u->ddir = DDIR_READ;
542                                         io_u->flags |= IO_U_F_TRIMMED;
543                                         break;
544                                 } else if (io_u->ddir == DDIR_WRITE) {
545                                         io_u->ddir = DDIR_READ;
546                                         break;
547                                 } else {
548                                         put_io_u(td, io_u);
549                                         continue;
550                                 }
551                         }
552
553                         if (!io_u)
554                                 break;
555                 }
556
557                 if (verify_state_should_stop(td, io_u)) {
558                         put_io_u(td, io_u);
559                         break;
560                 }
561
562                 if (td->o.verify_async)
563                         io_u->end_io = verify_io_u_async;
564                 else
565                         io_u->end_io = verify_io_u;
566
567                 ddir = io_u->ddir;
568                 if (!td->o.disable_slat)
569                         fio_gettime(&io_u->start_time, NULL);
570
571                 ret = td_io_queue(td, io_u);
572                 switch (ret) {
573                 case FIO_Q_COMPLETED:
574                         if (io_u->error) {
575                                 ret = -io_u->error;
576                                 clear_io_u(td, io_u);
577                         } else if (io_u->resid) {
578                                 int bytes = io_u->xfer_buflen - io_u->resid;
579
580                                 /*
581                                  * zero read, fail
582                                  */
583                                 if (!bytes) {
584                                         td_verror(td, EIO, "full resid");
585                                         put_io_u(td, io_u);
586                                         break;
587                                 }
588
589                                 io_u->xfer_buflen = io_u->resid;
590                                 io_u->xfer_buf += bytes;
591                                 io_u->offset += bytes;
592
593                                 if (ddir_rw(io_u->ddir))
594                                         td->ts.short_io_u[io_u->ddir]++;
595
596                                 f = io_u->file;
597                                 if (io_u->offset == f->real_file_size)
598                                         goto sync_done;
599
600                                 requeue_io_u(td, &io_u);
601                         } else {
602 sync_done:
603                                 ret = io_u_sync_complete(td, io_u, bytes_done);
604                                 if (ret < 0)
605                                         break;
606                         }
607                         continue;
608                 case FIO_Q_QUEUED:
609                         break;
610                 case FIO_Q_BUSY:
611                         requeue_io_u(td, &io_u);
612                         ret2 = td_io_commit(td);
613                         if (ret2 < 0)
614                                 ret = ret2;
615                         break;
616                 default:
617                         assert(ret < 0);
618                         td_verror(td, -ret, "td_io_queue");
619                         break;
620                 }
621
622                 if (break_on_this_error(td, ddir, &ret))
623                         break;
624
625                 /*
626                  * if we can queue more, do so. but check if there are
627                  * completed io_u's first. Note that we can get BUSY even
628                  * without IO queued, if the system is resource starved.
629                  */
630 reap:
631                 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
632                 if (full || !td->o.iodepth_batch_complete)
633                         ret = wait_for_completions(td, NULL, bytes_done);
634
635                 if (ret < 0)
636                         break;
637         }
638
639         check_update_rusage(td);
640
641         if (!td->error) {
642                 min_events = td->cur_depth;
643
644                 if (min_events)
645                         ret = io_u_queued_complete(td, min_events, NULL);
646         } else
647                 cleanup_pending_aio(td);
648
649         td_set_runstate(td, TD_RUNNING);
650
651         dprint(FD_VERIFY, "exiting loop\n");
652 }
653
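/*
 * Check whether the number_ios limit has been reached, counting completed
 * blocks plus anything still queued or in flight, scaled by the loop count.
 */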
654 static unsigned int exceeds_number_ios(struct thread_data *td)
655 {
656         unsigned long long number_ios;
657
658         if (!td->o.number_ios)
659                 return 0;
660
661         number_ios = ddir_rw_sum(td->io_blocks);
662         number_ios += td->io_u_queued + td->io_u_in_flight;
663
664         return number_ios >= (td->o.number_ios * td->loops);
665 }
666
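/*
 * Check whether the bytes issued so far have reached the job's io_limit
 * (or size, if no io_limit is set), scaled by the loop count, or whether
 * the number_ios limit has been hit.
 */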
667 static int io_issue_bytes_exceeded(struct thread_data *td)
668 {
669         unsigned long long bytes, limit;
670
671         if (td_rw(td))
672                 bytes = td->io_issue_bytes[DDIR_READ] + td->io_issue_bytes[DDIR_WRITE];
673         else if (td_write(td))
674                 bytes = td->io_issue_bytes[DDIR_WRITE];
675         else if (td_read(td))
676                 bytes = td->io_issue_bytes[DDIR_READ];
677         else
678                 bytes = td->io_issue_bytes[DDIR_TRIM];
679
680         if (td->o.io_limit)
681                 limit = td->o.io_limit;
682         else
683                 limit = td->o.size;
684
685         limit *= td->loops;
686         return bytes >= limit || exceeds_number_ios(td);
687 }
688
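/*
 * Same check as above, but against completed rather than issued bytes.
 */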
689 static int io_complete_bytes_exceeded(struct thread_data *td)
690 {
691         unsigned long long bytes, limit;
692
693         if (td_rw(td))
694                 bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
695         else if (td_write(td))
696                 bytes = td->this_io_bytes[DDIR_WRITE];
697         else if (td_read(td))
698                 bytes = td->this_io_bytes[DDIR_READ];
699         else
700                 bytes = td->this_io_bytes[DDIR_TRIM];
701
702         if (td->o.io_limit)
703                 limit = td->o.io_limit;
704         else
705                 limit = td->o.size;
706
707         limit *= td->loops;
708         return bytes >= limit || exceeds_number_ios(td);
709 }
710
711 /*
712  * Main IO worker function. It retrieves io_u's to process and queues
713  * and reaps them, checking for rate and errors along the way.
714  *
715  * Returns number of bytes written and trimmed.
716  */
717 static uint64_t do_io(struct thread_data *td)
718 {
719         uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
720         unsigned int i;
721         int ret = 0;
722         uint64_t total_bytes, bytes_issued = 0;
723
724         if (in_ramp_time(td))
725                 td_set_runstate(td, TD_RAMP);
726         else
727                 td_set_runstate(td, TD_RUNNING);
728
729         lat_target_init(td);
730
731         total_bytes = td->o.size;
732         /*
733          * Allow random overwrite workloads to write up to io_limit
734          * before starting the verification phase, as 'size' doesn't apply.
735          */
736         if (td_write(td) && td_random(td) && td->o.norandommap)
737                 total_bytes = max(total_bytes, (uint64_t) td->o.io_limit);
738         /*
739          * If verify_backlog is enabled, we'll run the verify in this
740          * handler as well. For that case, we may need up to twice the
741          * amount of bytes.
742          */
743         if (td->o.verify != VERIFY_NONE &&
744            (td_write(td) && td->o.verify_backlog))
745                 total_bytes += td->o.size;
746
747         /* In trimwrite mode, each byte is trimmed and then written, so
748          * allow total_bytes to be twice as big */
749         if (td_trimwrite(td))
750                 total_bytes += td->total_io_size;
751
752         while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
753                 (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) ||
754                 td->o.time_based) {
755                 struct timeval comp_time;
756                 struct io_u *io_u;
757                 int ret2, full;
758                 enum fio_ddir ddir;
759
760                 check_update_rusage(td);
761
762                 if (td->terminate || td->done)
763                         break;
764
765                 update_tv_cache(td);
766
767                 if (runtime_exceeded(td, &td->tv_cache)) {
768                         __update_tv_cache(td);
769                         if (runtime_exceeded(td, &td->tv_cache)) {
770                                 fio_mark_td_terminate(td);
771                                 break;
772                         }
773                 }
774
775                 if (flow_threshold_exceeded(td))
776                         continue;
777
778                 if (bytes_issued >= total_bytes)
779                         break;
780
781                 io_u = get_io_u(td);
782                 if (IS_ERR_OR_NULL(io_u)) {
783                         int err = PTR_ERR(io_u);
784
785                         io_u = NULL;
786                         if (err == -EBUSY) {
787                                 ret = FIO_Q_BUSY;
788                                 goto reap;
789                         }
790                         if (td->o.latency_target)
791                                 goto reap;
792                         break;
793                 }
794
795                 ddir = io_u->ddir;
796
797                 /*
798                  * Add verification end_io handler if:
799                  *      - Asked to verify (!td_rw(td))
800                  *      - Or the io_u is from our verify list (mixed write/ver)
801                  */
802                 if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
803                     ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
804
805                         if (!td->o.verify_pattern_bytes) {
806                                 io_u->rand_seed = __rand(&td->verify_state);
807                                 if (sizeof(int) != sizeof(long *))
808                                         io_u->rand_seed *= __rand(&td->verify_state);
809                         }
810
811                         if (verify_state_should_stop(td, io_u)) {
812                                 put_io_u(td, io_u);
813                                 break;
814                         }
815
816                         if (td->o.verify_async)
817                                 io_u->end_io = verify_io_u_async;
818                         else
819                                 io_u->end_io = verify_io_u;
820                         td_set_runstate(td, TD_VERIFYING);
821                 } else if (in_ramp_time(td))
822                         td_set_runstate(td, TD_RAMP);
823                 else
824                         td_set_runstate(td, TD_RUNNING);
825
826                 /*
827                  * Always log IO before it's issued, so we know the specific
828                  * order of it. The logged unit will track when the IO has
829                  * completed.
830                  */
831                 if (td_write(td) && io_u->ddir == DDIR_WRITE &&
832                     td->o.do_verify &&
833                     td->o.verify != VERIFY_NONE &&
834                     !td->o.experimental_verify)
835                         log_io_piece(td, io_u);
836
837                 ret = td_io_queue(td, io_u);
838                 switch (ret) {
839                 case FIO_Q_COMPLETED:
840                         if (io_u->error) {
841                                 ret = -io_u->error;
842                                 unlog_io_piece(td, io_u);
843                                 clear_io_u(td, io_u);
844                         } else if (io_u->resid) {
845                                 int bytes = io_u->xfer_buflen - io_u->resid;
846                                 struct fio_file *f = io_u->file;
847
848                                 bytes_issued += bytes;
849
850                                 trim_io_piece(td, io_u);
851
852                                 /*
853                                  * zero read, fail
854                                  */
855                                 if (!bytes) {
856                                         unlog_io_piece(td, io_u);
857                                         td_verror(td, EIO, "full resid");
858                                         put_io_u(td, io_u);
859                                         break;
860                                 }
861
862                                 io_u->xfer_buflen = io_u->resid;
863                                 io_u->xfer_buf += bytes;
864                                 io_u->offset += bytes;
865
866                                 if (ddir_rw(io_u->ddir))
867                                         td->ts.short_io_u[io_u->ddir]++;
868
869                                 if (io_u->offset == f->real_file_size)
870                                         goto sync_done;
871
872                                 requeue_io_u(td, &io_u);
873                         } else {
874 sync_done:
875                                 if (__should_check_rate(td, DDIR_READ) ||
876                                     __should_check_rate(td, DDIR_WRITE) ||
877                                     __should_check_rate(td, DDIR_TRIM))
878                                         fio_gettime(&comp_time, NULL);
879
880                                 ret = io_u_sync_complete(td, io_u, bytes_done);
881                                 if (ret < 0)
882                                         break;
883                                 bytes_issued += io_u->xfer_buflen;
884                         }
885                         break;
886                 case FIO_Q_QUEUED:
887                         /*
888                          * if the engine doesn't have a commit hook,
889                          * the io_u is really queued. if it does have such
890                          * a hook, it has to call io_u_queued() itself.
891                          */
892                         if (td->io_ops->commit == NULL)
893                                 io_u_queued(td, io_u);
894                         bytes_issued += io_u->xfer_buflen;
895                         break;
896                 case FIO_Q_BUSY:
897                         unlog_io_piece(td, io_u);
898                         requeue_io_u(td, &io_u);
899                         ret2 = td_io_commit(td);
900                         if (ret2 < 0)
901                                 ret = ret2;
902                         break;
903                 default:
904                         assert(ret < 0);
905                         put_io_u(td, io_u);
906                         break;
907                 }
908
909                 if (break_on_this_error(td, ddir, &ret))
910                         break;
911
912                 /*
913                  * See if we need to complete some commands. Note that we
914                  * can get BUSY even without IO queued, if the system is
915                  * resource starved.
916                  */
917 reap:
918                 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
919                 if (full || !td->o.iodepth_batch_complete)
920                         ret = wait_for_completions(td, &comp_time, bytes_done);
921                 if (ret < 0)
922                         break;
923                 if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
924                         continue;
925
926                 if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
927                         if (check_min_rate(td, &comp_time, bytes_done)) {
928                                 if (exitall_on_terminate)
929                                         fio_terminate_threads(td->groupid);
930                                 td_verror(td, EIO, "check_min_rate");
931                                 break;
932                         }
933                 }
934                 if (!in_ramp_time(td) && td->o.latency_target)
935                         lat_target_check(td);
936
937                 if (td->o.thinktime) {
938                         unsigned long long b;
939
940                         b = ddir_rw_sum(td->io_blocks);
941                         if (!(b % td->o.thinktime_blocks)) {
942                                 int left;
943
944                                 io_u_quiesce(td);
945
946                                 if (td->o.thinktime_spin)
947                                         usec_spin(td->o.thinktime_spin);
948
949                                 left = td->o.thinktime - td->o.thinktime_spin;
950                                 if (left)
951                                         usec_sleep(td, left);
952                         }
953                 }
954         }
955
956         check_update_rusage(td);
957
958         if (td->trim_entries)
959                 log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
960
961         if (td->o.fill_device && td->error == ENOSPC) {
962                 td->error = 0;
963                 fio_mark_td_terminate(td);
964         }
965         if (!td->error) {
966                 struct fio_file *f;
967
968                 i = td->cur_depth;
969                 if (i) {
970                         ret = io_u_queued_complete(td, i, bytes_done);
971                         if (td->o.fill_device && td->error == ENOSPC)
972                                 td->error = 0;
973                 }
974
975                 if (should_fsync(td) && td->o.end_fsync) {
976                         td_set_runstate(td, TD_FSYNCING);
977
978                         for_each_file(td, f, i) {
979                                 if (!fio_file_fsync(td, f))
980                                         continue;
981
982                                 log_err("fio: end_fsync failed for file %s\n",
983                                                                 f->file_name);
984                         }
985                 }
986         } else
987                 cleanup_pending_aio(td);
988
989         /*
990          * stop job if we failed doing any IO
991          */
992         if (!ddir_rw_sum(td->this_io_bytes))
993                 td->done = 1;
994
995         return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
996 }
997
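/*
 * Free all io_u structures and their data buffers at job teardown: run the
 * engine's io_u_free hook for each unit, release the shared I/O memory and
 * the requeue/freelist/all rings, and drop the last_write_comp array.
 */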
998 static void cleanup_io_u(struct thread_data *td)
999 {
1000         struct io_u *io_u;
1001
1002         while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
1003
1004                 if (td->io_ops->io_u_free)
1005                         td->io_ops->io_u_free(td, io_u);
1006
1007                 fio_memfree(io_u, sizeof(*io_u));
1008         }
1009
1010         free_io_mem(td);
1011
1012         io_u_rexit(&td->io_u_requeues);
1013         io_u_qexit(&td->io_u_freelist);
1014         io_u_qexit(&td->io_u_all);
1015
1016         if (td->last_write_comp)
1017                 sfree(td->last_write_comp);
1018 }
1019
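/*
 * Allocate the io_u pool for this job: one shared data buffer sized for
 * iodepth * max block size (padded for alignment and huge pages where
 * needed), plus a cache-line aligned io_u per queue slot, pre-filled for
 * writes and pushed onto the free list.
 */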
1020 static int init_io_u(struct thread_data *td)
1021 {
1022         struct io_u *io_u;
1023         unsigned int max_bs, min_write;
1024         int cl_align, i, max_units;
1025         int data_xfer = 1, err;
1026         char *p;
1027
1028         max_units = td->o.iodepth;
1029         max_bs = td_max_bs(td);
1030         min_write = td->o.min_bs[DDIR_WRITE];
1031         td->orig_buffer_size = (unsigned long long) max_bs
1032                                         * (unsigned long long) max_units;
1033
1034         if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
1035                 data_xfer = 0;
1036
1037         err = 0;
1038         err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
1039         err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
1040         err += io_u_qinit(&td->io_u_all, td->o.iodepth);
1041
1042         if (err) {
1043                 log_err("fio: failed setting up IO queues\n");
1044                 return 1;
1045         }
1046
1047         /*
1048          * if we may later need to do address alignment, then add any
1049          * possible adjustment here so that we don't cause a buffer
1050          * overflow later. this adjustment may be too much if we get
1051          * lucky and the allocator gives us an aligned address.
1052          */
1053         if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1054             (td->io_ops->flags & FIO_RAWIO))
1055                 td->orig_buffer_size += page_mask + td->o.mem_align;
1056
1057         if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
1058                 unsigned long bs;
1059
1060                 bs = td->orig_buffer_size + td->o.hugepage_size - 1;
1061                 td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
1062         }
1063
1064         if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
1065                 log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
1066                 return 1;
1067         }
1068
1069         if (data_xfer && allocate_io_mem(td))
1070                 return 1;
1071
1072         if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1073             (td->io_ops->flags & FIO_RAWIO))
1074                 p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
1075         else
1076                 p = td->orig_buffer;
1077
1078         cl_align = os_cache_line_size();
1079
1080         for (i = 0; i < max_units; i++) {
1081                 void *ptr;
1082
1083                 if (td->terminate)
1084                         return 1;
1085
1086                 ptr = fio_memalign(cl_align, sizeof(*io_u));
1087                 if (!ptr) {
1088                         log_err("fio: unable to allocate aligned memory\n");
1089                         break;
1090                 }
1091
1092                 io_u = ptr;
1093                 memset(io_u, 0, sizeof(*io_u));
1094                 INIT_FLIST_HEAD(&io_u->verify_list);
1095                 dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
1096
1097                 if (data_xfer) {
1098                         io_u->buf = p;
1099                         dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
1100
1101                         if (td_write(td))
1102                                 io_u_fill_buffer(td, io_u, min_write, max_bs);
1103                         if (td_write(td) && td->o.verify_pattern_bytes) {
1104                                 /*
1105                                  * Fill the buffer with the pattern if we are
1106                                  * going to be doing writes.
1107                                  */
1108                                 fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
1109                         }
1110                 }
1111
1112                 io_u->index = i;
1113                 io_u->flags = IO_U_F_FREE;
1114                 io_u_qpush(&td->io_u_freelist, io_u);
1115
1116                 /*
1117                  * io_u never leaves this stack, used for iteration of all
1118                  * io_u buffers.
1119                  */
1120                 io_u_qpush(&td->io_u_all, io_u);
1121
1122                 if (td->io_ops->io_u_init) {
1123                         int ret = td->io_ops->io_u_init(td, io_u);
1124
1125                         if (ret) {
1126                                 log_err("fio: failed to init engine data: %d\n", ret);
1127                                 return 1;
1128                         }
1129                 }
1130
1131                 p += max_bs;
1132         }
1133
1134         if (td->o.verify != VERIFY_NONE) {
1135                 td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
1136                 if (!td->last_write_comp) {
1137                         log_err("fio: failed to alloc write comp data\n");
1138                         return 1;
1139                 }
1140         }
1141
1142         return 0;
1143 }
1144
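/*
 * Switch the block device's I/O scheduler by writing the requested name to
 * the sysfs 'scheduler' file, then read it back to verify that the kernel
 * actually selected it.
 */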
1145 static int switch_ioscheduler(struct thread_data *td)
1146 {
1147         char tmp[256], tmp2[128];
1148         FILE *f;
1149         int ret;
1150
1151         if (td->io_ops->flags & FIO_DISKLESSIO)
1152                 return 0;
1153
1154         sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1155
1156         f = fopen(tmp, "r+");
1157         if (!f) {
1158                 if (errno == ENOENT) {
1159                         log_err("fio: os or kernel doesn't support IO scheduler"
1160                                 " switching\n");
1161                         return 0;
1162                 }
1163                 td_verror(td, errno, "fopen iosched");
1164                 return 1;
1165         }
1166
1167         /*
1168          * Set io scheduler.
1169          */
1170         ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1171         if (ferror(f) || ret != 1) {
1172                 td_verror(td, errno, "fwrite");
1173                 fclose(f);
1174                 return 1;
1175         }
1176
1177         rewind(f);
1178
1179         /*
1180          * Read back and check that the selected scheduler is now the default.
1181          */
1182         ret = fread(tmp, sizeof(tmp), 1, f);
1183         if (ferror(f) || ret < 0) {
1184                 td_verror(td, errno, "fread");
1185                 fclose(f);
1186                 return 1;
1187         }
1188         tmp[sizeof(tmp) - 1] = '\0';
1189
1190
1191         sprintf(tmp2, "[%s]", td->o.ioscheduler);
1192         if (!strstr(tmp, tmp2)) {
1193                 log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1194                 td_verror(td, EINVAL, "iosched_switch");
1195                 fclose(f);
1196                 return 1;
1197         }
1198
1199         fclose(f);
1200         return 0;
1201 }
1202
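/*
 * Decide whether the main loop should do another pass: always for
 * time_based jobs, while loops remain, and while the job is still more
 * than one maximum block size away from its size/io_limit target and has
 * unfinished files.
 */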
1203 static int keep_running(struct thread_data *td)
1204 {
1205         unsigned long long limit;
1206
1207         if (td->done)
1208                 return 0;
1209         if (td->o.time_based)
1210                 return 1;
1211         if (td->o.loops) {
1212                 td->o.loops--;
1213                 return 1;
1214         }
1215         if (exceeds_number_ios(td))
1216                 return 0;
1217
1218         if (td->o.io_limit)
1219                 limit = td->o.io_limit;
1220         else
1221                 limit = td->o.size;
1222
1223         if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1224                 uint64_t diff;
1225
1226                 /*
1227                  * If the difference is less than the minimum IO size, we
1228                  * are done.
1229                  */
1230                 diff = limit - ddir_rw_sum(td->io_bytes);
1231                 if (diff < td_max_bs(td))
1232                         return 0;
1233
1234                 if (fio_files_done(td))
1235                         return 0;
1236
1237                 return 1;
1238         }
1239
1240         return 0;
1241 }
1242
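/*
 * Run a user supplied command (e.g. exec_prerun), redirecting its output
 * to <jobname>.<mode>.txt, and report whether launching it failed.
 */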
1243 static int exec_string(struct thread_options *o, const char *string, const char *mode)
1244 {
1245         int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1246         char *str;
1247
1248         str = malloc(newlen);
1249         sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1250
1251         log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
1252         ret = system(str);
1253         if (ret == -1)
1254                 log_err("fio: exec of cmd <%s> failed\n", str);
1255
1256         free(str);
1257         return ret;
1258 }
1259
1260 /*
1261  * Dry run to compute correct state of numberio for verification.
1262  */
1263 static uint64_t do_dry_run(struct thread_data *td)
1264 {
1265         uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1266
1267         td_set_runstate(td, TD_RUNNING);
1268
1269         while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1270                 (!flist_empty(&td->trim_list)) || !io_complete_bytes_exceeded(td)) {
1271                 struct io_u *io_u;
1272                 int ret;
1273
1274                 if (td->terminate || td->done)
1275                         break;
1276
1277                 io_u = get_io_u(td);
1278                 if (!io_u)
1279                         break;
1280
1281                 io_u->flags |= IO_U_F_FLIGHT;
1282                 io_u->error = 0;
1283                 io_u->resid = 0;
1284                 if (ddir_rw(acct_ddir(io_u)))
1285                         td->io_issues[acct_ddir(io_u)]++;
1286                 if (ddir_rw(io_u->ddir)) {
1287                         io_u_mark_depth(td, 1);
1288                         td->ts.total_io_u[io_u->ddir]++;
1289                 }
1290
1291                 if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1292                     td->o.do_verify &&
1293                     td->o.verify != VERIFY_NONE &&
1294                     !td->o.experimental_verify)
1295                         log_io_piece(td, io_u);
1296
1297                 ret = io_u_sync_complete(td, io_u, bytes_done);
1298                 (void) ret;
1299         }
1300
1301         return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1302 }
1303
1304 /*
1305  * Entry point for the thread based jobs. The process based jobs end up
1306  * here as well, after a little setup.
1307  */
1308 static void *thread_main(void *data)
1309 {
1310         unsigned long long elapsed;
1311         struct thread_data *td = data;
1312         struct thread_options *o = &td->o;
1313         pthread_condattr_t attr;
1314         int clear_state;
1315         int ret;
1316
1317         if (!o->use_thread) {
1318                 setsid();
1319                 td->pid = getpid();
1320         } else
1321                 td->pid = gettid();
1322
1323         fio_local_clock_init(o->use_thread);
1324
1325         dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1326
1327         if (is_backend)
1328                 fio_server_send_start(td);
1329
1330         INIT_FLIST_HEAD(&td->io_log_list);
1331         INIT_FLIST_HEAD(&td->io_hist_list);
1332         INIT_FLIST_HEAD(&td->verify_list);
1333         INIT_FLIST_HEAD(&td->trim_list);
1334         INIT_FLIST_HEAD(&td->next_rand_list);
1335         pthread_mutex_init(&td->io_u_lock, NULL);
1336         td->io_hist_tree = RB_ROOT;
1337
1338         pthread_condattr_init(&attr);
1339         pthread_cond_init(&td->verify_cond, &attr);
1340         pthread_cond_init(&td->free_cond, &attr);
1341
1342         td_set_runstate(td, TD_INITIALIZED);
1343         dprint(FD_MUTEX, "up startup_mutex\n");
1344         fio_mutex_up(startup_mutex);
1345         dprint(FD_MUTEX, "wait on td->mutex\n");
1346         fio_mutex_down(td->mutex);
1347         dprint(FD_MUTEX, "done waiting on td->mutex\n");
1348
1349         /*
1350          * A new gid requires privilege, so we need to do this before setting
1351          * the uid.
1352          */
1353         if (o->gid != -1U && setgid(o->gid)) {
1354                 td_verror(td, errno, "setgid");
1355                 goto err;
1356         }
1357         if (o->uid != -1U && setuid(o->uid)) {
1358                 td_verror(td, errno, "setuid");
1359                 goto err;
1360         }
1361
1362         /*
1363          * If we have a gettimeofday() thread, make sure we exclude that
1364          * thread from this job
1365          */
1366         if (o->gtod_cpu)
1367                 fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1368
1369         /*
1370          * Set affinity first, in case it has an impact on the memory
1371          * allocations.
1372          */
1373         if (fio_option_is_set(o, cpumask)) {
1374                 if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1375                         ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1376                         if (!ret) {
1377                                 log_err("fio: no CPUs set\n");
1378                                 log_err("fio: Try increasing number of available CPUs\n");
1379                                 td_verror(td, EINVAL, "cpus_split");
1380                                 goto err;
1381                         }
1382                 }
1383                 ret = fio_setaffinity(td->pid, o->cpumask);
1384                 if (ret == -1) {
1385                         td_verror(td, errno, "cpu_set_affinity");
1386                         goto err;
1387                 }
1388         }
1389
1390 #ifdef CONFIG_LIBNUMA
1391         /* numa node setup */
1392         if (fio_option_is_set(o, numa_cpunodes) ||
1393             fio_option_is_set(o, numa_memnodes)) {
1394                 struct bitmask *mask;
1395
1396                 if (numa_available() < 0) {
1397                         td_verror(td, errno, "Does not support NUMA API\n");
1398                         goto err;
1399                 }
1400
1401                 if (fio_option_is_set(o, numa_cpunodes)) {
1402                         mask = numa_parse_nodestring(o->numa_cpunodes);
1403                         ret = numa_run_on_node_mask(mask);
1404                         numa_free_nodemask(mask);
1405                         if (ret == -1) {
1406                                 td_verror(td, errno, \
1407                                         "numa_run_on_node_mask failed\n");
1408                                 goto err;
1409                         }
1410                 }
1411
1412                 if (fio_option_is_set(o, numa_memnodes)) {
1413                         mask = NULL;
1414                         if (o->numa_memnodes)
1415                                 mask = numa_parse_nodestring(o->numa_memnodes);
1416
1417                         switch (o->numa_mem_mode) {
1418                         case MPOL_INTERLEAVE:
1419                                 numa_set_interleave_mask(mask);
1420                                 break;
1421                         case MPOL_BIND:
1422                                 numa_set_membind(mask);
1423                                 break;
1424                         case MPOL_LOCAL:
1425                                 numa_set_localalloc();
1426                                 break;
1427                         case MPOL_PREFERRED:
1428                                 numa_set_preferred(o->numa_mem_prefer_node);
1429                                 break;
1430                         case MPOL_DEFAULT:
1431                         default:
1432                                 break;
1433                         }
1434
1435                         if (mask)
1436                                 numa_free_nodemask(mask);
1437
1438                 }
1439         }
1440 #endif
1441
1442         if (fio_pin_memory(td))
1443                 goto err;
1444
1445         /*
1446          * May alter parameters that init_io_u() will use, so we need to
1447          * do this first.
1448          */
1449         if (init_iolog(td))
1450                 goto err;
1451
1452         if (init_io_u(td))
1453                 goto err;
1454
1455         if (o->verify_async && verify_async_init(td))
1456                 goto err;
1457
1458         if (fio_option_is_set(o, ioprio) ||
1459             fio_option_is_set(o, ioprio_class)) {
1460                 ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1461                 if (ret == -1) {
1462                         td_verror(td, errno, "ioprio_set");
1463                         goto err;
1464                 }
1465         }
1466
1467         if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1468                 goto err;
1469
1470         errno = 0;
1471         if (nice(o->nice) == -1 && errno != 0) {
1472                 td_verror(td, errno, "nice");
1473                 goto err;
1474         }
1475
1476         if (o->ioscheduler && switch_ioscheduler(td))
1477                 goto err;
1478
1479         if (!o->create_serialize && setup_files(td))
1480                 goto err;
1481
1482         if (td_io_init(td))
1483                 goto err;
1484
1485         if (init_random_map(td))
1486                 goto err;
1487
1488         if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1489                 goto err;
1490
1491         if (o->pre_read) {
1492                 if (pre_read_files(td) < 0)
1493                         goto err;
1494         }
1495
1496         if (td->flags & TD_F_COMPRESS_LOG)
1497                 tp_init(&td->tp_data);
1498
1499         fio_verify_init(td);
1500
1501         fio_gettime(&td->epoch, NULL);
1502         fio_getrusage(&td->ru_start);
1503         clear_state = 0;
1504         while (keep_running(td)) {
1505                 uint64_t verify_bytes;
1506
1507                 fio_gettime(&td->start, NULL);
1508                 memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1509                 memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1510                 memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1511
1512                 if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1513                                 o->ratemin[DDIR_TRIM]) {
1514                         memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1515                                                 sizeof(td->bw_sample_time));
1516                         memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1517                                                 sizeof(td->bw_sample_time));
1518                         memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1519                                                 sizeof(td->bw_sample_time));
1520                 }
1521
1522                 if (clear_state)
1523                         clear_io_state(td);
1524
1525                 prune_io_piece_log(td);
1526
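		/*
		 * verify_only: do_dry_run() walks the workload without
		 * issuing the writes, so the verify pass below still gets
		 * a record of what would have been written.
		 */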
1527                 if (td->o.verify_only && (td_write(td) || td_rw(td)))
1528                         verify_bytes = do_dry_run(td);
1529                 else
1530                         verify_bytes = do_io(td);
1531
1532                 clear_state = 1;
1533
1534                 /*
1535                  * Make sure we've successfully updated the rusage stats
1536                  * before waiting on the stat mutex. Otherwise we could have
1537                  * the stat thread holding stat mutex and waiting for
1538                  * the rusage_sem, which would never get upped because
1539                  * this thread is waiting for the stat mutex.
1540                  */
1541                 check_update_rusage(td);
1542
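		/*
		 * Fold this pass's elapsed time into the per-direction
		 * runtimes, but only for directions that actually moved
		 * bytes. Done with stat_mutex held so interim stats see
		 * consistent runtimes.
		 */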
1543                 fio_mutex_down(stat_mutex);
1544                 if (td_read(td) && td->io_bytes[DDIR_READ]) {
1545                         elapsed = mtime_since_now(&td->start);
1546                         td->ts.runtime[DDIR_READ] += elapsed;
1547                 }
1548                 if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1549                         elapsed = mtime_since_now(&td->start);
1550                         td->ts.runtime[DDIR_WRITE] += elapsed;
1551                 }
1552                 if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1553                         elapsed = mtime_since_now(&td->start);
1554                         td->ts.runtime[DDIR_TRIM] += elapsed;
1555                 }
1556                 fio_gettime(&td->start, NULL);
1557                 fio_mutex_up(stat_mutex);
1558
1559                 if (td->error || td->terminate)
1560                         break;
1561
1562                 if (!o->do_verify ||
1563                     o->verify == VERIFY_NONE ||
1564                     (td->io_ops->flags & FIO_UNIDIR))
1565                         continue;
1566
1567                 clear_io_state(td);
1568
1569                 fio_gettime(&td->start, NULL);
1570
1571                 do_verify(td, verify_bytes);
1572
1573                 /*
1574                  * See comment further up for why this is done here.
1575                  */
1576                 check_update_rusage(td);
1577
1578                 fio_mutex_down(stat_mutex);
1579                 td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start);
1580                 fio_gettime(&td->start, NULL);
1581                 fio_mutex_up(stat_mutex);
1582
1583                 if (td->error || td->terminate)
1584                         break;
1585         }
1586
1587         update_rusage_stat(td);
1588         td->ts.total_run_time = mtime_since_now(&td->epoch);
1589         td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1590         td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1591         td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1592
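	/*
	 * If asked to, and the verify state hasn't already been saved
	 * (e.g. by a trigger), dump this write job's verify state locally
	 * so a later verify-only run can pick it up.
	 */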
1593         if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
1594             (td->o.verify != VERIFY_NONE && td_write(td))) {
1595                 struct all_io_list *state;
1596                 size_t sz;
1597
1598                 state = get_all_io_list(td->thread_number, &sz);
1599                 if (state) {
1600                         __verify_save_state(state, "local");
1601                         free(state);
1602                 }
1603         }
1604
1605         fio_unpin_memory(td);
1606
1607         fio_writeout_logs(td);
1608
1609         if (td->flags & TD_F_COMPRESS_LOG)
1610                 tp_exit(&td->tp_data);
1611
1612         if (o->exec_postrun)
1613                 exec_string(o, o->exec_postrun, (const char *)"postrun");
1614
1615         if (exitall_on_terminate)
1616                 fio_terminate_threads(td->groupid);
1617
1618 err:
1619         if (td->error)
1620                 log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1621                                                         td->verror);
1622
1623         if (o->verify_async)
1624                 verify_async_exit(td);
1625
1626         close_and_free_files(td);
1627         cleanup_io_u(td);
1628         close_ioengine(td);
1629         cgroup_shutdown(td, &cgroup_mnt);
1630         verify_free_state(td);
1631
1632         if (fio_option_is_set(o, cpumask)) {
1633                 ret = fio_cpuset_exit(&o->cpumask);
1634                 if (ret)
1635                         td_verror(td, ret, "fio_cpuset_exit");
1636         }
1637
1638         /*
1639          * do this very late, as it will log file closing as well
1640          */
1641         if (o->write_iolog_file)
1642                 write_iolog_close(td);
1643
1644         fio_mutex_remove(td->mutex);
1645         td->mutex = NULL;
1646
1647         td_set_runstate(td, TD_EXITED);
1648
1649         /*
1650          * Do this last after setting our runstate to exited, so we
1651          * know that the stat thread is signaled.
1652          */
1653         check_update_rusage(td);
1654
1655         return (void *) (uintptr_t) td->error;
1656 }
1657
1658
1659 /*
1660  * We cannot pass the td data into a forked process, so attach the td and
1661  * pass it to the thread worker.
1662  */
1663 static int fork_main(int shmid, int offset)
1664 {
1665         struct thread_data *td;
1666         void *data, *ret;
1667
1668 #if !defined(__hpux) && !defined(CONFIG_NO_SHM)
1669         data = shmat(shmid, NULL, 0);
1670         if (data == (void *) -1) {
1671                 int __err = errno;
1672
1673                 perror("shmat");
1674                 return __err;
1675         }
1676 #else
1677         /*
1678          * HP-UX inherits shm mappings?
1679          */
1680         data = threads;
1681 #endif
1682
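	/*
	 * The thread_data structs sit back to back in the shared memory
	 * segment; offset is this job's index, so plain byte arithmetic
	 * gives us our td.
	 */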
1683         td = data + offset * sizeof(struct thread_data);
1684         ret = thread_main(td);
1685         shmdt(data);
1686         return (int) (uintptr_t) ret;
1687 }
1688
1689 static void dump_td_info(struct thread_data *td)
1690 {
1691         log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
1692                 "be stuck. Doing forceful exit of this job.\n", td->o.name,
1693                         (unsigned long) time_since_now(&td->terminate_time));
1694 }
1695
1696 /*
1697  * Run over the job map and reap the threads that have exited, if any.
1698  */
1699 static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1700                          unsigned int *m_rate)
1701 {
1702         struct thread_data *td;
1703         unsigned int cputhreads, realthreads, pending;
1704         int i, status, ret;
1705
1706         /*
1707          * reap exited threads (TD_EXITED -> TD_REAPED)
1708          */
1709         realthreads = pending = cputhreads = 0;
1710         for_each_td(td, i) {
1711                 int flags = 0;
1712
1713                 /*
1714                  * ->io_ops is NULL for a thread that has closed its
1715                  * io engine
1716                  */
1717                 if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1718                         cputhreads++;
1719                 else
1720                         realthreads++;
1721
1722                 if (!td->pid) {
1723                         pending++;
1724                         continue;
1725                 }
1726                 if (td->runstate == TD_REAPED)
1727                         continue;
1728                 if (td->o.use_thread) {
1729                         if (td->runstate == TD_EXITED) {
1730                                 td_set_runstate(td, TD_REAPED);
1731                                 goto reaped;
1732                         }
1733                         continue;
1734                 }
1735
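		/*
		 * Forked jobs are polled with WNOHANG; if the job has
		 * already reported TD_EXITED, a blocking wait is safe.
		 */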
1736                 flags = WNOHANG;
1737                 if (td->runstate == TD_EXITED)
1738                         flags = 0;
1739
1740                 /*
1741                  * check if someone quit or got killed in an unusual way
1742                  */
1743                 ret = waitpid(td->pid, &status, flags);
1744                 if (ret < 0) {
1745                         if (errno == ECHILD) {
1746                                 log_err("fio: pid=%d disappeared %d\n",
1747                                                 (int) td->pid, td->runstate);
1748                                 td->sig = ECHILD;
1749                                 td_set_runstate(td, TD_REAPED);
1750                                 goto reaped;
1751                         }
1752                         perror("waitpid");
1753                 } else if (ret == td->pid) {
1754                         if (WIFSIGNALED(status)) {
1755                                 int sig = WTERMSIG(status);
1756
1757                                 if (sig != SIGTERM && sig != SIGUSR2)
1758                                         log_err("fio: pid=%d, got signal=%d\n",
1759                                                         (int) td->pid, sig);
1760                                 td->sig = sig;
1761                                 td_set_runstate(td, TD_REAPED);
1762                                 goto reaped;
1763                         }
1764                         if (WIFEXITED(status)) {
1765                                 if (WEXITSTATUS(status) && !td->error)
1766                                         td->error = WEXITSTATUS(status);
1767
1768                                 td_set_runstate(td, TD_REAPED);
1769                                 goto reaped;
1770                         }
1771                 }
1772
1773                 /*
1774                  * If the job is stuck, do a forceful timeout of it and
1775                  * move on.
1776                  */
1777                 if (td->terminate &&
1778                     time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
1779                         dump_td_info(td);
1780                         td_set_runstate(td, TD_REAPED);
1781                         goto reaped;
1782                 }
1783
1784                 /*
1785                  * thread is not dead, continue
1786                  */
1787                 pending++;
1788                 continue;
1789 reaped:
1790                 (*nr_running)--;
1791                 (*m_rate) -= ddir_rw_sum(td->o.ratemin);
1792                 (*t_rate) -= ddir_rw_sum(td->o.rate);
1793                 if (!td->pid)
1794                         pending--;
1795
1796                 if (td->error)
1797                         exit_value++;
1798
1799                 done_secs += mtime_since_now(&td->epoch) / 1000;
1800                 profile_td_exit(td);
1801         }
1802
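	/*
	 * Once only cpuio burn jobs remain and no real jobs are pending,
	 * there is no point keeping them going; tell everything to
	 * terminate.
	 */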
1803         if (*nr_running == cputhreads && !pending && realthreads)
1804                 fio_terminate_threads(TERMINATE_ALL);
1805 }
1806
1807 static int __check_trigger_file(void)
1808 {
1809         struct stat sb;
1810
1811         if (!trigger_file)
1812                 return 0;
1813
1814         if (stat(trigger_file, &sb))
1815                 return 0;
1816
1817         if (unlink(trigger_file) < 0)
1818                 log_err("fio: failed to unlink %s: %s\n", trigger_file,
1819                                                         strerror(errno));
1820
1821         return 1;
1822 }
1823
1824 static int trigger_timedout(void)
1825 {
1826         if (trigger_timeout)
1827                 return time_since_genesis() >= trigger_timeout;
1828
1829         return 0;
1830 }
1831
1832 void exec_trigger(const char *cmd)
1833 {
1834         int ret;
1835
1836         if (!cmd)
1837                 return;
1838
1839         ret = system(cmd);
1840         if (ret == -1)
1841                 log_err("fio: failed executing %s trigger\n", cmd);
1842 }
1843
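/*
 * Fire the configured trigger if the trigger file has appeared or the
 * trigger timeout has expired: remote clients are told to trigger,
 * otherwise we save verify state, stop all jobs and run the command
 * locally.
 */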
1844 void check_trigger_file(void)
1845 {
1846         if (__check_trigger_file() || trigger_timedout()) {
1847                 if (nr_clients)
1848                         fio_clients_send_trigger(trigger_remote_cmd);
1849                 else {
1850                         verify_save_state();
1851                         fio_terminate_threads(TERMINATE_ALL);
1852                         exec_trigger(trigger_cmd);
1853                 }
1854         }
1855 }
1856
1857 static int fio_verify_load_state(struct thread_data *td)
1858 {
1859         int ret;
1860
1861         if (!td->o.verify_state)
1862                 return 0;
1863
1864         if (is_backend) {
1865                 void *data;
1866
1867                 ret = fio_server_get_verify_state(td->o.name,
1868                                         td->thread_number - 1, &data);
1869                 if (!ret)
1870                         verify_convert_assign_state(td, data);
1871         } else
1872                 ret = verify_load_state(td, "local");
1873
1874         return ret;
1875 }
1876
1877 static void do_usleep(unsigned int usecs)
1878 {
1879         check_for_running_stats();
1880         check_trigger_file();
1881         usleep(usecs);
1882 }
1883
1884 /*
1885  * Main function for kicking off and reaping jobs, as needed.
1886  */
1887 static void run_threads(void)
1888 {
1889         struct thread_data *td;
1890         unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1891         uint64_t spent;
1892
1893         if (fio_gtod_offload && fio_start_gtod_thread())
1894                 return;
1895
1896         fio_idle_prof_init();
1897
1898         set_sig_handlers();
1899
1900         nr_thread = nr_process = 0;
1901         for_each_td(td, i) {
1902                 if (td->o.use_thread)
1903                         nr_thread++;
1904                 else
1905                         nr_process++;
1906         }
1907
1908         if (output_format == FIO_OUTPUT_NORMAL) {
1909                 log_info("Starting ");
1910                 if (nr_thread)
1911                         log_info("%d thread%s", nr_thread,
1912                                                 nr_thread > 1 ? "s" : "");
1913                 if (nr_process) {
1914                         if (nr_thread)
1915                                 log_info(" and ");
1916                         log_info("%d process%s", nr_process,
1917                                                 nr_process > 1 ? "es" : "");
1918                 }
1919                 log_info("\n");
1920                 log_info_flush();
1921         }
1922
1923         todo = thread_number;
1924         nr_running = 0;
1925         nr_started = 0;
1926         m_rate = t_rate = 0;
1927
1928         for_each_td(td, i) {
1929                 print_status_init(td->thread_number - 1);
1930
1931                 if (!td->o.create_serialize)
1932                         continue;
1933
1934                 if (fio_verify_load_state(td))
1935                         goto reap;
1936
1937                 /*
1938                          * do file setup here so it happens sequentially.
1939                          * We don't want X number of threads getting their
1940                          * client data interspersed on disk.
1941                  */
1942                 if (setup_files(td)) {
1943 reap:
1944                         exit_value++;
1945                         if (td->error)
1946                                 log_err("fio: pid=%d, err=%d/%s\n",
1947                                         (int) td->pid, td->error, td->verror);
1948                         td_set_runstate(td, TD_REAPED);
1949                         todo--;
1950                 } else {
1951                         struct fio_file *f;
1952                         unsigned int j;
1953
1954                         /*
1955                          * for sharing to work, each job must always open
1956                          * its own files. So close them if we opened them
1957                          * for creation.
1958                          */
1959                         for_each_file(td, f, j) {
1960                                 if (fio_file_open(f))
1961                                         td_io_close_file(td, f);
1962                         }
1963                 }
1964         }
1965
1966         /* start idle threads before io threads start to run */
1967         fio_idle_prof_start();
1968
1969         set_genesis_time();
1970
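	/*
	 * Startup loop: create jobs (honoring start_delay and stonewall),
	 * wait for them to reach TD_INITIALIZED, kick them into
	 * TD_RUNNING/TD_RAMP, then reap anything that has already exited.
	 * Repeats until every job has been started.
	 */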
1971         while (todo) {
1972                 struct thread_data *map[REAL_MAX_JOBS];
1973                 struct timeval this_start;
1974                 int this_jobs = 0, left;
1975
1976                 /*
1977                  * create threads (TD_NOT_CREATED -> TD_CREATED)
1978                  */
1979                 for_each_td(td, i) {
1980                         if (td->runstate != TD_NOT_CREATED)
1981                                 continue;
1982
1983                         /*
1984                          * never got a chance to start, killed by other
1985                          * thread for some reason
1986                          */
1987                         if (td->terminate) {
1988                                 todo--;
1989                                 continue;
1990                         }
1991
1992                         if (td->o.start_delay) {
1993                                 spent = utime_since_genesis();
1994
1995                                 if (td->o.start_delay > spent)
1996                                         continue;
1997                         }
1998
1999                         if (td->o.stonewall && (nr_started || nr_running)) {
2000                                 dprint(FD_PROCESS, "%s: stonewall wait\n",
2001                                                         td->o.name);
2002                                 break;
2003                         }
2004
2005                         init_disk_util(td);
2006
2007                         td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
2008                         td->update_rusage = 0;
2009
2010                         /*
2011                          * Set state to created. Thread will transition
2012                          * to TD_INITIALIZED when it's done setting up.
2013                          */
2014                         td_set_runstate(td, TD_CREATED);
2015                         map[this_jobs++] = td;
2016                         nr_started++;
2017
2018                         if (td->o.use_thread) {
2019                                 int ret;
2020
2021                                 dprint(FD_PROCESS, "will pthread_create\n");
2022                                 ret = pthread_create(&td->thread, NULL,
2023                                                         thread_main, td);
2024                                 if (ret) {
2025                                         log_err("pthread_create: %s\n",
2026                                                         strerror(ret));
2027                                         nr_started--;
2028                                         break;
2029                                 }
2030                                 ret = pthread_detach(td->thread);
2031                                 if (ret)
2032                                         log_err("pthread_detach: %s\n",
2033                                                         strerror(ret));
2034                         } else {
2035                                 pid_t pid;
2036                                 dprint(FD_PROCESS, "will fork\n");
2037                                 pid = fork();
2038                                 if (!pid) {
2039                                         int ret = fork_main(shm_id, i);
2040
2041                                         _exit(ret);
2042                                 } else if (i == fio_debug_jobno)
2043                                         *fio_debug_jobp = pid;
2044                         }
2045                         dprint(FD_MUTEX, "wait on startup_mutex\n");
2046                         if (fio_mutex_down_timeout(startup_mutex, 10)) {
2047                                 log_err("fio: job startup hung? exiting.\n");
2048                                 fio_terminate_threads(TERMINATE_ALL);
2049                                 fio_abort = 1;
2050                                 nr_started--;
2051                                 break;
2052                         }
2053                         dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2054                 }
2055
2056                 /*
2057                  * Wait for the started threads to transition to
2058                  * TD_INITIALIZED.
2059                  */
2060                 fio_gettime(&this_start, NULL);
2061                 left = this_jobs;
2062                 while (left && !fio_abort) {
2063                         if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
2064                                 break;
2065
2066                         do_usleep(100000);
2067
2068                         for (i = 0; i < this_jobs; i++) {
2069                                 td = map[i];
2070                                 if (!td)
2071                                         continue;
2072                                 if (td->runstate == TD_INITIALIZED) {
2073                                         map[i] = NULL;
2074                                         left--;
2075                                 } else if (td->runstate >= TD_EXITED) {
2076                                         map[i] = NULL;
2077                                         left--;
2078                                         todo--;
2079                                         nr_running++; /* work-around... */
2080                                 }
2081                         }
2082                 }
2083
2084                 if (left) {
2085                         log_err("fio: %d job%s failed to start\n", left,
2086                                         left > 1 ? "s" : "");
2087                         for (i = 0; i < this_jobs; i++) {
2088                                 td = map[i];
2089                                 if (!td)
2090                                         continue;
2091                                 kill(td->pid, SIGTERM);
2092                         }
2093                         break;
2094                 }
2095
2096                 /*
2097                  * start created threads (TD_INITIALIZED -> TD_RUNNING).
2098                  */
2099                 for_each_td(td, i) {
2100                         if (td->runstate != TD_INITIALIZED)
2101                                 continue;
2102
2103                         if (in_ramp_time(td))
2104                                 td_set_runstate(td, TD_RAMP);
2105                         else
2106                                 td_set_runstate(td, TD_RUNNING);
2107                         nr_running++;
2108                         nr_started--;
2109                         m_rate += ddir_rw_sum(td->o.ratemin);
2110                         t_rate += ddir_rw_sum(td->o.rate);
2111                         todo--;
2112                         fio_mutex_up(td->mutex);
2113                 }
2114
2115                 reap_threads(&nr_running, &t_rate, &m_rate);
2116
2117                 if (todo)
2118                         do_usleep(100000);
2119         }
2120
2121         while (nr_running) {
2122                 reap_threads(&nr_running, &t_rate, &m_rate);
2123                 do_usleep(10000);
2124         }
2125
2126         fio_idle_prof_stop();
2127
2128         update_io_ticks();
2129 }
2130
2131 static void wait_for_helper_thread_exit(void)
2132 {
2133         void *ret;
2134
2135         helper_exit = 1;
2136         pthread_cond_signal(&helper_cond);
2137         pthread_join(helper_thread, &ret);
2138 }
2139
2140 static void free_disk_util(void)
2141 {
2142         disk_util_prune_entries();
2143
2144         pthread_cond_destroy(&helper_cond);
2145 }
2146
2147 static void *helper_thread_main(void *data)
2148 {
2149         int ret = 0;
2150
2151         fio_mutex_up(startup_mutex);
2152
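	/*
	 * Wake up every DISK_UTIL_MSEC to refresh disk utilization and,
	 * when helper_do_stat is set, emit interim run stats. The loop
	 * ends when update_io_ticks() returns non-zero, which is how
	 * wait_for_helper_thread_exit() (via helper_exit) stops us.
	 */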
2153         while (!ret) {
2154                 uint64_t sec = DISK_UTIL_MSEC / 1000;
2155                 uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000;
2156                 struct timespec ts;
2157                 struct timeval tv;
2158
2159                 gettimeofday(&tv, NULL);
2160                 ts.tv_sec = tv.tv_sec + sec;
2161                 ts.tv_nsec = (tv.tv_usec * 1000) + nsec;
2162
2163                 if (ts.tv_nsec >= 1000000000ULL) {
2164                         ts.tv_nsec -= 1000000000ULL;
2165                         ts.tv_sec++;
2166                 }
2167
2168                 /* POSIX requires helper_lock to be held across the wait */
                     pthread_mutex_lock(&helper_lock);
                     pthread_cond_timedwait(&helper_cond, &helper_lock, &ts);
                     pthread_mutex_unlock(&helper_lock);
2169
2170                 ret = update_io_ticks();
2171
2172                 if (helper_do_stat) {
2173                         helper_do_stat = 0;
2174                         __show_running_run_stats();
2175                 }
2176
2177                 if (!is_backend)
2178                         print_thread_status();
2179         }
2180
2181         return NULL;
2182 }
2183
2184 static int create_helper_thread(void)
2185 {
2186         int ret;
2187
2188         setup_disk_util();
2189
2190         pthread_cond_init(&helper_cond, NULL);
2191         pthread_mutex_init(&helper_lock, NULL);
2192
2193         ret = pthread_create(&helper_thread, NULL, helper_thread_main, NULL);
2194         if (ret) {
2195                 log_err("Can't create helper thread: %s\n", strerror(ret));
2196                 return 1;
2197         }
2198
2199         dprint(FD_MUTEX, "wait on startup_mutex\n");
2200         fio_mutex_down(startup_mutex);
2201         dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2202         return 0;
2203 }
2204
2205 int fio_backend(void)
2206 {
2207         struct thread_data *td;
2208         int i;
2209
2210         if (exec_profile) {
2211                 if (load_profile(exec_profile))
2212                         return 1;
2213                 free(exec_profile);
2214                 exec_profile = NULL;
2215         }
2216         if (!thread_number)
2217                 return 0;
2218
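	/*
	 * Per-direction aggregate bandwidth logs; they are flushed and
	 * freed once all jobs have completed, further down.
	 */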
2219         if (write_bw_log) {
2220                 struct log_params p = {
2221                         .log_type = IO_LOG_TYPE_BW,
2222                 };
2223
2224                 setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
2225                 setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
2226                 setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
2227         }
2228
2229         startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2230         if (startup_mutex == NULL)
2231                 return 1;
2232
2233         set_genesis_time();
2234         stat_init();
2235         create_helper_thread();
2236
2237         cgroup_list = smalloc(sizeof(*cgroup_list));
2238         INIT_FLIST_HEAD(cgroup_list);
2239
2240         run_threads();
2241
2242         wait_for_helper_thread_exit();
2243
2244         if (!fio_abort) {
2245                 __show_run_stats();
2246                 if (write_bw_log) {
2247                         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
2248                                 struct io_log *log = agg_io_log[i];
2249
2250                                 flush_log(log);
2251                                 free_log(log);
2252                         }
2253                 }
2254         }
2255
2256         for_each_td(td, i) {
2257                 fio_options_free(td);
2258                 if (td->rusage_sem) {
2259                         fio_mutex_remove(td->rusage_sem);
2260                         td->rusage_sem = NULL;
2261                 }
2262         }
2263
2264         free_disk_util();
2265         cgroup_kill(cgroup_list);
2266         sfree(cgroup_list);
2267         sfree(cgroup_mnt);
2268
2269         fio_mutex_remove(startup_mutex);
2270         stat_exit();
2271         return exit_value;
2272 }