Ensure that stat outputs are serialized
[fio.git] / backend.c
index 592ea8bfaa05cc17b4a8a3e23c1cd54b5aebea91..89ffee1277f5ad47f733b56e9bf5c86ad9b39fc8 100644 (file)
--- a/backend.c
+++ b/backend.c
 #include <sys/stat.h>
 #include <sys/wait.h>
 #include <sys/ipc.h>
-#ifndef FIO_NO_HAVE_SHM_H
-#include <sys/shm.h>
-#endif
 #include <sys/mman.h>
 
 #include "fio.h"
+#ifndef FIO_NO_HAVE_SHM_H
+#include <sys/shm.h>
+#endif
 #include "hash.h"
 #include "smalloc.h"
 #include "verify.h"
@@ -322,6 +322,21 @@ requeue:
        return 0;
 }
 
+static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
+{
+       int ret;
+
+       if (fio_file_open(f))
+               return fio_io_sync(td, f);
+
+       if (td_io_open_file(td, f))
+               return 1;
+
+       ret = fio_io_sync(td, f);
+       td_io_close_file(td, f);
+       return ret;
+}
+
 static inline void __update_tv_cache(struct thread_data *td)
 {
        fio_gettime(&td->tv_cache, NULL);
@@ -391,6 +406,15 @@ static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
        return 0;
 }
 
+static void check_update_rusage(struct thread_data *td)
+{
+       if (td->update_rusage) {
+               td->update_rusage = 0;
+               update_rusage_stat(td);
+               fio_mutex_up(td->rusage_sem);
+       }
+}
+
 /*
  * The main verify engine. Runs over the writes we previously submitted,
  * reads the blocks back in, and checks the crc/md5 of the data.
@@ -418,6 +442,8 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
                        break;
        }
 
+       check_update_rusage(td);
+
        if (td->error)
                return;
 
@@ -429,6 +455,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
                int ret2, full;
 
                update_tv_cache(td);
+               check_update_rusage(td);
 
                if (runtime_exceeded(td, &td->tv_cache)) {
                        __update_tv_cache(td);
@@ -582,6 +609,8 @@ sync_done:
                        break;
        }
 
+       check_update_rusage(td);
+
        if (!td->error) {
                min_events = td->cur_depth;
 
@@ -637,6 +666,8 @@ static uint64_t do_io(struct thread_data *td)
                int ret2, full;
                enum fio_ddir ddir;
 
+               check_update_rusage(td);
+
                if (td->terminate || td->done)
                        break;
 
@@ -801,8 +832,10 @@ sync_done:
                }
        }
 
+       check_update_rusage(td);
+
        if (td->trim_entries)
-               log_err("fio: %d trim entries leaked?\n", td->trim_entries);
+               log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
 
        if (td->o.fill_device && td->error == ENOSPC) {
                td->error = 0;
@@ -822,9 +855,11 @@ sync_done:
                        td_set_runstate(td, TD_FSYNCING);
 
                        for_each_file(td, f, i) {
-                               if (!fio_file_open(f))
+                               if (!fio_file_fsync(td, f))
                                        continue;
-                               fio_io_sync(td, f);
+
+                               log_err("fio: end_fsync failed for file %s\n",
+                                                               f->file_name);
                        }
                }
        } else
@@ -867,8 +902,7 @@ static int init_io_u(struct thread_data *td)
        char *p;
 
        max_units = td->o.iodepth;
-       max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
-       max_bs = max(td->o.max_bs[DDIR_TRIM], max_bs);
+       max_bs = td_max_bs(td);
        min_write = td->o.min_bs[DDIR_WRITE];
        td->orig_buffer_size = (unsigned long long) max_bs
                                        * (unsigned long long) max_units;
@@ -876,6 +910,15 @@ static int init_io_u(struct thread_data *td)
        if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
                data_xfer = 0;
 
+       /*
+        * if we may later need to do address alignment, then add any
+        * possible adjustment here so that we don't cause a buffer
+        * overflow later. this adjustment may be too much if we get
+        * lucky and the allocator gives us an aligned address.
+        */
+       if (td->o.odirect || td->o.mem_align || (td->io_ops->flags & FIO_RAWIO))
+               td->orig_buffer_size += page_mask + td->o.mem_align;
+
        if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
                unsigned long bs;
 
@@ -1017,7 +1060,7 @@ static int keep_running(struct thread_data *td)
                return 1;
        }
 
-       if (ddir_rw_sum(td->io_bytes) < td->o.size) {
+       if (td->o.size != -1ULL && ddir_rw_sum(td->io_bytes) < td->o.size) {
                uint64_t diff;
 
                /*
@@ -1025,7 +1068,7 @@ static int keep_running(struct thread_data *td)
                 * are done.
                 */
                diff = td->o.size - ddir_rw_sum(td->io_bytes);
-               if (diff < td->o.rw_min_bs)
+               if (diff < td_max_bs(td))
                        return 0;
 
                return 1;
@@ -1069,7 +1112,7 @@ static void *thread_main(void *data)
        } else
                td->pid = gettid();
 
-       fio_local_clock_init(td->o.use_thread);
+       fio_local_clock_init(o->use_thread);
 
        dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
 
@@ -1139,7 +1182,7 @@ static void *thread_main(void *data)
 
 #ifdef CONFIG_LIBNUMA
        /* numa node setup */
-       if (td->o.numa_cpumask_set || td->o.numa_memmask_set) {
+       if (o->numa_cpumask_set || o->numa_memmask_set) {
                int ret;
 
                if (numa_available() < 0) {
@@ -1147,8 +1190,8 @@ static void *thread_main(void *data)
                        goto err;
                }
 
-               if (td->o.numa_cpumask_set) {
-                       ret = numa_run_on_node_mask(td->o.numa_cpunodesmask);
+               if (o->numa_cpumask_set) {
+                       ret = numa_run_on_node_mask(o->numa_cpunodesmask);
                        if (ret == -1) {
                                td_verror(td, errno, \
                                        "numa_run_on_node_mask failed\n");
@@ -1156,20 +1199,20 @@ static void *thread_main(void *data)
                        }
                }
 
-               if (td->o.numa_memmask_set) {
+               if (o->numa_memmask_set) {
 
-                       switch (td->o.numa_mem_mode) {
+                       switch (o->numa_mem_mode) {
                        case MPOL_INTERLEAVE:
-                               numa_set_interleave_mask(td->o.numa_memnodesmask);
+                               numa_set_interleave_mask(o->numa_memnodesmask);
                                break;
                        case MPOL_BIND:
-                               numa_set_membind(td->o.numa_memnodesmask);
+                               numa_set_membind(o->numa_memnodesmask);
                                break;
                        case MPOL_LOCAL:
                                numa_set_localalloc();
                                break;
                        case MPOL_PREFERRED:
-                               numa_set_preferred(td->o.numa_mem_prefer_node);
+                               numa_set_preferred(o->numa_mem_prefer_node);
                                break;
                        case MPOL_DEFAULT:
                        default:
@@ -1180,6 +1223,9 @@ static void *thread_main(void *data)
        }
 #endif
 
+       if (fio_pin_memory(td))
+               goto err;
+
        /*
         * May alter parameters that init_io_u() will use, so we need to
         * do this first.
@@ -1201,7 +1247,7 @@ static void *thread_main(void *data)
                }
        }
 
-       if (td->o.cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
+       if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
                goto err;
 
        errno = 0;
@@ -1243,8 +1289,8 @@ static void *thread_main(void *data)
                memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
                memcpy(&td->tv_cache, &td->start, sizeof(td->start));
 
-               if (td->o.ratemin[DDIR_READ] || td->o.ratemin[DDIR_WRITE] ||
-                               td->o.ratemin[DDIR_TRIM]) {
+               if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
+                               o->ratemin[DDIR_TRIM]) {
                        memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
                                                sizeof(td->bw_sample_time));
                        memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
@@ -1278,8 +1324,8 @@ static void *thread_main(void *data)
                if (td->error || td->terminate)
                        break;
 
-               if (!td->o.do_verify ||
-                   td->o.verify == VERIFY_NONE ||
+               if (!o->do_verify ||
+                   o->verify == VERIFY_NONE ||
                    (td->io_ops->flags & FIO_UNIDIR))
                        continue;
 
@@ -1308,44 +1354,44 @@ static void *thread_main(void *data)
 
        fio_mutex_down(writeout_mutex);
        if (td->bw_log) {
-               if (td->o.bw_log_file) {
+               if (o->bw_log_file) {
                        finish_log_named(td, td->bw_log,
-                                               td->o.bw_log_file, "bw");
+                                               o->bw_log_file, "bw");
                } else
                        finish_log(td, td->bw_log, "bw");
        }
        if (td->lat_log) {
-               if (td->o.lat_log_file) {
+               if (o->lat_log_file) {
                        finish_log_named(td, td->lat_log,
-                                               td->o.lat_log_file, "lat");
+                                               o->lat_log_file, "lat");
                } else
                        finish_log(td, td->lat_log, "lat");
        }
        if (td->slat_log) {
-               if (td->o.lat_log_file) {
+               if (o->lat_log_file) {
                        finish_log_named(td, td->slat_log,
-                                               td->o.lat_log_file, "slat");
+                                               o->lat_log_file, "slat");
                } else
                        finish_log(td, td->slat_log, "slat");
        }
        if (td->clat_log) {
-               if (td->o.lat_log_file) {
+               if (o->lat_log_file) {
                        finish_log_named(td, td->clat_log,
-                                               td->o.lat_log_file, "clat");
+                                               o->lat_log_file, "clat");
                } else
                        finish_log(td, td->clat_log, "clat");
        }
        if (td->iops_log) {
-               if (td->o.iops_log_file) {
+               if (o->iops_log_file) {
                        finish_log_named(td, td->iops_log,
-                                               td->o.iops_log_file, "iops");
+                                               o->iops_log_file, "iops");
                } else
                        finish_log(td, td->iops_log, "iops");
        }
 
        fio_mutex_up(writeout_mutex);
-       if (td->o.exec_postrun)
-               exec_string(td->o.exec_postrun);
+       if (o->exec_postrun)
+               exec_string(o->exec_postrun);
 
        if (exitall_on_terminate)
                fio_terminate_threads(td->groupid);
@@ -1355,7 +1401,7 @@ err:
                log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
                                                        td->verror);
 
-       if (td->o.verify_async)
+       if (o->verify_async)
                verify_async_exit(td);
 
        close_and_free_files(td);
@@ -1372,9 +1418,12 @@ err:
        /*
         * do this very late, it will log file closing as well
         */
-       if (td->o.write_iolog_file)
+       if (o->write_iolog_file)
                write_iolog_close(td);
 
+       fio_mutex_remove(td->rusage_sem);
+       td->rusage_sem = NULL;
+
        td_set_runstate(td, TD_EXITED);
        return (void *) (uintptr_t) td->error;
 }
@@ -1509,6 +1558,12 @@ reaped:
                fio_terminate_threads(TERMINATE_ALL);
 }
 
+static void do_usleep(unsigned int usecs)
+{
+       check_for_running_stats();
+       usleep(usecs);
+}
+
 /*
  * Main function for kicking off and reaping jobs, as needed.
  */
@@ -1628,6 +1683,9 @@ static void run_threads(void)
 
                        init_disk_util(td);
 
+                       td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
+                       td->update_rusage = 0;
+
                        /*
                         * Set state to created. Thread will transition
                         * to TD_INITIALIZED when it's done setting up.
@@ -1684,7 +1742,7 @@ static void run_threads(void)
                        if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
                                break;
 
-                       usleep(100000);
+                       do_usleep(100000);
 
                        for (i = 0; i < this_jobs; i++) {
                                td = map[i];
@@ -1736,12 +1794,12 @@ static void run_threads(void)
                reap_threads(&nr_running, &t_rate, &m_rate);
 
                if (todo)
-                       usleep(100000);
+                       do_usleep(100000);
        }
 
        while (nr_running) {
                reap_threads(&nr_running, &t_rate, &m_rate);
-               usleep(10000);
+               do_usleep(10000);
        }
 
        fio_idle_prof_stop();
@@ -1837,6 +1895,7 @@ int fio_backend(void)
                return 1;
 
        set_genesis_time();
+       stat_init();
        create_disk_util_thread();
 
        cgroup_list = smalloc(sizeof(*cgroup_list));
@@ -1866,5 +1925,6 @@ int fio_backend(void)
        fio_mutex_remove(startup_mutex);
        fio_mutex_remove(writeout_mutex);
        fio_mutex_remove(disk_thread_mutex);
+       stat_exit();
        return exit_value;
 }